Multi-Agent System Collaboration - Part 3: Implementing Agents

Now that you have solid role designs and communication protocols from Parts 1 and 2, it's time to bring your agents to life through implementation. Building production-ready agents requires more than just basic classes - you need robust error handling, lifecycle management, state persistence, and seamless integration with your communication infrastructure.
This guide shows how to implement AI agents that are reliable, maintainable, and capable of complex collaborative behaviors.
Why Agent Implementation is Complex
Beyond Simple Classes
While a basic agent might be just a class with a few methods, production agents need:
State Management: Agents must maintain state across conversations, remember context, and persist important information between sessions.
Error Recovery: When agents fail (and they will), the system must recover gracefully without losing work or corrupting shared state.
Resource Management: Agents consume CPU, memory, and API quotas. Proper resource management prevents system overload and ensures fair resource allocation.
Lifecycle Management: Agents need to start up cleanly, shut down gracefully, and handle updates without disrupting ongoing operations.
What You'll Learn in This Tutorial
By the end of this tutorial, you'll have:
- Production-ready agent architecture with proper abstraction layers
- Robust communication integration with the message broker from Part 2
- Comprehensive error handling and recovery mechanisms
- Agent lifecycle management including startup, shutdown, and updates
- State persistence and context management
- Performance monitoring and resource management
Estimated Time: 45-50 minutes
Step 1: Understanding Agent Architecture Patterns
Before implementing agents, it's crucial to understand the architectural patterns that make agents maintainable and scalable.
Core Agent Architecture Components
Agent Core: The fundamental agent logic that defines behavior, decision-making, and task execution.
Communication Layer: Handles all message passing, event handling, and interaction with other agents.
State Manager: Manages agent memory, context, and persistent data across sessions.
Resource Manager: Monitors and controls resource usage (CPU, memory, API calls).
Lifecycle Controller: Handles agent startup, shutdown, health monitoring, and updates.
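To make the State Manager component more concrete, here is a minimal sketch that saves an agent's state to a JSON file and restores it. The `FileStateManager` class and the file location are hypothetical names chosen for illustration, not part of the tutorial's code. A real deployment would more likely persist to a database or key-value store.

```javascript
const fs = require('fs');
const os = require('os');
const path = require('path');

// Hypothetical helper: persists agent state as a JSON snapshot on disk.
class FileStateManager {
  constructor(filePath) {
    this.filePath = filePath;
  }

  save(state) {
    // A Map is not JSON-serializable, so store its entries as an array.
    const snapshot = {
      ...state,
      currentTasks: Array.from(state.currentTasks.entries())
    };
    // Write to a temp file first, then rename, so a crash mid-write
    // does not leave a truncated snapshot behind.
    const tmpPath = `${this.filePath}.tmp`;
    fs.writeFileSync(tmpPath, JSON.stringify(snapshot));
    fs.renameSync(tmpPath, this.filePath);
  }

  load() {
    if (!fs.existsSync(this.filePath)) {
      return null; // no previous session to restore
    }
    const snapshot = JSON.parse(fs.readFileSync(this.filePath, 'utf8'));
    // Rebuild the Map. Date values come back as ISO strings, not Date objects.
    return { ...snapshot, currentTasks: new Map(snapshot.currentTasks) };
  }
}

// Usage: save a small state object, then restore it.
const stateFile = path.join(os.tmpdir(), `agent-state-${process.pid}.json`);
const manager = new FileStateManager(stateFile);
manager.save({
  status: 'ready',
  completedTasks: 3,
  currentTasks: new Map([['task-1', { status: 'running' }]])
});
const restored = manager.load();
console.log(restored.status, restored.currentTasks.get('task-1').status);
```

The synchronous file calls keep the sketch short. In an agent that handles messages concurrently, the asynchronous `fs.promises` equivalents would avoid blocking the event loop.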
Agent Design Patterns
The Actor Model Pattern
Each agent is an independent "actor" that:
- Processes messages one at a time (no race conditions on its own state)
- Maintains private state (no shared memory issues)
- Communicates only through messages (loose coupling)
Why This Pattern Works for AI Agents:
- Isolation: Agent failures don't cascade to other agents
- Scalability: Easy to distribute agents across multiple processes/machines
- Debugging: Clear message trails make debugging easier
- Testing: Agents can be tested in isolation
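The three actor properties can be sketched in a few lines of JavaScript. This is an illustrative toy, not the tutorial's agent class: `Actor`, `send`, and the counter handler are hypothetical names, and the "mailbox" is simply a promise chain that forces messages to be handled one after another.

```javascript
// Minimal actor: private state, a mailbox, and one-at-a-time message handling.
class Actor {
  #state;                        // private: only the handler ever sees it
  #handler;
  #mailbox = Promise.resolve();  // promise chain that serializes messages

  constructor(initialState, handler) {
    this.#state = initialState;
    this.#handler = handler;
  }

  // Enqueue a message. The returned promise settles with the handler's reply
  // once every earlier message has been processed.
  send(message) {
    const run = async () => {
      const { state, reply } = await this.#handler(this.#state, message);
      this.#state = state;
      return reply;
    };
    const result = this.#mailbox.then(run);
    // A failed message must not block the messages queued after it.
    this.#mailbox = result.catch(() => {});
    return result;
  }
}

const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// A counter actor whose handler takes a variable amount of time.
const counter = new Actor({ count: 0 }, async (state, message) => {
  if (message.type !== 'increment') {
    throw new Error(`Unknown message type: ${message.type}`);
  }
  await sleep(message.delayMs);
  const next = { count: state.count + 1 };
  return { state: next, reply: next.count };
});

// The slowest message is sent first, yet replies still arrive in send order.
const demo = Promise.all([
  counter.send({ type: 'increment', delayMs: 20 }),
  counter.send({ type: 'increment', delayMs: 1 }),
  counter.send({ type: 'increment', delayMs: 5 })
]).then((replies) => {
  console.log(replies); // [ 1, 2, 3 ]
  return replies;
});
```

Because each message waits for the previous one, the handler never sees a half-updated state, which is the property the first bullet above describes.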
Step 2: Building the Base Agent Architecture
Let's create a comprehensive base agent class that provides all the infrastructure needed for production agents.
Base Agent Implementation
// agents/base-agent.js
const EventEmitter = require('events');
const { v4: uuidv4 } = require('uuid');
class BaseAgent extends EventEmitter {
constructor(config) {
super();
// Agent identity and configuration
this.id = config.id || uuidv4();
this.name = config.name || `Agent-${this.id.substring(0, 8)}`;
this.type = config.type || 'generic';
this.version = config.version || '1.0.0';
// Agent capabilities and constraints
this.capabilities = new Set(config.capabilities || []);
this.maxConcurrentTasks = config.maxConcurrentTasks || 5;
this.maxMemoryUsage = config.maxMemoryUsage || 100 * 1024 * 1024; // 100MB
// Communication setup
this.messageBroker = null;
this.subscriptions = new Set();
// State management
this.state = {
status: 'initializing',
currentTasks: new Map(),
completedTasks: 0,
errors: 0,
startTime: null,
lastActivity: null
};
// Performance metrics
this.metrics = {
messagesProcessed: 0,
tasksCompleted: 0,
averageResponseTime: 0,
errorRate: 0,
memoryUsage: 0
};
// Configuration validation
this.validateConfiguration(config);
console.log(`Base agent created: ${this.name} (${this.id})`);
}
validateConfiguration(config) {
/**
* Validate agent configuration for common issues
*/
const issues = [];
// Check required fields
if (!config.capabilities || config.capabilities.length === 0) {
issues.push('Agent must have at least one capability');
}
// Check resource limits
if (config.maxConcurrentTasks && config.maxConcurrentTasks > 50) {
issues.push('maxConcurrentTasks too high - may cause resource exhaustion');
}
// Check for conflicting capabilities
const conflictingPairs = [
['read_only', 'write_data'],
['synchronous', 'asynchronous']
];
for (const [cap1, cap2] of conflictingPairs) {
if (this.capabilities.has(cap1) && this.capabilities.has(cap2)) {
issues.push(`Conflicting capabilities: ${cap1} and ${cap2}`);
}
}
if (issues.length > 0) {
console.warn(`Agent configuration issues for ${this.name}:`, issues);
}
}
async initialize(messageBroker) {
/**
* Initialize agent with message broker and start operations
*
* Args:
* messageBroker: Message broker instance from Part 2
*/
try {
console.log(`Initializing agent: ${this.name}`);
// Connect to message broker
this.messageBroker = messageBroker;
await this.messageBroker.registerAgent(this.id, {
name: this.name,
type: this.type,
capabilities: Array.from(this.capabilities),
version: this.version,
maxConcurrentTasks: this.maxConcurrentTasks
});
// Set up message handling
this.setupMessageHandling();
// Set up health monitoring
this.setupHealthMonitoring();
// Set up resource monitoring
this.setupResourceMonitoring();
// Update state
this.state.status = 'ready';
this.state.startTime = new Date();
this.state.lastActivity = new Date();
// Emit ready event
this.emit('agent.ready', {
agentId: this.id,
name: this.name,
capabilities: Array.from(this.capabilities)
});
console.log(`Agent initialized successfully: ${this.name}`);
} catch (error) {
console.error(`Agent initialization failed: ${this.name}`, error);
this.state.status = 'error';
throw error;
}
}
setupMessageHandling() {
/**
* Set up message handling with the communication system
*/
// Subscribe to messages directed at this agent
this.messageBroker.subscribe(this.id, ['*'], async (messageContext) => {
await this.handleIncomingMessage(messageContext);
});
// Subscribe to broadcast messages
this.messageBroker.subscribe(this.id, ['system.broadcast', 'agent.coordination'], async (messageContext) => {
await this.handleBroadcastMessage(messageContext);
});
console.log(`Message handling configured for ${this.name}`);
}
async handleIncomingMessage(messageContext) {
/**
* Handle incoming messages with proper error handling and metrics
*/
const startTime = Date.now();
try {
// Update activity timestamp
this.state.lastActivity = new Date();
// Check if agent can handle more tasks
if (this.state.currentTasks.size >= this.maxConcurrentTasks) {
await this.sendMessage(messageContext.from_agent, 'task.rejected', {
reason: 'Agent at maximum capacity',
currentTasks: this.state.currentTasks.size,
maxTasks: this.maxConcurrentTasks
});
return;
}
// Process the message based on type
const response = await this.processMessage(messageContext);
// Send response if needed
if (response && messageContext.from_agent !== 'system') {
await this.sendMessage(messageContext.from_agent, 'task.response', response);
}
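// Update metrics (running mean of response time over successful messages)
this.metrics.messagesProcessed++;
const elapsedMs = Date.now() - startTime;
this.metrics.averageResponseTime +=
(elapsedMs - this.metrics.averageResponseTime) / this.metrics.messagesProcessed;
} catch (error) {
console.error(`Error handling message in ${this.name}:`, error);
// Update error metrics; dividing by all handled messages avoids a divide-by-zero
this.state.errors++;
this.metrics.errorRate =
this.state.errors / (this.metrics.messagesProcessed + this.state.errors);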
// Send error response
if (messageContext.from_agent !== 'system') {
await this.sendMessage(messageContext.from_agent, 'task.error', {
error: error.message,
agentId: this.id,
messageId: messageContext.message_id
});
}
// Emit error event for monitoring
this.emit('agent.error', {
agentId: this.id,
error: error.message,
messageContext: messageContext
});
}
}
async processMessage(messageContext) {
/**
* Process incoming message - override in subclasses
*
* This is the main method that subclasses should implement
* to define their specific behavior
*/
const { message_type, payload } = messageContext;
// Default processing based on message type
switch (message_type) {
case 'task.execute':
return await this.executeTask(payload);
case 'task.status':
return await this.getTaskStatus(payload.taskId);
case 'task.cancel':
return await this.cancelTask(payload.taskId);
case 'agent.ping':
return { status: 'pong', timestamp: new Date().toISOString() };
default:
console.warn(`Unhandled message type: ${message_type} in ${this.name}`);
return { error: `Unsupported message type: ${message_type}` };
}
}
async executeTask(taskPayload) {
/**
* Execute a task - override in subclasses for specific behavior
*/
const taskId = uuidv4();
const task = {
id: taskId,
payload: taskPayload,
startTime: new Date(),
status: 'running'
};
// Track the task
this.state.currentTasks.set(taskId, task);
try {
// Emit task started event
this.emit('task.started', { agentId: this.id, taskId: taskId });
// Simulate task execution (override this in subclasses)
const result = await this.performTask(taskPayload);
// Update task status
task.status = 'completed';
task.endTime = new Date();
task.result = result;
// Move to completed tasks
this.state.currentTasks.delete(taskId);
this.state.completedTasks++;
this.metrics.tasksCompleted++;
// Emit task completed event
this.emit('task.completed', {
agentId: this.id,
taskId: taskId,
result: result
});
return {
taskId: taskId,
status: 'completed',
result: result,
executionTime: task.endTime - task.startTime
};
} catch (error) {
// Handle task failure
task.status = 'failed';
task.error = error.message;
task.endTime = new Date();
this.state.currentTasks.delete(taskId);
// The error is counted once by handleIncomingMessage, which catches the rethrown error
// Emit task failed event
this.emit('task.failed', {
agentId: this.id,
taskId: taskId,
error: error.message
});
throw error;
}
}
async performTask(taskPayload) {
/**
* Perform the actual task work - override in subclasses
*
* This is where subclasses implement their specific functionality
*/
// Default implementation - just echo the payload
console.log(`${this.name} processing task:`, taskPayload);
// Simulate some work
await new Promise(resolve => setTimeout(resolve, 1000));
return {
message: `Task processed by ${this.name}`,
payload: taskPayload,
timestamp: new Date().toISOString()
};
}
async sendMessage(recipientId, messageType, payload, options = {}) {
/**
* Send message to another agent
*/
if (!this.messageBroker) {
throw new Error('Message broker not initialized');
}
try {
const result = await this.messageBroker.sendMessage(
this.id,
recipientId,
messageType,
payload,
options
);
console.log(`${this.name} sent message to ${recipientId}: ${messageType}`);
return result;
} catch (error) {
console.error(`Failed to send message from ${this.name}:`, error);
throw error;
}
}
async broadcast(messageType, payload, options = {}) {
/**
* Broadcast message to all agents
*/
if (!this.messageBroker) {
throw new Error('Message broker not initialized');
}
try {
const result = await this.messageBroker.broadcast(
messageType,
payload,
this.id, // Exclude self from broadcast
options
);
console.log(`${this.name} broadcast: ${messageType}`);
return result;
} catch (error) {
console.error(`Failed to broadcast from ${this.name}:`, error);
throw error;
}
}
setupHealthMonitoring() {
/**
* Set up health monitoring and reporting
*/
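// Report health status every 30 seconds as an 'agent.health' event
const healthTimer = setInterval(() => {
this.emit('agent.health', this.getHealthStatus());
}, 30000);
// Stop reporting once shutdown() emits 'agent.shutdown'
this.once('agent.shutdown', () => clearInterval(healthTimer));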
// Respond to health check requests
this.messageBroker.subscribe(this.id, ['system.health_check'], async (messageContext) => {
const healthStatus = this.getHealthStatus();
await this.sendMessage(
messageContext.from_agent,
'system.health_response',
healthStatus
);
});
}
getHealthStatus() {
/**
* Get current health status of the agent
*/
const now = new Date();
const uptime = this.state.startTime ? now - this.state.startTime : 0;
const timeSinceActivity = this.state.lastActivity ? now - this.state.lastActivity : 0;
return {
agentId: this.id,
name: this.name,
status: this.state.status,
uptime: uptime,
timeSinceLastActivity: timeSinceActivity,
currentTasks: this.state.currentTasks.size,
completedTasks: this.state.completedTasks,
errorCount: this.state.errors,
memoryUsage: process.memoryUsage().heapUsed,
isHealthy: this.isHealthy()
};
}
isHealthy() {
/**
* Determine if agent is healthy based on various factors
*/
// Check basic status
if (this.state.status !== 'ready') {
return false;
}
// Check if agent is responsive (activity within last 5 minutes)
const timeSinceActivity = Date.now() - (this.state.lastActivity?.getTime() || 0);
if (timeSinceActivity > 5 * 60 * 1000) {
return false;
}
// Check error rate (should be less than 10%)
if (this.metrics.errorRate > 0.1) {
return false;
}
// Check memory usage
const memoryUsage = process.memoryUsage().heapUsed;
if (memoryUsage > this.maxMemoryUsage) {
return false;
}
return true;
}
setupResourceMonitoring() {
/**
* Monitor resource usage and enforce limits
*/
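// Check resource usage every minute
const resourceTimer = setInterval(() => {
this.checkResourceUsage().catch((error) => {
console.error(`Resource check failed for ${this.name}:`, error);
});
}, 60000);
// Stop checking once shutdown() emits 'agent.shutdown'
this.once('agent.shutdown', () => clearInterval(resourceTimer));
}
async checkResourceUsage() {
/**
* Check and enforce resource usage limits
*/
const memoryUsage = process.memoryUsage().heapUsed;
this.metrics.memoryUsage = memoryUsage;
// Check memory limit
if (memoryUsage > this.maxMemoryUsage) {
console.warn(`${this.name} exceeding memory limit: ${(memoryUsage / 1024 / 1024).toFixed(1)}MB`);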
// Emit resource warning
this.emit('resource.warning', {
agentId: this.id,
type: 'memory',
current: memoryUsage,
limit: this.maxMemoryUsage
});
// Attempt cleanup
await this.performResourceCleanup();
}
// Check task limit
if (this.state.currentTasks.size >= this.maxConcurrentTasks) {
console.warn(`${this.name} at maximum task capacity: ${this.state.currentTasks.size}`);
}
}
async performResourceCleanup() {
/**
* Perform resource cleanup when limits are exceeded
*/
console.log(`Performing resource cleanup for ${this.name}`);
// Clear old completed task references
// Force garbage collection if available (global.gc exists only when Node runs with --expose-gc)
if (global.gc) {
global.gc();
}
// Cancel oldest non-critical tasks if necessary
const oldTasks = Array.from(this.state.currentTasks.entries())
.filter(([taskId, task]) => !task.critical)
.sort((a, b) => a[1].startTime - b[1].startTime);
if (oldTasks.length > 0) {
const [oldestTaskId] = oldTasks[0];
await this.cancelTask(oldestTaskId);
console.log(`Cancelled oldest task for resource management: ${oldestTaskId}`);
}
}
async shutdown(graceful = true) {
/**
* Shutdown agent gracefully or forcefully
*
* Args:
* graceful: If true, wait for current tasks to complete
*/
console.log(`Shutting down agent: ${this.name} (graceful: ${graceful})`);
this.state.status = 'shutting_down';
try {
if (graceful) {
// Wait for current tasks to complete (with timeout)
const shutdownTimeout = 30000; // 30 seconds
const startTime = Date.now();
while (this.state.currentTasks.size > 0 &&
(Date.now() - startTime) < shutdownTimeout) {
console.log(`Waiting for ${this.state.currentTasks.size} tasks to complete...`);
await new Promise(resolve => setTimeout(resolve, 1000));
}
// Cancel remaining tasks if timeout reached
if (this.state.currentTasks.size > 0) {
console.warn(`Timeout reached, cancelling ${this.state.currentTasks.size} remaining tasks`);
for (const taskId of this.state.currentTasks.keys()) {
await this.cancelTask(taskId);
}
}
} else {
// Force cancel all tasks
for (const taskId of this.state.currentTasks.keys()) {
await this.cancelTask(taskId);
}
}
// Unregister from message broker
if (this.messageBroker) {
// Implementation would depend on message broker API
console.log(`Unregistering ${this.name} from message broker`);
}
// Update final state
this.state.status = 'shutdown';
// Emit shutdown event
this.emit('agent.shutdown', {
agentId: this.id,
name: this.name,
graceful: graceful,
finalStats: this.getStats()
});
console.log(`Agent shutdown complete: ${this.name}`);
} catch (error) {
console.error(`Error during agent shutdown: ${this.name}`, error);
this.state.status = 'error';
throw error;
}
}
getStats() {
/**
* Get comprehensive agent statistics
*/
const uptime = this.state.startTime ? Date.now() - this.state.startTime.getTime() : 0;
return {
agentId: this.id,
name: this.name,
type: this.type,
status: this.state.status,
uptime: uptime,
capabilities: Array.from(this.capabilities),
// Task statistics
currentTasks: this.state.currentTasks.size,
completedTasks: this.state.completedTasks,
totalErrors: this.state.errors,
// Performance metrics
messagesProcessed: this.metrics.messagesProcessed,
averageResponseTime: this.metrics.averageResponseTime,
errorRate: this.metrics.errorRate,
memoryUsage: this.metrics.memoryUsage,
// Health status
isHealthy: this.isHealthy(),
lastActivity: this.state.lastActivity
};
}
}
module.exports = BaseAgent;
Base Agent Architecture Explanation:
Event-Driven Design: The agent extends EventEmitter, allowing other components to listen for agent events (ready, error, task completion, etc.).
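Resource Management: Built-in monitoring of memory and task limits helps keep agents from consuming excessive resources. Note that process.memoryUsage() measures the whole Node.js process, so agents running in the same process share that measurement.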
Health Monitoring: Continuous health checks ensure agents remain responsive and functional.
Graceful Shutdown: A graceful shutdown waits up to 30 seconds for in-flight tasks to finish before cancelling the rest, which limits lost work when agents are stopped or updated.
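The base class calls `getTaskStatus()` and `cancelTask()` (from `processMessage`, `performResourceCleanup`, and `shutdown`), but the listing does not show them. One possible shape is sketched below. These implementations are assumptions, not the author's code; the `task.cancelled` event name and the `not_found` status are invented for illustration.

```javascript
const EventEmitter = require('events');

// Hypothetical implementations of two methods BaseAgent calls but does not show.
// They could be mixed in with: Object.assign(BaseAgent.prototype, taskHelpers)
const taskHelpers = {
  getTaskStatus(taskId) {
    const task = this.state.currentTasks.get(taskId);
    if (!task) {
      // Finished tasks are removed from currentTasks, so they are unknown here.
      return { taskId, status: 'not_found' };
    }
    return {
      taskId,
      status: task.status,
      runningForMs: Date.now() - task.startTime.getTime()
    };
  },

  cancelTask(taskId) {
    const task = this.state.currentTasks.get(taskId);
    if (!task) {
      return { taskId, cancelled: false, reason: 'Task not found' };
    }
    // This only updates bookkeeping; it cannot interrupt a performTask()
    // promise that is already running.
    task.status = 'cancelled';
    task.endTime = new Date();
    this.state.currentTasks.delete(taskId);
    this.emit('task.cancelled', { agentId: this.id, taskId });
    return { taskId, cancelled: true };
  }
};

// Stand-in for an agent instance, just enough to exercise the helpers.
const agent = Object.assign(new EventEmitter(), taskHelpers, {
  id: 'agent-1',
  state: {
    currentTasks: new Map([['task-1', { status: 'running', startTime: new Date() }]])
  }
});

const cancelledEvents = [];
agent.on('task.cancelled', (event) => cancelledEvents.push(event));

const before = agent.getTaskStatus('task-1');
const cancelResult = agent.cancelTask('task-1');
const after = agent.getTaskStatus('task-1');
console.log(before.status, cancelResult.cancelled, after.status);
```

Both helpers are synchronous, which still works with the `await this.cancelTask(...)` calls in the base class because `await` accepts plain values. Real cancellation would need `performTask()` to cooperate, for example by checking an abort signal between steps.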
Step 3: Implementing Specialized Agent Types
Now let's create specific agent implementations that demonstrate different patterns and capabilities.
Task Execution Agent
// agents/task-executor-agent.js
const BaseAgent = require('./base-agent');
class TaskExecutorAgent extends BaseAgent {
constructor(config) {
super({
...config,
type: 'task_executor',
capabilities: ['execute_tasks', 'parallel_processing', 'result_aggregation']
});
// Task-specific configuration
this.taskQueue = [];
this.taskResults = new Map();
this.supportedTaskTypes = new Set(config.supportedTaskTypes || ['generic']);
console.log(`Task Executor Agent created: ${this.name}`);
}
async performTask(taskPayload) {
/**
* Override base task performance with specialized logic
*/
const { taskType, data, options = {} } = taskPayload;
// Check if we support this task type
if (!this.supportedTaskTypes.has(taskType)) {
throw new Error(`Unsupported task type: ${taskType}`);
}
console.log(`${this.name} executing ${taskType} task`);
// Route to specific task handler
switch (taskType) {
case 'data_processing':
return await this.processData(data, options);
case 'api_call':
return await this.makeAPICall(data, options);
case 'file_operation':
return await this.performFileOperation(data, options);
default:
return await this.executeGenericTask(data, options);
}
}
async processData(data, options) {
/**
* Process data according to specified operations
*/
const operations = options.operations || ['validate'];
const results = {};
for (const operation of operations) {
switch (operation) {
case 'validate':
results.validation = this.validateData(data);
break;
case 'transform':
results.transformed = this.transformData(data, options.transformation);
break;
case 'analyze':
results.analysis = this.analyzeData(data);
break;
}
}
return {
taskType: 'data_processing',
operations: operations,
results: results,
dataSize: JSON.stringify(data).length
};
}
validateData(data) {
/**
* Validate data structure and content
*/
const validation = {
isValid: true,
errors: [],
warnings: []
};
// Basic validation checks
if (!data || typeof data !== 'object') {
validation.isValid = false;
validation.errors.push('Data must be a valid object');
// Skip the field checks: the `in` operator throws on non-objects
return validation;
}
// Check for required fields (example)
const requiredFields = ['id', 'type'];
for (const field of requiredFields) {
if (!(field in data)) {
validation.isValid = false;
validation.errors.push(`Missing required field: ${field}`);
}
}
return validation;
}
}
module.exports = TaskExecutorAgent;
Task Executor Explanation:
Specialization: This agent specializes in executing various types of tasks, demonstrating how to create focused, capable agents.
Task Routing: Different task types are routed to appropriate handlers, showing how agents can have multiple capabilities while maintaining clean code organization.
Validation: Input validation ensures the agent receives properly formatted data before processing.
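As an alternative to the `switch` statement, task routing can be expressed as a lookup table, so the set of supported task types and the set of handlers cannot drift apart. The sketch below is an illustration only; `TaskHandlerRegistry` and both handlers are hypothetical and do not appear in the tutorial's code.

```javascript
// Hypothetical registry that maps task types to handler functions.
class TaskHandlerRegistry {
  constructor() {
    this.handlers = new Map();
  }

  register(taskType, handler) {
    this.handlers.set(taskType, handler);
    return this; // allow chained register() calls
  }

  supports(taskType) {
    return this.handlers.has(taskType);
  }

  // Returns whatever the handler returns (a plain value or a promise).
  dispatch({ taskType, data, options = {} }) {
    const handler = this.handlers.get(taskType);
    if (!handler) {
      throw new Error(`Unsupported task type: ${taskType}`);
    }
    return handler(data, options);
  }
}

// Usage with two toy handlers.
const registry = new TaskHandlerRegistry()
  .register('data_processing', (data) => ({ fieldCount: Object.keys(data).length }))
  .register('echo', (data, options) => ({ data, upper: Boolean(options.upper) }));

const processed = registry.dispatch({
  taskType: 'data_processing',
  data: { id: 1, type: 'order' }
});
console.log(processed); // { fieldCount: 2 }
```

With this layout, `performTask` could reduce to a single `dispatch` call, and adding a task type becomes one `register` call instead of edits in two places.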
Communication Facilitator Agent
// agents/communication-facilitator-agent.js
const BaseAgent = require('./base-agent');
class CommunicationFacilitatorAgent extends BaseAgent {
constructor(config) {
super({
...config,
type: 'communication_facilitator',
capabilities: ['message_routing', 'protocol_translation', 'communication_optimization']
});
// Communication-specific state
this.routingRules = new Map();
this.messageHistory = [];
this.communicationMetrics = {
messagesRouted: 0,
protocolTranslations: 0,
routingErrors: 0
};
console.log(`Communication Facilitator Agent created: ${this.name}`);
}
async performTask(taskPayload) {
/**
* Handle communication-related tasks
*/
const { taskType, data, options = {} } = taskPayload;
switch (taskType) {
case 'route_message':
return await this.routeMessage(data, options);
case 'translate_protocol':
return await this.translateProtocol(data, options);
case 'optimize_communication':
return await this.optimizeCommunication(data, options);
default:
throw new Error(`Unsupported communication task: ${taskType}`);
}
}
async routeMessage(messageData, options) {
/**
* Route messages between agents based on rules
*/
const { fromAgent, toAgent, messageType, payload } = messageData;
console.log(`Routing message: ${fromAgent} → ${toAgent} (${messageType})`);
try {
// Check routing rules
const routingRule = this.routingRules.get(`${fromAgent}:${toAgent}`) ||
this.routingRules.get(`*:${toAgent}`) ||
this.routingRules.get(`${fromAgent}:*`);
if (routingRule && !routingRule.allowed) {
throw new Error(`Message routing blocked by rule: ${fromAgent} → ${toAgent}`);
}
// Apply message transformations if specified
let transformedPayload = payload;
if (routingRule && routingRule.transform) {
transformedPayload = await this.applyMessageTransformation(
payload,
routingRule.transform
);
}
// Route the message
const result = await this.sendMessage(toAgent, messageType, transformedPayload, options);
// Update metrics
this.communicationMetrics.messagesRouted++;
// Store in message history for analysis
this.messageHistory.push({
timestamp: new Date(),
fromAgent,
toAgent,
messageType,
success: true
});
return {
routed: true,
fromAgent,
toAgent,
messageType,
transformationApplied: !!routingRule?.transform
};
} catch (error) {
this.communicationMetrics.routingErrors++;
// Store failed routing attempt
this.messageHistory.push({
timestamp: new Date(),
fromAgent,
toAgent,
messageType,
success: false,
error: error.message
});
throw error;
}
}
addRoutingRule(fromAgent, toAgent, rule) {
/**
* Add routing rule for message filtering and transformation
*/
const ruleKey = `${fromAgent}:${toAgent}`;
this.routingRules.set(ruleKey, rule);
console.log(`Routing rule added: ${ruleKey}`, rule);
}
}
module.exports = CommunicationFacilitatorAgent;
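The rule lookup in `routeMessage` tries three keys in order: the exact sender and recipient pair, then any sender to that recipient, then that sender to any recipient. The standalone sketch below isolates that precedence so it can be tested without a broker. `findRoutingRule` and the agent names are made up for illustration.

```javascript
// Resolve the rule for a sender/recipient pair using the same precedence
// as routeMessage: exact pair, then any sender, then any recipient.
function findRoutingRule(routingRules, fromAgent, toAgent) {
  const keys = [`${fromAgent}:${toAgent}`, `*:${toAgent}`, `${fromAgent}:*`];
  for (const key of keys) {
    if (routingRules.has(key)) {
      return { key, rule: routingRules.get(key) };
    }
  }
  return null; // no rule: routeMessage lets the message through unchanged
}

// Example rules (agent names are made up): only the planner may message billing.
const routingRules = new Map([
  ['*:billing', { allowed: false }],
  ['planner:billing', { allowed: true }]
]);

const plannerMatch = findRoutingRule(routingRules, 'planner', 'billing');
const scraperMatch = findRoutingRule(routingRules, 'scraper', 'billing');
const unrelated = findRoutingRule(routingRules, 'scraper', 'planner');
console.log(plannerMatch.key, scraperMatch.key, unrelated);
// planner:billing *:billing null
```

Wildcard rules like `*:billing` can be registered through the existing method by passing `'*'` as an argument, for example `addRoutingRule('*', 'billing', { allowed: false })`.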
Communication Facilitator Explanation:
Message Routing: This agent