Learn how AI agents exchange information and coordinate actions through structured messages, communication patterns like pub-sub and request-response, and protocols for task delegation and consensus building.

This article is part of the free-to-read AI Agent Handbook
Communication Between Agents
In the previous chapter, you saw how multiple agents can work together by specializing in different tasks. But we glossed over a critical detail: how do agents actually talk to each other? When the guest agent finishes compiling dietary restrictions, how does it pass that information to the menu agent? When the research agent finds important data, how does it tell the writing agent?
Human teams communicate through language, gestures, emails, and meetings. AI agents need their own communication mechanisms. In this chapter, we'll explore how agents exchange information, coordinate their actions, and stay synchronized while working toward shared goals.
The Communication Challenge
Let's start with a concrete problem. Imagine you have two agents:
- Agent A (Research): Finds the current price of Bitcoin
- Agent B (Analysis): Determines if now is a good time to buy
Agent B needs the information Agent A discovers. But how does Agent A send that information? And how does Agent B know it's receiving price data rather than, say, historical trends or market sentiment?
Here's what makes agent communication tricky:
Different Contexts: Each agent has its own conversation history, system prompt, and state. When Agent A says "the price is $45,000," Agent B needs enough context to understand what that means.
Timing: Agent B might need to wait for Agent A to finish its research. How does it know when Agent A is done? What if Agent A encounters an error?
Format: Should Agent A send a simple number, a structured JSON object, or a natural language description? The format affects how easily Agent B can use the information.
Reliability: What if the message gets lost or corrupted? What if Agent A sends information that Agent B doesn't understand?
Let's see these challenges in action with a simple example:
1## Using Claude Sonnet 4.5 for agent communication
2import anthropic
3import json
4
5client = anthropic.Anthropic(api_key="ANTHROPIC_API_KEY")
6
7## Agent A: Research Agent
8def research_agent(query):
9 """
10 Researches information and returns findings.
11 """
12 system_prompt = """You are a research specialist.
13 When asked to research something, provide factual information in a clear format.
14 Always structure your response as JSON with 'topic', 'findings', and 'confidence' fields."""
15
16 response = client.messages.create(
17 model="claude-sonnet-4.5",
18 max_tokens=512,
19 system=system_prompt,
20 messages=[{"role": "user", "content": query}]
21 )
22
23 return response.content[0].text
24
25## Agent B: Analysis Agent
26def analysis_agent(research_data):
27 """
28 Analyzes research data and provides recommendations.
29 """
30 system_prompt = """You are an analysis specialist.
31 You receive research findings and provide actionable recommendations.
32 Focus on practical insights."""
33
34 response = client.messages.create(
35 model="claude-sonnet-4.5",
36 max_tokens=512,
37 system=system_prompt,
38 messages=[{"role": "user", "content": f"Analyze this research: {research_data}"}]
39 )
40
41 return response.content[0].text
42
43## Simple communication flow
44research_result = research_agent("What is the current sentiment around electric vehicles?")
45print(f"Research Agent found:\n{research_result}\n")
46
47analysis_result = analysis_agent(research_result)
48print(f"Analysis Agent recommends:\n{analysis_result}")1## Using Claude Sonnet 4.5 for agent communication
2import anthropic
3import json
4
5client = anthropic.Anthropic(api_key="ANTHROPIC_API_KEY")
6
7## Agent A: Research Agent
8def research_agent(query):
9 """
10 Researches information and returns findings.
11 """
12 system_prompt = """You are a research specialist.
13 When asked to research something, provide factual information in a clear format.
14 Always structure your response as JSON with 'topic', 'findings', and 'confidence' fields."""
15
16 response = client.messages.create(
17 model="claude-sonnet-4.5",
18 max_tokens=512,
19 system=system_prompt,
20 messages=[{"role": "user", "content": query}]
21 )
22
23 return response.content[0].text
24
25## Agent B: Analysis Agent
26def analysis_agent(research_data):
27 """
28 Analyzes research data and provides recommendations.
29 """
30 system_prompt = """You are an analysis specialist.
31 You receive research findings and provide actionable recommendations.
32 Focus on practical insights."""
33
34 response = client.messages.create(
35 model="claude-sonnet-4.5",
36 max_tokens=512,
37 system=system_prompt,
38 messages=[{"role": "user", "content": f"Analyze this research: {research_data}"}]
39 )
40
41 return response.content[0].text
42
43## Simple communication flow
44research_result = research_agent("What is the current sentiment around electric vehicles?")
45print(f"Research Agent found:\n{research_result}\n")
46
47analysis_result = analysis_agent(research_result)
48print(f"Analysis Agent recommends:\n{analysis_result}")This works, but notice the limitations. The analysis agent receives raw text from the research agent. It has to parse that text and hope it contains the information it needs. There's no guarantee the format will be consistent, no way to verify the message was understood correctly, and no mechanism for the analysis agent to ask follow-up questions.
We can do better.
Message Formats: Speaking a Common Language
The first step to better communication is establishing a shared format. Just as humans might agree to communicate via email with specific subject lines and structure, agents benefit from standardized message formats.
Natural Language Messages
The simplest approach is natural language. Agent A sends a message like "I found that Bitcoin is currently trading at $45,000 with high volatility." Agent B reads this and responds accordingly.
Advantages:
- Flexible and expressive
- Easy to understand when debugging
- Works well with language models' strengths
Disadvantages:
- Ambiguous (what does "high volatility" mean exactly?)
- Hard to parse programmatically
- Prone to misinterpretation
Natural language works well for high-level coordination where precision isn't critical. For example, a coordinator agent might tell worker agents: "Focus on the Q4 data" or "Prioritize accuracy over speed."
Structured Data Messages
For precise information exchange, structured formats like JSON work better:
1## Agent A sends structured data
2message = {
3 "from": "research_agent",
4 "to": "analysis_agent",
5 "timestamp": "2025-11-10T14:30:00Z",
6 "message_type": "research_findings",
7 "data": {
8 "topic": "Bitcoin price",
9 "current_price": 45000,
10 "currency": "USD",
11 "volatility": 0.15,
12 "confidence": 0.92
13 }
14}1## Agent A sends structured data
2message = {
3 "from": "research_agent",
4 "to": "analysis_agent",
5 "timestamp": "2025-11-10T14:30:00Z",
6 "message_type": "research_findings",
7 "data": {
8 "topic": "Bitcoin price",
9 "current_price": 45000,
10 "currency": "USD",
11 "volatility": 0.15,
12 "confidence": 0.92
13 }
14}Now Agent B knows exactly what it's receiving. The price is a number, not a string. The volatility is quantified. The confidence score indicates how reliable this information is. The timestamp shows when the data was collected.
Let's rebuild our example with structured communication:
1## Using Claude Sonnet 4.5 with structured message passing
2import anthropic
3import json
4from datetime import datetime
5
6client = anthropic.Anthropic(api_key="ANTHROPIC_API_KEY")
7
8class AgentMessage:
9 """
10 Standard message format for agent communication.
11 """
12 def __init__(self, sender, recipient, message_type, data):
13 self.sender = sender
14 self.recipient = recipient
15 self.message_type = message_type
16 self.data = data
17 self.timestamp = datetime.utcnow().isoformat()
18
19 def to_json(self):
20 return json.dumps({
21 "from": self.sender,
22 "to": self.recipient,
23 "type": self.message_type,
24 "timestamp": self.timestamp,
25 "data": self.data
26 }, indent=2)
27
28 @staticmethod
29 def from_json(json_str):
30 obj = json.loads(json_str)
31 msg = AgentMessage(
32 obj["from"],
33 obj["to"],
34 obj["type"],
35 obj["data"]
36 )
37 msg.timestamp = obj["timestamp"]
38 return msg
39
40def research_agent_v2(query):
41 """
42 Research agent that returns structured messages.
43 """
44 system_prompt = """You are a research specialist.
45 Return findings as JSON with these exact fields:
46 - topic: what you researched
47 - findings: key information discovered
48 - sources: where the information came from
49 - confidence: 0-1 score of how confident you are
50
51 Only return the JSON, nothing else."""
52
53 response = client.messages.create(
54 model="claude-sonnet-4.5",
55 max_tokens=512,
56 system=system_prompt,
57 messages=[{"role": "user", "content": query}]
58 )
59
60 # Parse the research findings
61 findings = json.loads(response.content[0].text)
62
63 # Wrap in a standard message
64 message = AgentMessage(
65 sender="research_agent",
66 recipient="analysis_agent",
67 message_type="research_complete",
68 data=findings
69 )
70
71 return message
72
73def analysis_agent_v2(message):
74 """
75 Analysis agent that expects structured messages.
76 """
77 # Verify message type
78 if message.message_type != "research_complete":
79 return AgentMessage(
80 sender="analysis_agent",
81 recipient=message.sender,
82 message_type="error",
83 data={"error": f"Expected 'research_complete', got '{message.message_type}'"}
84 )
85
86 system_prompt = """You are an analysis specialist.
87 You receive research findings as JSON and provide recommendations.
88 Return your analysis as JSON with:
89 - recommendation: your main recommendation
90 - reasoning: why you recommend this
91 - confidence: 0-1 score of confidence
92
93 Only return the JSON, nothing else."""
94
95 # Extract research data
96 research_data = json.dumps(message.data)
97
98 response = client.messages.create(
99 model="claude-sonnet-4.5",
100 max_tokens=512,
101 system=system_prompt,
102 messages=[{"role": "user", "content": f"Analyze: {research_data}"}]
103 )
104
105 analysis = json.loads(response.content[0].text)
106
107 # Return structured response
108 return AgentMessage(
109 sender="analysis_agent",
110 recipient="research_agent",
111 message_type="analysis_complete",
112 data=analysis
113 )
114
115## Example usage
116print("=== Structured Agent Communication ===\n")
117
118## Research agent does its work
119research_msg = research_agent_v2("Research the benefits of renewable energy")
120print(f"Research Agent sent:\n{research_msg.to_json()}\n")
121
122## Analysis agent receives and processes
123analysis_msg = analysis_agent_v2(research_msg)
124print(f"Analysis Agent sent:\n{analysis_msg.to_json()}")1## Using Claude Sonnet 4.5 with structured message passing
2import anthropic
3import json
4from datetime import datetime
5
6client = anthropic.Anthropic(api_key="ANTHROPIC_API_KEY")
7
8class AgentMessage:
9 """
10 Standard message format for agent communication.
11 """
12 def __init__(self, sender, recipient, message_type, data):
13 self.sender = sender
14 self.recipient = recipient
15 self.message_type = message_type
16 self.data = data
17 self.timestamp = datetime.utcnow().isoformat()
18
19 def to_json(self):
20 return json.dumps({
21 "from": self.sender,
22 "to": self.recipient,
23 "type": self.message_type,
24 "timestamp": self.timestamp,
25 "data": self.data
26 }, indent=2)
27
28 @staticmethod
29 def from_json(json_str):
30 obj = json.loads(json_str)
31 msg = AgentMessage(
32 obj["from"],
33 obj["to"],
34 obj["type"],
35 obj["data"]
36 )
37 msg.timestamp = obj["timestamp"]
38 return msg
39
40def research_agent_v2(query):
41 """
42 Research agent that returns structured messages.
43 """
44 system_prompt = """You are a research specialist.
45 Return findings as JSON with these exact fields:
46 - topic: what you researched
47 - findings: key information discovered
48 - sources: where the information came from
49 - confidence: 0-1 score of how confident you are
50
51 Only return the JSON, nothing else."""
52
53 response = client.messages.create(
54 model="claude-sonnet-4.5",
55 max_tokens=512,
56 system=system_prompt,
57 messages=[{"role": "user", "content": query}]
58 )
59
60 # Parse the research findings
61 findings = json.loads(response.content[0].text)
62
63 # Wrap in a standard message
64 message = AgentMessage(
65 sender="research_agent",
66 recipient="analysis_agent",
67 message_type="research_complete",
68 data=findings
69 )
70
71 return message
72
73def analysis_agent_v2(message):
74 """
75 Analysis agent that expects structured messages.
76 """
77 # Verify message type
78 if message.message_type != "research_complete":
79 return AgentMessage(
80 sender="analysis_agent",
81 recipient=message.sender,
82 message_type="error",
83 data={"error": f"Expected 'research_complete', got '{message.message_type}'"}
84 )
85
86 system_prompt = """You are an analysis specialist.
87 You receive research findings as JSON and provide recommendations.
88 Return your analysis as JSON with:
89 - recommendation: your main recommendation
90 - reasoning: why you recommend this
91 - confidence: 0-1 score of confidence
92
93 Only return the JSON, nothing else."""
94
95 # Extract research data
96 research_data = json.dumps(message.data)
97
98 response = client.messages.create(
99 model="claude-sonnet-4.5",
100 max_tokens=512,
101 system=system_prompt,
102 messages=[{"role": "user", "content": f"Analyze: {research_data}"}]
103 )
104
105 analysis = json.loads(response.content[0].text)
106
107 # Return structured response
108 return AgentMessage(
109 sender="analysis_agent",
110 recipient="research_agent",
111 message_type="analysis_complete",
112 data=analysis
113 )
114
115## Example usage
116print("=== Structured Agent Communication ===\n")
117
118## Research agent does its work
119research_msg = research_agent_v2("Research the benefits of renewable energy")
120print(f"Research Agent sent:\n{research_msg.to_json()}\n")
121
122## Analysis agent receives and processes
123analysis_msg = analysis_agent_v2(research_msg)
124print(f"Analysis Agent sent:\n{analysis_msg.to_json()}")This structured approach gives us several benefits:
Type Safety: The message_type field lets agents verify they're receiving the expected kind of message.
Traceability: Timestamps and sender/recipient fields create an audit trail of communication.
Error Handling: Agents can send error messages in the same format, making it easy to handle failures.
Extensibility: You can add new message types without breaking existing communication patterns.
Communication Patterns
Now that we have a message format, let's explore different ways agents can communicate.
Request-Response
The simplest pattern: Agent A sends a request, Agent B sends a response.
1Agent A: "What's the weather in Paris?"
2Agent B: "Sunny, 22°C"1Agent A: "What's the weather in Paris?"
2Agent B: "Sunny, 22°C"This is synchronous. Agent A waits for Agent B's response before continuing. It's straightforward but can be inefficient if Agent B takes a long time.
Fire-and-Forget
Agent A sends a message but doesn't wait for a response. It continues with other work.
1Agent A: "Log this event: user clicked button"
2Logger Agent: [receives message, logs it]
3Agent A: [already moved on to other tasks]1Agent A: "Log this event: user clicked button"
2Logger Agent: [receives message, logs it]
3Agent A: [already moved on to other tasks]This is asynchronous and efficient, but Agent A has no way to know if the message was received or processed successfully.
Publish-Subscribe
Multiple agents can subscribe to specific types of messages. When Agent A publishes a message, all subscribed agents receive it.
1Agent A: [publishes "new_data_available"]
2Agent B: [subscribed to "new_data_available", receives notification]
3Agent C: [subscribed to "new_data_available", receives notification]
4Agent D: [not subscribed, doesn't receive anything]1Agent A: [publishes "new_data_available"]
2Agent B: [subscribed to "new_data_available", receives notification]
3Agent C: [subscribed to "new_data_available", receives notification]
4Agent D: [not subscribed, doesn't receive anything]This pattern works well for broadcasting information to multiple interested parties without Agent A needing to know who they are.
Here's a simple implementation:
1## Using Claude Sonnet 4.5 for pub-sub coordination
2class MessageBroker:
3 """
4 Simple publish-subscribe message broker for agent communication.
5 """
6 def __init__(self):
7 self.subscriptions = {} # topic -> list of agents
8 self.message_history = []
9
10 def subscribe(self, topic, agent_name):
11 """
12 Agent subscribes to a topic.
13 """
14 if topic not in self.subscriptions:
15 self.subscriptions[topic] = []
16 if agent_name not in self.subscriptions[topic]:
17 self.subscriptions[topic].append(agent_name)
18 print(f"{agent_name} subscribed to '{topic}'")
19
20 def publish(self, topic, message):
21 """
22 Publish a message to all subscribers of a topic.
23 """
24 self.message_history.append({
25 "topic": topic,
26 "message": message,
27 "timestamp": datetime.utcnow().isoformat()
28 })
29
30 if topic in self.subscriptions:
31 subscribers = self.subscriptions[topic]
32 print(f"\nPublishing to '{topic}' ({len(subscribers)} subscribers)")
33 return subscribers
34 else:
35 print(f"\nPublishing to '{topic}' (no subscribers)")
36 return []
37
38 def get_history(self, topic=None):
39 """
40 Get message history, optionally filtered by topic.
41 """
42 if topic:
43 return [msg for msg in self.message_history if msg["topic"] == topic]
44 return self.message_history
45
46## Example: Multiple agents coordinating through pub-sub
47broker = MessageBroker()
48
49## Agents subscribe to topics they care about
50broker.subscribe("market_data", "trading_agent")
51broker.subscribe("market_data", "analysis_agent")
52broker.subscribe("market_data", "reporting_agent")
53broker.subscribe("user_actions", "logging_agent")
54broker.subscribe("user_actions", "analytics_agent")
55
56## When market data arrives, all interested agents are notified
57market_subscribers = broker.publish("market_data", {
58 "symbol": "AAPL",
59 "price": 178.50,
60 "change": 2.3
61})
62print(f"Notified: {', '.join(market_subscribers)}")
63
64## When a user action occurs, different agents are notified
65action_subscribers = broker.publish("user_actions", {
66 "action": "purchase",
67 "item": "premium_subscription"
68})
69print(f"Notified: {', '.join(action_subscribers)}")1## Using Claude Sonnet 4.5 for pub-sub coordination
2class MessageBroker:
3 """
4 Simple publish-subscribe message broker for agent communication.
5 """
6 def __init__(self):
7 self.subscriptions = {} # topic -> list of agents
8 self.message_history = []
9
10 def subscribe(self, topic, agent_name):
11 """
12 Agent subscribes to a topic.
13 """
14 if topic not in self.subscriptions:
15 self.subscriptions[topic] = []
16 if agent_name not in self.subscriptions[topic]:
17 self.subscriptions[topic].append(agent_name)
18 print(f"{agent_name} subscribed to '{topic}'")
19
20 def publish(self, topic, message):
21 """
22 Publish a message to all subscribers of a topic.
23 """
24 self.message_history.append({
25 "topic": topic,
26 "message": message,
27 "timestamp": datetime.utcnow().isoformat()
28 })
29
30 if topic in self.subscriptions:
31 subscribers = self.subscriptions[topic]
32 print(f"\nPublishing to '{topic}' ({len(subscribers)} subscribers)")
33 return subscribers
34 else:
35 print(f"\nPublishing to '{topic}' (no subscribers)")
36 return []
37
38 def get_history(self, topic=None):
39 """
40 Get message history, optionally filtered by topic.
41 """
42 if topic:
43 return [msg for msg in self.message_history if msg["topic"] == topic]
44 return self.message_history
45
46## Example: Multiple agents coordinating through pub-sub
47broker = MessageBroker()
48
49## Agents subscribe to topics they care about
50broker.subscribe("market_data", "trading_agent")
51broker.subscribe("market_data", "analysis_agent")
52broker.subscribe("market_data", "reporting_agent")
53broker.subscribe("user_actions", "logging_agent")
54broker.subscribe("user_actions", "analytics_agent")
55
56## When market data arrives, all interested agents are notified
57market_subscribers = broker.publish("market_data", {
58 "symbol": "AAPL",
59 "price": 178.50,
60 "change": 2.3
61})
62print(f"Notified: {', '.join(market_subscribers)}")
63
64## When a user action occurs, different agents are notified
65action_subscribers = broker.publish("user_actions", {
66 "action": "purchase",
67 "item": "premium_subscription"
68})
69print(f"Notified: {', '.join(action_subscribers)}")Conversation Threads
For complex coordination, agents might need back-and-forth conversations:
1Agent A: "I need to schedule a meeting with the team."
2Agent B: "What time works for you?"
3Agent A: "Tuesday afternoon."
4Agent B: "Tuesday at 2pm is available. Shall I book it?"
5Agent A: "Yes, please."
6Agent B: "Done. Meeting scheduled."1Agent A: "I need to schedule a meeting with the team."
2Agent B: "What time works for you?"
3Agent A: "Tuesday afternoon."
4Agent B: "Tuesday at 2pm is available. Shall I book it?"
5Agent A: "Yes, please."
6Agent B: "Done. Meeting scheduled."This requires maintaining conversation context. Each message needs to reference the thread it belongs to:
1class ConversationThread:
2 """
3 Manages a conversation thread between agents.
4 """
5 def __init__(self, thread_id, participants):
6 self.thread_id = thread_id
7 self.participants = participants
8 self.messages = []
9
10 def add_message(self, sender, content):
11 """
12 Add a message to the thread.
13 """
14 if sender not in self.participants:
15 raise ValueError(f"{sender} is not a participant in this thread")
16
17 message = {
18 "thread_id": self.thread_id,
19 "sender": sender,
20 "content": content,
21 "timestamp": datetime.utcnow().isoformat(),
22 "sequence": len(self.messages)
23 }
24 self.messages.append(message)
25 return message
26
27 def get_history(self):
28 """
29 Get the full conversation history.
30 """
31 return self.messages
32
33 def get_context_for_agent(self, agent_name):
34 """
35 Get conversation context formatted for a specific agent.
36 """
37 context = f"Conversation thread {self.thread_id}\n"
38 context += f"Participants: {', '.join(self.participants)}\n\n"
39 for msg in self.messages:
40 context += f"{msg['sender']}: {msg['content']}\n"
41 return context
42
43## Example: Two agents having a conversation
44thread = ConversationThread(
45 thread_id="meeting_planning_001",
46 participants=["coordinator_agent", "calendar_agent"]
47)
48
49thread.add_message("coordinator_agent", "I need to schedule a team meeting for next week.")
50thread.add_message("calendar_agent", "I can see Tuesday 2pm or Thursday 3pm are available. Which works better?")
51thread.add_message("coordinator_agent", "Tuesday 2pm works. Please book it.")
52thread.add_message("calendar_agent", "Meeting scheduled for Tuesday at 2pm. I've sent invites to all team members.")
53
54print("=== Conversation Thread ===")
55print(thread.get_context_for_agent("coordinator_agent"))1class ConversationThread:
2 """
3 Manages a conversation thread between agents.
4 """
5 def __init__(self, thread_id, participants):
6 self.thread_id = thread_id
7 self.participants = participants
8 self.messages = []
9
10 def add_message(self, sender, content):
11 """
12 Add a message to the thread.
13 """
14 if sender not in self.participants:
15 raise ValueError(f"{sender} is not a participant in this thread")
16
17 message = {
18 "thread_id": self.thread_id,
19 "sender": sender,
20 "content": content,
21 "timestamp": datetime.utcnow().isoformat(),
22 "sequence": len(self.messages)
23 }
24 self.messages.append(message)
25 return message
26
27 def get_history(self):
28 """
29 Get the full conversation history.
30 """
31 return self.messages
32
33 def get_context_for_agent(self, agent_name):
34 """
35 Get conversation context formatted for a specific agent.
36 """
37 context = f"Conversation thread {self.thread_id}\n"
38 context += f"Participants: {', '.join(self.participants)}\n\n"
39 for msg in self.messages:
40 context += f"{msg['sender']}: {msg['content']}\n"
41 return context
42
43## Example: Two agents having a conversation
44thread = ConversationThread(
45 thread_id="meeting_planning_001",
46 participants=["coordinator_agent", "calendar_agent"]
47)
48
49thread.add_message("coordinator_agent", "I need to schedule a team meeting for next week.")
50thread.add_message("calendar_agent", "I can see Tuesday 2pm or Thursday 3pm are available. Which works better?")
51thread.add_message("coordinator_agent", "Tuesday 2pm works. Please book it.")
52thread.add_message("calendar_agent", "Meeting scheduled for Tuesday at 2pm. I've sent invites to all team members.")
53
54print("=== Conversation Thread ===")
55print(thread.get_context_for_agent("coordinator_agent"))Coordination Protocols
Message formats and patterns are building blocks. Coordination protocols define the rules for how agents work together on specific types of tasks.
Task Delegation Protocol
When one agent needs another agent to do work:
- Request: Coordinator sends a task with clear requirements
- Acknowledgment: Worker confirms it received the task
- Progress Updates: Worker sends periodic status updates
- Completion: Worker sends results or error message
- Confirmation: Coordinator acknowledges receipt
1## Using Claude Sonnet 4.5 for task delegation
2class TaskDelegationProtocol:
3 """
4 Protocol for delegating tasks between agents.
5 """
6 def __init__(self):
7 self.active_tasks = {}
8
9 def request_task(self, task_id, from_agent, to_agent, task_description):
10 """
11 Coordinator requests a task from a worker.
12 """
13 task = {
14 "task_id": task_id,
15 "coordinator": from_agent,
16 "worker": to_agent,
17 "description": task_description,
18 "status": "requested",
19 "created_at": datetime.utcnow().isoformat(),
20 "updates": []
21 }
22 self.active_tasks[task_id] = task
23
24 return AgentMessage(
25 sender=from_agent,
26 recipient=to_agent,
27 message_type="task_request",
28 data=task
29 )
30
31 def acknowledge_task(self, task_id, worker_agent):
32 """
33 Worker acknowledges receiving the task.
34 """
35 if task_id not in self.active_tasks:
36 raise ValueError(f"Unknown task: {task_id}")
37
38 task = self.active_tasks[task_id]
39 task["status"] = "in_progress"
40 task["updates"].append({
41 "timestamp": datetime.utcnow().isoformat(),
42 "message": "Task acknowledged and started"
43 })
44
45 return AgentMessage(
46 sender=worker_agent,
47 recipient=task["coordinator"],
48 message_type="task_acknowledged",
49 data={"task_id": task_id}
50 )
51
52 def update_progress(self, task_id, worker_agent, progress_message):
53 """
54 Worker sends a progress update.
55 """
56 if task_id not in self.active_tasks:
57 raise ValueError(f"Unknown task: {task_id}")
58
59 task = self.active_tasks[task_id]
60 task["updates"].append({
61 "timestamp": datetime.utcnow().isoformat(),
62 "message": progress_message
63 })
64
65 return AgentMessage(
66 sender=worker_agent,
67 recipient=task["coordinator"],
68 message_type="task_progress",
69 data={
70 "task_id": task_id,
71 "progress": progress_message
72 }
73 )
74
75 def complete_task(self, task_id, worker_agent, results):
76 """
77 Worker completes the task and sends results.
78 """
79 if task_id not in self.active_tasks:
80 raise ValueError(f"Unknown task: {task_id}")
81
82 task = self.active_tasks[task_id]
83 task["status"] = "completed"
84 task["results"] = results
85 task["completed_at"] = datetime.utcnow().isoformat()
86
87 return AgentMessage(
88 sender=worker_agent,
89 recipient=task["coordinator"],
90 message_type="task_complete",
91 data={
92 "task_id": task_id,
93 "results": results
94 }
95 )
96
97 def get_task_status(self, task_id):
98 """
99 Get the current status of a task.
100 """
101 return self.active_tasks.get(task_id)
102
103## Example usage
104protocol = TaskDelegationProtocol()
105
106print("=== Task Delegation Protocol ===\n")
107
108## Step 1: Coordinator requests a task
109request_msg = protocol.request_task(
110 task_id="research_001",
111 from_agent="coordinator",
112 to_agent="research_agent",
113 task_description="Research the top 3 cloud providers and compare pricing"
114)
115print(f"1. Task Requested:\n{request_msg.to_json()}\n")
116
117## Step 2: Worker acknowledges
118ack_msg = protocol.acknowledge_task("research_001", "research_agent")
119print(f"2. Task Acknowledged:\n{ack_msg.to_json()}\n")
120
121## Step 3: Worker sends progress update
122progress_msg = protocol.update_progress(
123 "research_001",
124 "research_agent",
125 "Completed research on AWS, starting Azure"
126)
127print(f"3. Progress Update:\n{progress_msg.to_json()}\n")
128
129## Step 4: Worker completes task
130complete_msg = protocol.complete_task(
131 "research_001",
132 "research_agent",
133 {
134 "providers": ["AWS", "Azure", "Google Cloud"],
135 "comparison": "Detailed pricing comparison...",
136 "recommendation": "AWS offers best value for your use case"
137 }
138)
139print(f"4. Task Complete:\n{complete_msg.to_json()}\n")
140
141## Check final status
142status = protocol.get_task_status("research_001")
143print(f"Final Status: {status['status']}")
144print(f"Updates: {len(status['updates'])} progress updates")1## Using Claude Sonnet 4.5 for task delegation
2class TaskDelegationProtocol:
3 """
4 Protocol for delegating tasks between agents.
5 """
6 def __init__(self):
7 self.active_tasks = {}
8
9 def request_task(self, task_id, from_agent, to_agent, task_description):
10 """
11 Coordinator requests a task from a worker.
12 """
13 task = {
14 "task_id": task_id,
15 "coordinator": from_agent,
16 "worker": to_agent,
17 "description": task_description,
18 "status": "requested",
19 "created_at": datetime.utcnow().isoformat(),
20 "updates": []
21 }
22 self.active_tasks[task_id] = task
23
24 return AgentMessage(
25 sender=from_agent,
26 recipient=to_agent,
27 message_type="task_request",
28 data=task
29 )
30
31 def acknowledge_task(self, task_id, worker_agent):
32 """
33 Worker acknowledges receiving the task.
34 """
35 if task_id not in self.active_tasks:
36 raise ValueError(f"Unknown task: {task_id}")
37
38 task = self.active_tasks[task_id]
39 task["status"] = "in_progress"
40 task["updates"].append({
41 "timestamp": datetime.utcnow().isoformat(),
42 "message": "Task acknowledged and started"
43 })
44
45 return AgentMessage(
46 sender=worker_agent,
47 recipient=task["coordinator"],
48 message_type="task_acknowledged",
49 data={"task_id": task_id}
50 )
51
52 def update_progress(self, task_id, worker_agent, progress_message):
53 """
54 Worker sends a progress update.
55 """
56 if task_id not in self.active_tasks:
57 raise ValueError(f"Unknown task: {task_id}")
58
59 task = self.active_tasks[task_id]
60 task["updates"].append({
61 "timestamp": datetime.utcnow().isoformat(),
62 "message": progress_message
63 })
64
65 return AgentMessage(
66 sender=worker_agent,
67 recipient=task["coordinator"],
68 message_type="task_progress",
69 data={
70 "task_id": task_id,
71 "progress": progress_message
72 }
73 )
74
75 def complete_task(self, task_id, worker_agent, results):
76 """
77 Worker completes the task and sends results.
78 """
79 if task_id not in self.active_tasks:
80 raise ValueError(f"Unknown task: {task_id}")
81
82 task = self.active_tasks[task_id]
83 task["status"] = "completed"
84 task["results"] = results
85 task["completed_at"] = datetime.utcnow().isoformat()
86
87 return AgentMessage(
88 sender=worker_agent,
89 recipient=task["coordinator"],
90 message_type="task_complete",
91 data={
92 "task_id": task_id,
93 "results": results
94 }
95 )
96
97 def get_task_status(self, task_id):
98 """
99 Get the current status of a task.
100 """
101 return self.active_tasks.get(task_id)
102
103## Example usage
104protocol = TaskDelegationProtocol()
105
106print("=== Task Delegation Protocol ===\n")
107
108## Step 1: Coordinator requests a task
109request_msg = protocol.request_task(
110 task_id="research_001",
111 from_agent="coordinator",
112 to_agent="research_agent",
113 task_description="Research the top 3 cloud providers and compare pricing"
114)
115print(f"1. Task Requested:\n{request_msg.to_json()}\n")
116
117## Step 2: Worker acknowledges
118ack_msg = protocol.acknowledge_task("research_001", "research_agent")
119print(f"2. Task Acknowledged:\n{ack_msg.to_json()}\n")
120
121## Step 3: Worker sends progress update
122progress_msg = protocol.update_progress(
123 "research_001",
124 "research_agent",
125 "Completed research on AWS, starting Azure"
126)
127print(f"3. Progress Update:\n{progress_msg.to_json()}\n")
128
129## Step 4: Worker completes task
130complete_msg = protocol.complete_task(
131 "research_001",
132 "research_agent",
133 {
134 "providers": ["AWS", "Azure", "Google Cloud"],
135 "comparison": "Detailed pricing comparison...",
136 "recommendation": "AWS offers best value for your use case"
137 }
138)
139print(f"4. Task Complete:\n{complete_msg.to_json()}\n")
140
141## Check final status
142status = protocol.get_task_status("research_001")
143print(f"Final Status: {status['status']}")
144print(f"Updates: {len(status['updates'])} progress updates")This protocol ensures both agents stay synchronized. The coordinator knows the task is being worked on, can track progress, and receives results in a predictable format. The worker has a clear structure for communicating its status.
Consensus Protocol
When multiple agents need to agree on something:
- Proposal: One agent proposes a decision
- Voting: All agents vote (agree, disagree, abstain)
- Tally: Votes are counted according to rules (majority, unanimous, weighted)
- Decision: Result is announced to all agents
1class ConsensusProtocol:
2 """
3 Simple consensus protocol for agent decision-making.
4 """
5 def __init__(self, participants, threshold=0.5):
6 self.participants = participants
7 self.threshold = threshold # Fraction of votes needed to pass
8 self.proposals = {}
9
10 def propose(self, proposal_id, proposer, description):
11 """
12 An agent proposes something for the group to decide.
13 """
14 if proposer not in self.participants:
15 raise ValueError(f"{proposer} is not a participant")
16
17 self.proposals[proposal_id] = {
18 "id": proposal_id,
19 "proposer": proposer,
20 "description": description,
21 "votes": {},
22 "status": "voting",
23 "created_at": datetime.utcnow().isoformat()
24 }
25
26 print(f"\n{proposer} proposed: {description}")
27 print(f"Proposal ID: {proposal_id}")
28 print(f"Waiting for votes from: {', '.join(self.participants)}")
29
30 def vote(self, proposal_id, voter, vote):
31 """
32 An agent votes on a proposal (True for agree, False for disagree).
33 """
34 if proposal_id not in self.proposals:
35 raise ValueError(f"Unknown proposal: {proposal_id}")
36
37 if voter not in self.participants:
38 raise ValueError(f"{voter} is not a participant")
39
40 proposal = self.proposals[proposal_id]
41 proposal["votes"][voter] = vote
42
43 print(f"{voter} voted: {'agree' if vote else 'disagree'}")
44
45 # Check if all votes are in
46 if len(proposal["votes"]) == len(self.participants):
47 self._finalize_proposal(proposal_id)
48
49 def _finalize_proposal(self, proposal_id):
50 """
51 Tally votes and finalize the proposal.
52 """
53 proposal = self.proposals[proposal_id]
54 votes = proposal["votes"]
55
56 agree_count = sum(1 for v in votes.values() if v)
57 total_votes = len(votes)
58 agreement_ratio = agree_count / total_votes
59
60 if agreement_ratio >= self.threshold:
61 proposal["status"] = "accepted"
62 result = "ACCEPTED"
63 else:
64 proposal["status"] = "rejected"
65 result = "REJECTED"
66
67 proposal["finalized_at"] = datetime.utcnow().isoformat()
68 proposal["agreement_ratio"] = agreement_ratio
69
70 print(f"\nProposal {proposal_id}: {result}")
71 print(f"Votes: {agree_count}/{total_votes} agreed ({agreement_ratio:.0%})")
72 print(f"Threshold: {self.threshold:.0%}")
73
74 def get_result(self, proposal_id):
75 """
76 Get the result of a proposal.
77 """
78 return self.proposals.get(proposal_id)
79
80## Example: Agents reaching consensus
81agents = ["agent_a", "agent_b", "agent_c", "agent_d"]
82consensus = ConsensusProtocol(agents, threshold=0.75)
83
84## Propose a decision
85consensus.propose(
86 "upgrade_001",
87 "agent_a",
88 "Upgrade to the new model version (higher cost but better performance)"
89)
90
91## Agents vote
92consensus.vote("upgrade_001", "agent_a", True) # Proposer agrees
93consensus.vote("upgrade_001", "agent_b", True) # Agent B agrees
94consensus.vote("upgrade_001", "agent_c", True) # Agent C agrees
95consensus.vote("upgrade_001", "agent_d", False) # Agent D disagrees
96
97## Result is automatically finalized when all votes are in
98result = consensus.get_result("upgrade_001")
99print(f"\nFinal decision: {result['status']}")1class ConsensusProtocol:
2 """
3 Simple consensus protocol for agent decision-making.
4 """
5 def __init__(self, participants, threshold=0.5):
6 self.participants = participants
7 self.threshold = threshold # Fraction of votes needed to pass
8 self.proposals = {}
9
10 def propose(self, proposal_id, proposer, description):
11 """
12 An agent proposes something for the group to decide.
13 """
14 if proposer not in self.participants:
15 raise ValueError(f"{proposer} is not a participant")
16
17 self.proposals[proposal_id] = {
18 "id": proposal_id,
19 "proposer": proposer,
20 "description": description,
21 "votes": {},
22 "status": "voting",
23 "created_at": datetime.utcnow().isoformat()
24 }
25
26 print(f"\n{proposer} proposed: {description}")
27 print(f"Proposal ID: {proposal_id}")
28 print(f"Waiting for votes from: {', '.join(self.participants)}")
29
30 def vote(self, proposal_id, voter, vote):
31 """
32 An agent votes on a proposal (True for agree, False for disagree).
33 """
34 if proposal_id not in self.proposals:
35 raise ValueError(f"Unknown proposal: {proposal_id}")
36
37 if voter not in self.participants:
38 raise ValueError(f"{voter} is not a participant")
39
40 proposal = self.proposals[proposal_id]
41 proposal["votes"][voter] = vote
42
43 print(f"{voter} voted: {'agree' if vote else 'disagree'}")
44
45 # Check if all votes are in
46 if len(proposal["votes"]) == len(self.participants):
47 self._finalize_proposal(proposal_id)
48
49 def _finalize_proposal(self, proposal_id):
50 """
51 Tally votes and finalize the proposal.
52 """
53 proposal = self.proposals[proposal_id]
54 votes = proposal["votes"]
55
56 agree_count = sum(1 for v in votes.values() if v)
57 total_votes = len(votes)
58 agreement_ratio = agree_count / total_votes
59
60 if agreement_ratio >= self.threshold:
61 proposal["status"] = "accepted"
62 result = "ACCEPTED"
63 else:
64 proposal["status"] = "rejected"
65 result = "REJECTED"
66
67 proposal["finalized_at"] = datetime.utcnow().isoformat()
68 proposal["agreement_ratio"] = agreement_ratio
69
70 print(f"\nProposal {proposal_id}: {result}")
71 print(f"Votes: {agree_count}/{total_votes} agreed ({agreement_ratio:.0%})")
72 print(f"Threshold: {self.threshold:.0%}")
73
74 def get_result(self, proposal_id):
75 """
76 Get the result of a proposal.
77 """
78 return self.proposals.get(proposal_id)
79
80## Example: Agents reaching consensus
81agents = ["agent_a", "agent_b", "agent_c", "agent_d"]
82consensus = ConsensusProtocol(agents, threshold=0.75)
83
84## Propose a decision
85consensus.propose(
86 "upgrade_001",
87 "agent_a",
88 "Upgrade to the new model version (higher cost but better performance)"
89)
90
91## Agents vote
92consensus.vote("upgrade_001", "agent_a", True) # Proposer agrees
93consensus.vote("upgrade_001", "agent_b", True) # Agent B agrees
94consensus.vote("upgrade_001", "agent_c", True) # Agent C agrees
95consensus.vote("upgrade_001", "agent_d", False) # Agent D disagrees
96
97## Result is automatically finalized when all votes are in
98result = consensus.get_result("upgrade_001")
99print(f"\nFinal decision: {result['status']}")Handling Communication Failures
Communication between agents isn't always perfect. Networks fail, messages get lost, agents crash. Robust multi-agent systems need to handle these failures gracefully.
Timeouts
If Agent A sends a request to Agent B and doesn't get a response within a reasonable time, it should timeout and either retry or report an error:
1import time
2
3def send_with_timeout(sender_func, timeout_seconds=5):
4 """
5 Send a message and wait for response with timeout.
6 """
7 start_time = time.time()
8
9 try:
10 # In a real system, this would be async
11 response = sender_func()
12 elapsed = time.time() - start_time
13
14 if elapsed > timeout_seconds:
15 return {
16 "status": "timeout",
17 "message": f"No response after {timeout_seconds} seconds"
18 }
19
20 return {
21 "status": "success",
22 "response": response
23 }
24 except Exception as e:
25 return {
26 "status": "error",
27 "message": str(e)
28 }1import time
2
3def send_with_timeout(sender_func, timeout_seconds=5):
4 """
5 Send a message and wait for response with timeout.
6 """
7 start_time = time.time()
8
9 try:
10 # In a real system, this would be async
11 response = sender_func()
12 elapsed = time.time() - start_time
13
14 if elapsed > timeout_seconds:
15 return {
16 "status": "timeout",
17 "message": f"No response after {timeout_seconds} seconds"
18 }
19
20 return {
21 "status": "success",
22 "response": response
23 }
24 except Exception as e:
25 return {
26 "status": "error",
27 "message": str(e)
28 }Retries
For transient failures, retrying can help:
1def send_with_retry(sender_func, max_retries=3):
2 """
3 Send a message with automatic retries on failure.
4 """
5 for attempt in range(max_retries):
6 try:
7 response = sender_func()
8 return {
9 "status": "success",
10 "response": response,
11 "attempts": attempt + 1
12 }
13 except Exception as e:
14 if attempt == max_retries - 1:
15 # Last attempt failed
16 return {
17 "status": "failed",
18 "error": str(e),
19 "attempts": max_retries
20 }
21 # Wait before retrying (exponential backoff)
22 time.sleep(2 ** attempt)1def send_with_retry(sender_func, max_retries=3):
2 """
3 Send a message with automatic retries on failure.
4 """
5 for attempt in range(max_retries):
6 try:
7 response = sender_func()
8 return {
9 "status": "success",
10 "response": response,
11 "attempts": attempt + 1
12 }
13 except Exception as e:
14 if attempt == max_retries - 1:
15 # Last attempt failed
16 return {
17 "status": "failed",
18 "error": str(e),
19 "attempts": max_retries
20 }
21 # Wait before retrying (exponential backoff)
22 time.sleep(2 ** attempt)Acknowledgments
For critical messages, require acknowledgments:
1class ReliableMessaging:
2 """
3 Ensures messages are received by requiring acknowledgments.
4 """
5 def __init__(self):
6 self.pending_acks = {}
7
8 def send_reliable(self, message_id, sender, recipient, content):
9 """
10 Send a message that requires acknowledgment.
11 """
12 self.pending_acks[message_id] = {
13 "sender": sender,
14 "recipient": recipient,
15 "content": content,
16 "sent_at": datetime.utcnow().isoformat(),
17 "acknowledged": False
18 }
19
20 return AgentMessage(
21 sender=sender,
22 recipient=recipient,
23 message_type="reliable_message",
24 data={
25 "message_id": message_id,
26 "content": content,
27 "requires_ack": True
28 }
29 )
30
31 def acknowledge(self, message_id, recipient):
32 """
33 Recipient acknowledges receiving the message.
34 """
35 if message_id not in self.pending_acks:
36 raise ValueError(f"Unknown message: {message_id}")
37
38 self.pending_acks[message_id]["acknowledged"] = True
39 self.pending_acks[message_id]["acked_at"] = datetime.utcnow().isoformat()
40
41 msg = self.pending_acks[message_id]
42 return AgentMessage(
43 sender=recipient,
44 recipient=msg["sender"],
45 message_type="acknowledgment",
46 data={"message_id": message_id}
47 )
48
49 def check_pending(self):
50 """
51 Check for messages that haven't been acknowledged.
52 """
53 return {
54 msg_id: msg
55 for msg_id, msg in self.pending_acks.items()
56 if not msg["acknowledged"]
57 }1class ReliableMessaging:
2 """
3 Ensures messages are received by requiring acknowledgments.
4 """
5 def __init__(self):
6 self.pending_acks = {}
7
8 def send_reliable(self, message_id, sender, recipient, content):
9 """
10 Send a message that requires acknowledgment.
11 """
12 self.pending_acks[message_id] = {
13 "sender": sender,
14 "recipient": recipient,
15 "content": content,
16 "sent_at": datetime.utcnow().isoformat(),
17 "acknowledged": False
18 }
19
20 return AgentMessage(
21 sender=sender,
22 recipient=recipient,
23 message_type="reliable_message",
24 data={
25 "message_id": message_id,
26 "content": content,
27 "requires_ack": True
28 }
29 )
30
31 def acknowledge(self, message_id, recipient):
32 """
33 Recipient acknowledges receiving the message.
34 """
35 if message_id not in self.pending_acks:
36 raise ValueError(f"Unknown message: {message_id}")
37
38 self.pending_acks[message_id]["acknowledged"] = True
39 self.pending_acks[message_id]["acked_at"] = datetime.utcnow().isoformat()
40
41 msg = self.pending_acks[message_id]
42 return AgentMessage(
43 sender=recipient,
44 recipient=msg["sender"],
45 message_type="acknowledgment",
46 data={"message_id": message_id}
47 )
48
49 def check_pending(self):
50 """
51 Check for messages that haven't been acknowledged.
52 """
53 return {
54 msg_id: msg
55 for msg_id, msg in self.pending_acks.items()
56 if not msg["acknowledged"]
57 }Real-World Communication: The A2A Protocol
Earlier we discussed the Agent-to-Agent (A2A) Protocol as a framework for multi-agent systems. Now let's look specifically at how A2A handles communication.
A2A defines standard message formats and communication patterns that work across different platforms. When agents use A2A, they can communicate even if one is built with Claude, another with GPT-5, and a third with Gemini.
Key A2A Communication Features:
Agent Discovery: Agents can find other agents by querying a registry. "Are there any research agents available?" returns a list of agents with their capabilities.
Capability Negotiation: Before communicating, agents can check if they speak compatible "languages." Agent A can ask Agent B: "Do you accept JSON input?" or "Can you return image data?"
Standardized Message Envelope: All A2A messages include metadata (sender, recipient, timestamp, message type) in a consistent format, similar to our AgentMessage class but with additional fields for authentication and routing.
Task Lifecycle Management: A2A includes built-in protocols for task delegation, progress tracking, and completion, similar to our TaskDelegationProtocol but standardized across all A2A-compatible agents.
Error Handling: A2A defines standard error codes and formats, so agents can understand what went wrong even across different implementations.
While we won't implement the full A2A protocol here (it's complex and still evolving), the patterns we've explored in this chapter align with A2A's design principles. When you build multi-agent systems, following similar patterns will make your agents more interoperable and maintainable.
Practical Considerations
As you design communication between your agents, keep these principles in mind:
Keep Messages Small: Large messages slow down communication and increase the chance of errors. If you need to share a big dataset, send a reference (like a file path or database query) rather than the data itself.
Be Explicit: Don't assume the receiving agent has context. Include enough information in each message for it to be understood independently.
Version Your Protocols: As your system evolves, message formats might change. Include version numbers so agents can handle different formats gracefully.
Log Everything: Communication logs are invaluable for debugging. When something goes wrong, being able to trace the message flow helps you find the problem quickly.
Design for Failure: Assume messages will get lost, delayed, or corrupted. Build in timeouts, retries, and error handling from the start.
Test Communication Separately: Before building complex multi-agent workflows, test that your agents can reliably exchange simple messages. Get the communication layer working first, then add sophisticated behaviors.
Looking Ahead
You now understand how agents communicate: through structured messages, following specific patterns (request-response, pub-sub, conversations), using coordination protocols (task delegation, consensus), and handling failures gracefully.
In the next chapter, we'll explore the benefits and challenges of multi-agent systems in more depth. You'll learn when the added complexity of multiple agents is worth it, what problems can arise, and how to design multi-agent systems that are robust, efficient, and maintainable.
The key insight from this chapter is that communication is the foundation of collaboration. Just as human teams need clear communication channels and protocols, AI agents need well-designed message formats and coordination mechanisms. Get the communication right, and your agents can accomplish remarkable things together.
Glossary
Acknowledgment: A message sent by a recipient to confirm that it received and understood a previous message, used to ensure reliable communication.
Asynchronous Communication: A pattern where the sender doesn't wait for a response and continues with other work, useful for fire-and-forget messages and parallel processing.
Consensus Protocol: A set of rules for how multiple agents reach agreement on decisions through proposals, voting, and tallying results according to defined thresholds.
Conversation Thread: A sequence of related messages between agents, maintained with a thread ID and message history to preserve context across multiple exchanges.
Message Broker: A component that manages publish-subscribe communication, routing messages from publishers to all subscribed agents based on topics.
Message Envelope: The wrapper around message content that includes metadata like sender, recipient, timestamp, and message type, enabling proper routing and handling.
Publish-Subscribe: A communication pattern where agents subscribe to topics of interest and receive all messages published to those topics, enabling one-to-many communication.
Request-Response: A synchronous communication pattern where one agent sends a request and waits for another agent to send back a response before continuing.
Structured Data: Information formatted in a standardized way (like JSON) with defined fields and types, making it easier for agents to parse and use reliably.
Task Delegation Protocol: A coordination protocol that defines how one agent requests work from another, including acknowledgment, progress updates, completion, and error handling.
Timeout: A mechanism that limits how long an agent waits for a response before concluding the message was lost or the recipient is unavailable.
Quiz
Ready to test your understanding? Take this quick quiz to reinforce what you've learned about agent communication patterns and protocols.
Reference

About the author: Michael Brenndoerfer
All opinions expressed here are my own and do not reflect the views of my employer.
Michael currently works as an Associate Director of Data Science at EQT Partners in Singapore, where he drives AI and data initiatives across private capital investments.
With over a decade of experience spanning private equity, management consulting, and software engineering, he specializes in building and scaling analytics capabilities from the ground up. He has published research in leading AI conferences and holds expertise in machine learning, natural language processing, and value creation through data.
Related Content

Scaling Up without Breaking the Bank: AI Agent Performance & Cost Optimization at Scale
Learn how to scale AI agents from single users to thousands while maintaining performance and controlling costs. Covers horizontal scaling, load balancing, monitoring, cost controls, and prompt optimization strategies.

Managing and Reducing AI Agent Costs: Complete Guide to Cost Optimization Strategies
Learn how to dramatically reduce AI agent API costs without sacrificing capability. Covers model selection, caching, batching, prompt optimization, and budget controls with practical Python examples.

Speeding Up AI Agents: Performance Optimization Techniques for Faster Response Times
Learn practical techniques to make AI agents respond faster, including model selection strategies, response caching, streaming, parallel execution, and prompt optimization for reduced latency.
Stay updated
Get notified when I publish new articles on data and AI, private equity, technology, and more.

