Bài trước, Multi-agent patterns: supervisor, handoff, debate, tôi đã nói về cấu trúc hệ thống nhiều agent. Bài này đi sâu một tầng thấp hơn: khi nhiều agent chạy song song, chúng nói chuyện với nhau bằng cách nào?
Câu hỏi tưởng nhỏ. Thực ra là câu hỏi quyết định toàn bộ kiến trúc. Chọn sai ở đây, bài 18, 19 sẽ gặp hậu quả.
Hai mô hình
Trong distributed systems, câu hỏi này có hai trường phái lớn:
- Shared state (còn gọi là blackboard pattern): tất cả agent đọc và ghi vào một kho dữ liệu chung. Ai cần gì thì vào lấy, cập nhật xong thì ghi lại.
- Message passing (actor model, queue-based): agent không chia sẻ state. Muốn truyền thông tin thì gửi message. Người nhận xử lý khi sẵn sàng.
Cả hai đều đúng. Không có cái nào tốt hơn tuyệt đối. Vấn đề là hiểu trade-off để chọn đúng ngữ cảnh.
Shared state
Cách hoạt động
Hình dung một bảng trắng to (blackboard) trong phòng họp. Mỗi agent là một người đứng quanh bảng. Ai cần đọc thì nhìn bảng. Ai cần ghi thì bước tới viết.
Trong code, blackboard thường là:
- Dict trong RAM (single-process)
- Redis (multi-process, multi-machine)
- Database (durability cần thiết)
- File (đơn giản nhất, cũng chậm nhất)
import redis
import json
from anthropic import Anthropic
client = Anthropic()
r = redis.Redis(host="localhost", port=6379, decode_responses=True)
def researcher_agent(task_id: str, query: str):
"""Agent A: nghiên cứu, ghi kết quả lên blackboard."""
resp = client.messages.create(
model="claude-sonnet-4-6",
max_tokens=1024,
messages=[{"role": "user", "content": f"Research: {query}"}],
)
result = resp.content[0].text
# Ghi lên shared state
r.hset(f"task:{task_id}", mapping={
"research": result,
"research_status": "done",
})
print(f"[Researcher] Wrote to blackboard: task:{task_id}")
def writer_agent(task_id: str):
"""Agent B: chờ research xong, đọc blackboard, viết bài."""
# Polling: đợi agent A viết xong
import time
for _ in range(30):
status = r.hget(f"task:{task_id}", "research_status")
if status == "done":
break
time.sleep(1)
else:
raise TimeoutError("Research agent did not finish in time")
research = r.hget(f"task:{task_id}", "research")
resp = client.messages.create(
model="claude-sonnet-4-6",
max_tokens=2048,
messages=[{
"role": "user",
"content": f"Write a blog post based on this research:\n{research}"
}],
)
draft = resp.content[0].text
# Ghi kết quả mới lên blackboard
r.hset(f"task:{task_id}", mapping={
"draft": draft,
"writer_status": "done",
})
print(f"[Writer] Draft written to blackboard: task:{task_id}")
Ưu điểm
Đơn giản để bắt đầu. Không cần broker, không cần thiết kế message schema. Agent cần gì thì đọc key tương ứng. Debug dễ: mở Redis CLI, xem HGETALL task:123, biết ngay hệ thống đang ở trạng thái nào.
Truy cập tuỳ ý. Agent C có thể đọc kết quả của cả A và B cùng lúc mà không cần A và B phải “forward” gì. Phù hợp khi cần tổng hợp từ nhiều nguồn.
Observability tự nhiên. Toàn bộ state nằm một chỗ. Một dashboard query Redis là thấy mọi thứ.
Nhược điểm
Race condition. Khi hai agent cùng đọc-sửa-ghi cùng một key, kết quả không xác định. Ví dụ thật: một hệ thống review code tôi build có reviewer_A và reviewer_B cùng ghi vào review_comments. Cuối cùng chỉ có comment của agent nào chạy sau tồn tại. Mất nửa ngày debug mới tìm ra.
Fix chuẩn là dùng Redis transactions (MULTI/EXEC) hoặc atomic operations như LPUSH thay vì SET:
# Không an toàn: hai agent cùng SET sẽ overwrite nhau
r.set("comments", json.dumps(new_comment))
# An toàn hơn: mỗi agent APPEND vào list
r.rpush("comments", json.dumps(new_comment))
Stale state khi đọc song song. Đây là pitfall ít được nói đến nhất. Agent đọc state tại thời điểm T. Quyết định dựa trên state đó. Nhưng agent khác đã thay đổi state tại T+1. Quyết định của agent đầu giờ lỗi thời.
T=0: Agent A đọc: balance = 100
T=0: Agent B đọc: balance = 100
T=1: Agent A ghi: balance = 80 (trừ 20)
T=1: Agent B ghi: balance = 60 (trừ 40)
T=2: Thực tế: balance = 60, nhưng đúng phải là 40
Đây là lost update. Trong agent context, nó xảy ra khi nhiều agent đọc cùng một “fact” rồi cùng cập nhật state. Với Redis, dùng Lua script hoặc WATCH/MULTI/EXEC để tránh.
Không scale ngang tốt. Khi chuyển từ một Redis node sang cluster, key phải nằm cùng slot mới dùng được MULTI/EXEC. Thiết kế lại schema khó. Với DynamoDB hay Postgres, cost của strong consistency cao hơn eventual consistency.
Message passing
Cách hoạt động
Agent A không biết Agent B tồn tại. A chỉ biết: khi xong việc, publish một event. B subscribe event đó, nhận message, xử lý. A và B không bao giờ chạm trực tiếp vào state của nhau.
import redis
import json
import threading
from anthropic import Anthropic
client = Anthropic()
r = redis.Redis(host="localhost", port=6379, decode_responses=True)
CHANNEL_RESEARCH_DONE = "agent:research:done"
CHANNEL_DRAFT_DONE = "agent:draft:done"
def researcher_agent(task_id: str, query: str):
"""Agent A: nghiên cứu, publish kết quả."""
resp = client.messages.create(
model="claude-sonnet-4-6",
max_tokens=1024,
messages=[{"role": "user", "content": f"Research: {query}"}],
)
result = resp.content[0].text
# Publish message, không ghi trực tiếp vào shared state
message = json.dumps({"task_id": task_id, "research": result})
r.publish(CHANNEL_RESEARCH_DONE, message)
print(f"[Researcher] Published to {CHANNEL_RESEARCH_DONE}")
def writer_agent():
"""Agent B: lắng nghe research events, xử lý từng cái."""
pubsub = r.pubsub()
pubsub.subscribe(CHANNEL_RESEARCH_DONE)
for raw_msg in pubsub.listen():
if raw_msg["type"] != "message":
continue
data = json.loads(raw_msg["data"])
task_id = data["task_id"]
research = data["research"]
resp = client.messages.create(
model="claude-sonnet-4-6",
max_tokens=2048,
messages=[{
"role": "user",
"content": f"Write a blog post based on:\n{research}"
}],
)
draft = resp.content[0].text
# Publish tiếp cho agent kế tiếp
result_msg = json.dumps({"task_id": task_id, "draft": draft})
r.publish(CHANNEL_DRAFT_DONE, result_msg)
print(f"[Writer] Published draft for task {task_id}")
# Chạy writer trong background thread
t = threading.Thread(target=writer_agent, daemon=True)
t.start()
# Khởi động task
researcher_agent("task_001", "Latest trends in vector databases 2026")
Ưu điểm
Loosely coupled. Researcher không cần biết Writer tồn tại. Thêm một Formatter agent lắng nghe agent:draft:done mà không cần sửa Researcher hay Writer. Scale từng agent độc lập.
Không race condition trên state. Mỗi agent chỉ xử lý message trong mailbox của mình. Không ai ghi đè state của ai.
Ordering tự nhiên hơn. Message queue đảm bảo thứ tự (ít nhất trong cùng một channel). Với NATS hoặc Kafka, có thể replay message khi agent crash.
Dễ retry và backpressure. Message chưa được xử lý nằm trong queue. Agent crash rồi restart, message vẫn còn. Với shared state, nếu agent crash giữa chừng, state có thể ở trạng thái không nhất quán.
Nhược điểm
Orchestration overhead. Để biết cả pipeline đang ở trạng thái nào, phải reconstruct từ event log. Không có một chỗ nào “nhìn vào là biết”. Debug tốn công hơn.
Schema phải rõ từ đầu. Message schema là contract giữa các agent. Thay đổi schema của Researcher mà không cập nhật Writer là bug ngay. Với shared state, thêm field mới vào Redis không phá vỡ agent khác (chúng chỉ ignore field đó).
Latency pipeline. Mỗi hop qua queue thêm latency. Nếu pipeline có 5 agent nối tiếp qua queue, tổng latency là tổng của 5 lần enqueue/dequeue cộng processing time. Với shared state, các agent có thể poll nhanh hơn.
Trạng thái toàn cục khó thấy. Nếu user hỏi “task 123 đang ở đâu rồi”, shared state trả lời ngay. Message passing phải query nhiều nơi.
Hybrid: shared DB cộng event log
Trong thực tế, hầu hết hệ thống production dùng cả hai.
Pattern phổ biến nhất:
- Event log (Kafka, NATS, Redis Stream) để truyền thông điệp giữa agent.
- Shared DB (Postgres, DynamoDB) để lưu trạng thái cuối cùng (single source of truth).
Researcher ──publish──> NATS "research.done" ──> Writer
│
▼
Postgres tasks table
(task_id, draft, status)
│
Reviewer reads from DB
Agent không đọc state của nhau qua queue. Nhưng khi cần state đã settle, đọc từ DB. Queue chỉ để trigger.
LangGraph implement theo hướng này: graph state là shared (dict), nhưng transitions giữa node theo event (edge conditions). Mỗi node đọc và ghi vào state, nhưng execution order do graph topology quyết định, không phải agent tự poll.
# LangGraph style: shared state dict, orchestrated transitions
from langgraph.graph import StateGraph
from typing import TypedDict
class AgentState(TypedDict):
query: str
research: str
draft: str
status: str
def research_node(state: AgentState) -> AgentState:
# Đọc từ state, ghi vào state
result = run_researcher(state["query"])
return {"research": result, "status": "research_done"}
def writer_node(state: AgentState) -> AgentState:
draft = run_writer(state["research"])
return {"draft": draft, "status": "draft_done"}
graph = StateGraph(AgentState)
graph.add_node("researcher", research_node)
graph.add_node("writer", writer_node)
graph.add_edge("researcher", "writer")
CrewAI ngược lại: thiên về message passing. Agent giao tiếp qua task outputs, không share state trực tiếp. Mỗi agent nhận output của agent trước như một message.
Cheatsheet
| Tiêu chí | Shared state | Message passing |
|---|---|---|
| Setup ban đầu | Đơn giản, không cần broker | Cần queue/broker |
| Debug | Dễ: xem state là biết | Khó hơn: reconstruct từ events |
| Race condition | Có, phải lock | Không (per-agent mailbox) |
| Stale read | Nguy hiểm khi nhiều reader | Không áp dụng |
| Scale ngang | Khó hơn | Tốt hơn |
| Ordering | Phải tự đảm bảo | Queue đảm bảo |
| Schema coupling | Lỏng | Chặt (cần versioning) |
| Retry/replay | Phức tạp | Tự nhiên (queue) |
| Observability | Cao (one place) | Thấp hơn (distributed) |
| Phù hợp khi | Pipeline tuần tự, cần visibility | Fan-out, scale, decoupled teams |
Pitfall thật: stale state khi nhiều agent đọc song song
Câu chuyện này xảy ra trong một pipeline tôi build năm ngoái: một hệ thống có 3 agent chạy song song để review code từ 3 góc nhìn (security, performance, style). Cả ba đọc cùng một file diff từ Redis, sau đó mỗi cái ghi nhận xét vào review_results.
Thiết kế ban đầu (sai):
# Agent đọc, xử lý, rồi update shared dict
existing = json.loads(r.get("review_results") or "{}")
existing[agent_name] = my_comments
r.set("review_results", json.dumps(existing))
Kết quả: chỉ có comment của agent chạy cuối cùng còn lại. Ba agent đều đọc {}, đều tạo dict mới với key của mình, đều ghi đè nhau.
Fix đơn giản: dùng hash field thay vì một key duy nhất.
# Mỗi agent ghi vào field riêng, không đụng nhau
r.hset("review_results", agent_name, json.dumps(my_comments))
Fix đúng hơn cho production: dùng list append.
r.rpush("review_results", json.dumps({
"agent": agent_name,
"comments": my_comments,
"timestamp": time.time(),
}))
Bài học: khi thiết kế shared state, hỏi ngay “có agent nào khác cũng ghi vào đây không”. Nếu có, đừng dùng cấu trúc overwrite (SET, dict assignment). Dùng append-only (RPUSH, HSET với unique field).
Bài học thứ hai: stale read nguy hiểm hơn race condition write vì nó không báo lỗi. Code chạy, không crash, nhưng agent quyết định dựa trên thông tin cũ. Loại bug này chỉ thấy khi nhìn output và thắc mắc “sao nó lại làm vậy”.
Khi nào dùng cái nào
Không có công thức cứng, nhưng có heuristic:
Dùng shared state khi:
- Pipeline tuần tự, agent chạy nối tiếp, không song song.
- Cần visibility tức thì vào trạng thái toàn hệ thống.
- Team nhỏ, không cần scale ngang.
- Debug là ưu tiên cao hơn throughput.
- LangGraph hoặc framework state-based: họ đã giải quyết consistency, bạn chỉ cần dùng đúng cách.
Dùng message passing khi:
- Nhiều agent xử lý song song, độc lập.
- Cần retry tự động khi agent crash.
- Hệ thống cần scale ngang (thêm worker cho một loại agent).
- Team nhiều người, mỗi người maintain một agent, cần interface rõ ràng.
- CrewAI, NATS-based systems: message là first-class citizen.
Dùng hybrid khi:
- Cần cả hai: fanout nhanh (message) và single source of truth (DB).
- Pipeline phức tạp với nhiều loại agent, một số tuần tự một số song song.
- Production system với SLA: queue giúp buffer tải, DB giúp query state.
Implementation nhanh
Redis pub/sub: phù hợp prototype, không persist message khi subscriber offline.
pip install redis anthropic
Redis Streams: tốt hơn pub/sub vì persist, có consumer group, có ACK.
# Producer
r.xadd("agent:events", {"task_id": task_id, "data": json.dumps(payload)})
# Consumer (với consumer group)
msgs = r.xreadgroup("workers", "worker-1", {"agent:events": ">"}, count=1)
# Process...
r.xack("agent:events", "workers", msg_id)
NATS: nhẹ hơn Kafka, phù hợp microservice scale vừa. Python SDK: pip install nats-py.
import asyncio
import nats
async def run():
nc = await nats.connect("nats://localhost:4222")
async def research_done_handler(msg):
data = json.loads(msg.data.decode())
# Process...
await nc.publish("agent.draft.done", json.dumps(result).encode())
await nc.subscribe("agent.research.done", cb=research_done_handler)
await asyncio.sleep(3600) # giữ process sống
Lời kết
Shared state và message passing không phải đối lập. Chúng là hai công cụ với strength khác nhau. Hầu hết hệ thống production dùng cả hai: queue để di chuyển dữ liệu, DB để lưu trạng thái cuối.
Điều quan trọng nhất để nhớ: thiết kế communication trước khi viết agent logic. Chọn sai communication pattern là loại technical debt tốn nhiều nhất để sửa, vì nó ảnh hưởng đến tất cả agent trong hệ thống.
Bài tiếp theo, Specialized agent roles: planner, executor, reviewer, sẽ đi vào phân vai agent trong một hệ thống thực tế: ai làm gì, giao tiếp với nhau theo pattern nào, và tại sao việc tách vai đúng lại quan trọng hơn việc có nhiều agent.
Và nếu bạn đang tự hỏi “khi nào thì nên dùng framework như LangGraph hay CrewAI thay vì tự build”, bài 19 sẽ so sánh chi tiết trade-off của từng framework.