Build Your First AI Agent from Scratch - Part 5: Testing, Debugging, and Deployment

Congratulations! You've built a sophisticated AI agent with memory, context handling, and tool integration. Now it's time to ensure your agent works reliably in production through comprehensive testing, effective debugging strategies, and proper deployment practices.
Testing and deployment are where hobby projects become professional applications. Without proper testing, your agent might work perfectly in development but fail catastrophically when real users interact with it.
What You'll Learn in This Tutorial
By the end of this tutorial, you'll have:
- ✅ Comprehensive testing suite covering all agent functionality
- ✅ Debugging strategies for complex AI agent issues
- ✅ Performance monitoring and optimization techniques
- ✅ Production deployment with proper configuration
- ✅ Error handling and recovery mechanisms
- ✅ Maintenance procedures for ongoing operation
Estimated Time: 35-40 minutes
Step 1: Understanding AI Agent Testing Challenges
Testing AI agents is more complex than testing traditional applications because of their non-deterministic nature and external dependencies.
Unique Testing Challenges
Non-Deterministic Responses: AI models can give different responses to the same input, making exact-match assertions unreliable (see the property-based sketch after the strategy overview below).
External API Dependencies: Your agent relies on external services (OpenAI, web APIs, databases) that may be unavailable or rate-limited during testing.
Context and Memory: Agent behavior depends on conversation history and stored memories, creating complex state dependencies.
Tool Integration: Testing tool usage requires mocking external services and handling various response scenarios.
Testing Strategy Overview
- Unit Tests: Test individual components in isolation
- Integration Tests: Test how components work together
- End-to-End Tests: Test complete user workflows
- Performance Tests: Ensure the agent meets response-time requirements
- Reliability Tests: Test error handling and recovery
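Because responses are non-deterministic, exact-match assertions only work when the model is mocked. For tests that hit the real model, a common workaround is to assert on properties of the response (length, required keywords, format) rather than its exact wording. Here is a minimal sketch of that pattern; the assert_response_quality helper, the live marker, and the live_agent fixture are illustrative names, not part of the agent code from the earlier parts.

# tests/test_live_responses.py - property-based checks for non-deterministic output (sketch)
import pytest

def assert_response_quality(response, must_mention=None, min_length=1, max_length=2000):
    """Assert on properties of an AI response instead of its exact wording."""
    assert isinstance(response, str)
    assert min_length <= len(response) <= max_length
    for keyword in (must_mention or []):
        assert keyword.lower() in response.lower()

@pytest.mark.live  # run only against the real API, e.g. `pytest -m live`
def test_agent_explains_python(live_agent):
    """live_agent is a hypothetical fixture that builds the agent with a real API key."""
    conv_id = live_agent.create_conversation()
    response = live_agent.chat("In one sentence, what is Python?", conv_id)
    assert_response_quality(response, must_mention=["python"], min_length=10)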
Step 2: Building a Comprehensive Test Suite
Let's create a robust testing framework for your AI agent.
Test Configuration and Setup
# tests/conftest.py - pytest configuration
import pytest
import tempfile
import os
from unittest.mock import Mock, patch
import sys
from pathlib import Path

# Add the project root to sys.path so the `src.*` imports below resolve
sys.path.insert(0, str(Path(__file__).parent.parent))

from src.agents.memory_agent import MemoryAgent
from src.utils.config import AgentConfig
from src.tools.web_search import WebSearchTool
from src.tools.openai_tool import OpenAITool


@pytest.fixture
def temp_database():
    """Create temporary database for testing"""
    with tempfile.NamedTemporaryFile(suffix='.db', delete=False) as f:
        db_path = f.name

    yield f"sqlite:///{db_path}"

    # Cleanup
    if os.path.exists(db_path):
        os.unlink(db_path)


@pytest.fixture
def test_config():
    """Create test configuration"""
    return AgentConfig(
        openai_api_key="test-key-123",
        openai_model="gpt-3.5-turbo",
        max_tokens=100,
        temperature=0.0,  # Use 0 temperature for more predictable testing
        agent_name="Test Agent",
        agent_description="A test agent",
        system_prompt="You are a helpful test assistant.",
        log_level="DEBUG",
        debug=True
    )


@pytest.fixture
def mock_openai_client():
    """Mock OpenAI client for testing"""
    with patch('src.agents.base_agent.OpenAI') as mock_openai:
        mock_client = Mock()
        mock_openai.return_value = mock_client

        # Mock chat completion response
        mock_response = Mock()
        mock_response.choices = [Mock()]
        mock_response.choices[0].message.content = "Test response from AI"
        mock_response.usage.total_tokens = 50
        mock_client.chat.completions.create.return_value = mock_response

        yield mock_client


@pytest.fixture
def test_agent(test_config, temp_database, mock_openai_client):
    """Create test agent with mocked dependencies"""
    agent = MemoryAgent(test_config, temp_database)
    agent.initialize()
    return agent
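Before writing the real tests, it can be worth confirming that the fixtures compose correctly with a throwaway smoke test; this file is optional and only exercises the fixtures defined above.

# tests/test_fixtures_smoke.py - optional sanity check for the fixtures above
def test_fixture_wiring(test_agent, test_config):
    """The mocked agent should come back initialized with the test configuration."""
    assert test_agent.config == test_config
    assert test_agent.is_initialized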
Unit Tests for Core Components
# tests/test_agent_core.py
import pytest
from unittest.mock import Mock, patch

from src.agents.memory_agent import MemoryAgent


class TestAgentCore:
    """Test core agent functionality"""

    def test_agent_initialization(self, test_config, temp_database, mock_openai_client):
        """Test agent initializes correctly"""
        agent = MemoryAgent(test_config, temp_database)

        assert agent.config == test_config
        assert agent.memory is not None
        assert not agent.is_initialized

        # Initialize agent
        agent.initialize()
        assert agent.is_initialized

    def test_conversation_creation(self, test_agent):
        """Test conversation creation and management"""
        # Create conversation
        conv_id = test_agent.create_conversation()

        assert conv_id is not None
        assert conv_id in test_agent.conversations

        # Check memory system has conversation
        memory_conv = test_agent.memory.db.get_conversation(conv_id)
        assert memory_conv is not None
        assert memory_conv.id == conv_id

    def test_basic_chat_functionality(self, test_agent):
        """Test basic chat without external API calls"""
        conv_id = test_agent.create_conversation()

        # Mock the AI response to avoid external API calls
        with patch.object(test_agent, '_get_completion_with_memory') as mock_completion:
            mock_completion.return_value = "Hello! I'm your test AI agent."

            response = test_agent.chat("Hello", conv_id)

            assert response == "Hello! I'm your test AI agent."
            mock_completion.assert_called_once()

    def test_memory_integration(self, test_agent):
        """Test memory system integration"""
        conv_id = test_agent.create_conversation()

        # Add some messages
        test_agent.memory.add_message(conv_id, "user", "My name is John")
        test_agent.memory.add_message(conv_id, "assistant", "Nice to meet you, John!")

        # Check messages were stored
        messages = test_agent.memory.db.get_messages(conv_id)
        assert len(messages) == 2
        assert messages[0].content == "My name is John"
        assert messages[1].content == "Nice to meet you, John!"

    def test_error_handling(self, test_agent):
        """Test agent error handling"""
        # Test with invalid conversation ID
        with pytest.raises(Exception):
            test_agent.chat("Hello", "invalid-conversation-id")

        # Test with empty message
        conv_id = test_agent.create_conversation()
        response = test_agent.chat("", conv_id)
        # Should handle gracefully (exact behavior depends on implementation)
        assert isinstance(response, str)
Integration Tests
# tests/test_integration.py
import pytest
from unittest.mock import Mock, patch
import json


class TestAgentIntegration:
    """Test integration between agent components"""

    def test_memory_and_context_integration(self, test_agent):
        """Test memory system works with context building"""
        conv_id = test_agent.create_conversation()

        # Simulate conversation with memory extraction
        messages = [
            "Hi, I'm a software engineer",
            "I work with Python and JavaScript",
            "I'm interested in AI and machine learning"
        ]

        for message in messages:
            with patch.object(test_agent, '_get_completion_with_memory') as mock_completion:
                mock_completion.return_value = f"Thanks for sharing: {message}"
                test_agent.chat(message, conv_id)

        # Check that context window includes conversation history
        context_window = test_agent.memory.get_context_window(conv_id)
        assert len(context_window.messages) > 0
        assert context_window.total_tokens > 0

    def test_tool_integration(self, test_agent):
        """Test tool integration with agent"""
        # Add tools to agent
        from src.tools.web_search import WebSearchTool

        web_search_tool = WebSearchTool()
        test_agent.tools = [web_search_tool]

        # Mock tool response
        with patch.object(web_search_tool, 'run') as mock_tool:
            mock_tool.return_value = "AI agents are software programs..."

            result = test_agent.use_tool("WebSearchTool", "What are AI agents?")

            assert result == "AI agents are software programs..."
            mock_tool.assert_called_once_with("What are AI agents?")

    def test_error_recovery(self, test_agent):
        """Test agent recovers from various error conditions"""
        conv_id = test_agent.create_conversation()

        # Test API failure recovery
        with patch.object(test_agent, '_get_completion_with_memory') as mock_completion:
            # First call fails
            mock_completion.side_effect = [Exception("API Error"), "Recovered response"]

            # Agent should handle error gracefully
            response = test_agent.chat("Test message", conv_id)

            # Should either return error message or retry successfully
            assert isinstance(response, str)
            assert len(response) > 0
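The error-recovery test above only verifies that the agent returns something when the completion call fails. If your `_get_completion_with_memory` does not already retry transient API errors, a small backoff wrapper is one way to provide that behavior. This is a generic sketch (the `with_retries` helper is not part of the earlier tutorial code), and you should narrow `retry_on` to the exception types raised by your OpenAI client version.

# src/utils/retry.py - simple retry-with-backoff helper (illustrative sketch)
import time
import logging

logger = logging.getLogger(__name__)

def with_retries(func, max_attempts=3, base_delay=1.0, retry_on=(Exception,)):
    """Call func(); on failure, wait base_delay * 2**attempt seconds and try again."""
    for attempt in range(max_attempts):
        try:
            return func()
        except retry_on as e:
            if attempt == max_attempts - 1:
                raise  # out of attempts, surface the error to the caller
            delay = base_delay * (2 ** attempt)
            logger.warning(f"Attempt {attempt + 1} failed ({e}); retrying in {delay:.1f}s")
            time.sleep(delay)

Inside the agent, the completion call could then be wrapped as `with_retries(lambda: self._call_openai(messages))`, so transient failures are retried before the user ever sees an error message (the `_call_openai` name here is hypothetical).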
Performance Tests
# tests/test_performance.py
import pytest
import time
from unittest.mock import patch
from concurrent.futures import ThreadPoolExecutor


class TestAgentPerformance:
    """Test agent performance characteristics"""

    def test_response_time(self, test_agent):
        """Test agent response time meets requirements"""
        conv_id = test_agent.create_conversation()

        with patch.object(test_agent, '_get_completion_with_memory') as mock_completion:
            mock_completion.return_value = "Quick response"

            start_time = time.time()
            response = test_agent.chat("Hello", conv_id)
            end_time = time.time()

            response_time = end_time - start_time

            # Should respond within 5 seconds (adjust based on requirements)
            assert response_time < 5.0
            assert response == "Quick response"

    def test_concurrent_conversations(self, test_agent):
        """Test agent handles multiple concurrent conversations"""
        def chat_session(session_id):
            conv_id = test_agent.create_conversation()
            with patch.object(test_agent, '_get_completion_with_memory') as mock_completion:
                mock_completion.return_value = f"Response for session {session_id}"
                return test_agent.chat(f"Message from session {session_id}", conv_id)

        # Run multiple concurrent chat sessions
        with ThreadPoolExecutor(max_workers=5) as executor:
            futures = [executor.submit(chat_session, i) for i in range(5)]
            results = [future.result() for future in futures]

        # All sessions should complete successfully
        assert len(results) == 5
        for i, result in enumerate(results):
            assert f"session {i}" in result

    def test_memory_performance(self, test_agent):
        """Test memory system performance with large datasets"""
        conv_id = test_agent.create_conversation()

        # Add many messages to test memory performance
        start_time = time.time()

        for i in range(100):
            test_agent.memory.add_message(conv_id, "user", f"Test message {i}")
            test_agent.memory.add_message(conv_id, "assistant", f"Response {i}")

        end_time = time.time()

        # Should handle 200 messages quickly
        assert (end_time - start_time) < 10.0

        # Test context window performance
        start_time = time.time()
        context_window = test_agent.memory.get_context_window(conv_id)
        end_time = time.time()

        # Context window should be built quickly
        assert (end_time - start_time) < 1.0
        assert len(context_window.messages) > 0
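With all three test files in place, you can run the suite locally before the deploy script does the same thing via subprocess. If you prefer a programmatic entry point over the command line, a minimal runner might look like this (the coverage flags assume the `pytest-cov` plugin is installed):

# run_tests.py - optional programmatic runner, equivalent to `python -m pytest tests/ -v`
import sys
import pytest

if __name__ == "__main__":
    # Exit with pytest's return code so CI systems can detect failures
    sys.exit(pytest.main(["tests/", "-v", "--cov=src", "--cov-report=term-missing"]))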
Step 3: Debugging Strategies for AI Agents
AI agents can fail in complex ways. Let's implement debugging tools and strategies.
Debugging Framework
# src/utils/debugger.py
import logging
import json
import traceback
from datetime import datetime
from typing import Dict, Any, List


class AgentDebugger:
    """Debugging utilities for AI agents"""

    def __init__(self, agent, log_level=logging.DEBUG):
        self.agent = agent
        self.logger = logging.getLogger(f"AgentDebugger.{agent.name}")
        self.logger.setLevel(log_level)

        # Debug state tracking
        self.conversation_traces = {}
        self.error_history = []
        self.performance_metrics = {}

        # Set up debug handlers
        self.setup_debug_handlers()

    def setup_debug_handlers(self):
        """Set up debugging event handlers"""
        # Listen to agent events
        self.agent.on('conversation.started', self.on_conversation_started)
        self.agent.on('message.processed', self.on_message_processed)
        self.agent.on('error.occurred', self.on_error_occurred)
        self.agent.on('tool.used', self.on_tool_used)

    def on_conversation_started(self, event_data):
        """Handle conversation start event"""
        conv_id = event_data['conversation_id']
        self.conversation_traces[conv_id] = {
            'started_at': datetime.now().isoformat(),
            'messages': [],
            'tools_used': [],
            'errors': [],
            'performance': {}
        }
        self.logger.debug(f"💬 Conversation started: {conv_id}")

    def on_message_processed(self, event_data):
        """Handle message processing event"""
        conv_id = event_data['conversation_id']
        if conv_id in self.conversation_traces:
            self.conversation_traces[conv_id]['messages'].append({
                'timestamp': datetime.now().isoformat(),
                'user_message': event_data['user_message'],
                'ai_response': event_data['ai_response'],
                'processing_time': event_data['processing_time'],
                'tokens_used': event_data.get('tokens_used', 0)
            })
        self.logger.debug(f"💬 Message processed: {conv_id} ({event_data['processing_time']}ms)")

    def on_error_occurred(self, event_data):
        """Handle error event"""
        error_record = {
            'timestamp': datetime.now().isoformat(),
            'conversation_id': event_data.get('conversation_id'),
            'error_type': event_data['error_type'],
            'error_message': event_data['error_message'],
            'stack_trace': event_data.get('stack_trace'),
            'context': event_data.get('context', {})
        }

        self.error_history.append(error_record)

        # Add to conversation trace if available
        conv_id = event_data.get('conversation_id')
        if conv_id and conv_id in self.conversation_traces:
            self.conversation_traces[conv_id]['errors'].append(error_record)

        self.logger.error(f"❌ Error occurred: {event_data['error_type']} - {event_data['error_message']}")

    def on_tool_used(self, event_data):
        """Handle tool usage event"""
        conv_id = event_data.get('conversation_id')
        tool_record = {
            'timestamp': datetime.now().isoformat(),
            'tool_name': event_data['tool_name'],
            'query': event_data['query'],
            'result': event_data['result'],
            'execution_time': event_data['execution_time']
        }

        if conv_id and conv_id in self.conversation_traces:
            self.conversation_traces[conv_id]['tools_used'].append(tool_record)

        self.logger.debug(f"🔧 Tool used: {event_data['tool_name']} ({event_data['execution_time']}ms)")

    def get_conversation_debug_info(self, conversation_id):
        """Get comprehensive debug information for a conversation"""
        if conversation_id not in self.conversation_traces:
            return {"error": "Conversation not found in debug traces"}

        trace = self.conversation_traces[conversation_id]

        # Calculate summary statistics
        total_messages = len(trace['messages'])
        total_tokens = sum(msg.get('tokens_used', 0) for msg in trace['messages'])
        avg_response_time = sum(msg['processing_time'] for msg in trace['messages']) / max(total_messages, 1)

        return {
            'conversation_id': conversation_id,
            'summary': {
                'total_messages': total_messages,
                'total_tokens': total_tokens,
                'average_response_time': avg_response_time,
                'tools_used_count': len(trace['tools_used']),
                'errors_count': len(trace['errors'])
            },
            'detailed_trace': trace
        }

    def export_debug_report(self, output_file):
        """Export comprehensive debug report"""
        report = {
            'agent_info': {
                'name': self.agent.name,
                'version': getattr(self.agent, 'version', 'unknown'),
                'capabilities': list(getattr(self.agent, 'capabilities', []))
            },
            'conversations': self.conversation_traces,
            'error_summary': {
                'total_errors': len(self.error_history),
                'error_types': self.get_error_type_summary(),
                'recent_errors': self.error_history[-10:]  # Last 10 errors
            },
            'performance_summary': self.calculate_performance_summary(),
            'generated_at': datetime.now().isoformat()
        }

        with open(output_file, 'w') as f:
            json.dump(report, f, indent=2, default=str)

        self.logger.info(f"📄 Debug report exported: {output_file}")
        return report

    def get_error_type_summary(self):
        """Get summary of error types"""
        error_types = {}
        for error in self.error_history:
            error_type = error['error_type']
            error_types[error_type] = error_types.get(error_type, 0) + 1
        return error_types

    def calculate_performance_summary(self):
        """Calculate overall performance metrics"""
        all_messages = []
        for trace in self.conversation_traces.values():
            all_messages.extend(trace['messages'])

        if not all_messages:
            return {}

        response_times = [msg['processing_time'] for msg in all_messages]
        token_counts = [msg.get('tokens_used', 0) for msg in all_messages]

        return {
            'total_messages': len(all_messages),
            'average_response_time': sum(response_times) / len(response_times),
            'min_response_time': min(response_times),
            'max_response_time': max(response_times),
            'total_tokens': sum(token_counts),
            'average_tokens_per_message': sum(token_counts) / len(token_counts) if token_counts else 0
        }
Debugging Framework Explanation:
Event-Driven Debugging: The debugger listens to agent events to automatically collect debugging information without manual instrumentation.
Conversation Tracing: Complete traces of conversations help understand how the agent behaves over time.
Error Aggregation: All errors are collected and categorized to identify patterns and common issues.
Performance Tracking: Response times and token usage are tracked to identify performance bottlenecks.
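Attaching the debugger takes only a few lines once the agent exists. The sketch below mirrors the constructor arguments used in the test fixtures; the API key, database URL, and settings are placeholders, and it assumes your MemoryAgent exposes the `on(event_name, handler)` hook that `setup_debug_handlers()` relies on. If your agent from the earlier parts does not emit events yet, you will need to add a small event emitter first.

# Example usage (sketch): attach the debugger and export a report
from src.agents.memory_agent import MemoryAgent
from src.utils.config import AgentConfig
from src.utils.debugger import AgentDebugger

config = AgentConfig(
    openai_api_key="sk-your-key",          # placeholder
    openai_model="gpt-3.5-turbo",
    max_tokens=500,
    temperature=0.7,
    agent_name="Debug Agent",
    agent_description="Agent with debugging attached",
    system_prompt="You are a helpful assistant.",
    log_level="DEBUG",
    debug=True,
)

agent = MemoryAgent(config, "sqlite:///agent_memory.db")  # placeholder database URL
agent.initialize()

debugger = AgentDebugger(agent)

conv_id = agent.create_conversation()
agent.chat("Why is my tool call failing?", conv_id)

# Inspect one conversation, or dump everything to a JSON file
print(debugger.get_conversation_debug_info(conv_id)['summary'])
debugger.export_debug_report("debug_report.json")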
Step 4: Production Deployment Strategies
Now let's deploy your AI agent to production with proper configuration and monitoring.
Deployment Configuration
# deployment/production_config.py
import os
from dataclasses import dataclass
from typing import Optional


@dataclass
class ProductionConfig:
    """Production deployment configuration"""

    # Environment settings
    environment: str = "production"
    debug: bool = False
    log_level: str = "INFO"

    # AI service configuration
    openai_api_key: str = os.getenv("OPENAI_API_KEY")
    openai_model: str = os.getenv("OPENAI_MODEL", "gpt-3.5-turbo")
    max_tokens: int = int(os.getenv("MAX_TOKENS", "500"))
    temperature: float = float(os.getenv("TEMPERATURE", "0.7"))

    # Database configuration
    database_url: str = os.getenv("DATABASE_URL")
    database_pool_size: int = int(os.getenv("DB_POOL_SIZE", "10"))

    # Performance settings
    max_concurrent_conversations: int = int(os.getenv("MAX_CONCURRENT_CONVERSATIONS", "100"))
    response_timeout: int = int(os.getenv("RESPONSE_TIMEOUT", "30"))

    # Security settings
    rate_limit_per_user: int = int(os.getenv("RATE_LIMIT_PER_USER", "60"))
    enable_content_filtering: bool = os.getenv("ENABLE_CONTENT_FILTERING", "true").lower() == "true"

    # Monitoring settings
    enable_metrics: bool = os.getenv("ENABLE_METRICS", "true").lower() == "true"
    metrics_endpoint: Optional[str] = os.getenv("METRICS_ENDPOINT")

    def validate(self):
        """Validate production configuration"""
        errors = []

        if not self.openai_api_key:
            errors.append("OPENAI_API_KEY is required")

        if not self.database_url:
            errors.append("DATABASE_URL is required")

        if self.max_tokens > 4000:
            errors.append("MAX_TOKENS should not exceed 4000 for cost control")

        if errors:
            raise ValueError(f"Configuration errors: {', '.join(errors)}")

        print("✅ Production configuration validated")


# Create production configuration
def get_production_config():
    config = ProductionConfig()
    config.validate()
    return config
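A quick way to exercise the validation logic is to set the required variables and load the configuration at startup. In real deployments the values come from your process manager or secrets store rather than from code, and the ones below are placeholders; note that the defaults are read when the module is imported, so the environment must be set before the import.

# Example: validate configuration at startup (placeholder values)
import os

os.environ.setdefault("OPENAI_API_KEY", "sk-your-key")
os.environ.setdefault("DATABASE_URL", "postgresql://user:pass@localhost:5432/agent_db")

from deployment.production_config import get_production_config

config = get_production_config()   # raises ValueError if anything required is missing
print(config.openai_model, config.max_tokens, config.rate_limit_per_user)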
Production Deployment Script
# deployment/deploy.py
import sys
import os
import subprocess
import json
from pathlib import Path


def deploy_agent():
    """Deploy AI agent to production"""
    print("🚀 Starting AI agent deployment...")

    try:
        # Step 1: Validate environment
        validate_deployment_environment()

        # Step 2: Run tests
        run_test_suite()

        # Step 3: Build deployment package
        build_deployment_package()

        # Step 4: Deploy to server
        deploy_to_server()

        # Step 5: Run health checks
        run_health_checks()

        print("✅ Deployment completed successfully!")

    except Exception as e:
        print(f"❌ Deployment failed: {e}")
        sys.exit(1)


def validate_deployment_environment():
    """Validate deployment environment"""
    print("🔍 Validating deployment environment...")

    required_env_vars = [
        "OPENAI_API_KEY",
        "DATABASE_URL",
        "DEPLOYMENT_TARGET"
    ]

    missing_vars = []
    for var in required_env_vars:
        if not os.getenv(var):
            missing_vars.append(var)

    if missing_vars:
        raise ValueError(f"Missing required environment variables: {', '.join(missing_vars)}")

    print("✅ Environment validation passed")


def run_test_suite():
    """Run comprehensive test suite before deployment"""
    print("🧪 Running test suite...")

    # Run pytest with coverage
    result = subprocess.run([
        "python", "-m", "pytest",
        "tests/",
        "-v",
        "--cov=src",
        "--cov-report=term-missing"
    ], capture_output=True, text=True)

    if result.returncode != 0:
        print("❌ Tests failed:")
        print(result.stdout)
        print(result.stderr)
        raise Exception("Test suite failed")

    print("✅ All tests passed")


def build_deployment_package():
    """Build deployment package"""
    print("📦 Building deployment package...")

    # Install production dependencies
    subprocess.run(["pip", "install", "-r", "requirements.txt"], check=True)

    # Create deployment directory
    deployment_dir = Path("deployment_package")
    deployment_dir.mkdir(exist_ok=True)

    # Copy necessary files
    files_to_copy = [
        "src/",
        "requirements.txt",
        "README.md"
    ]

    for file_path in files_to_copy:
        if Path(file_path).exists():
            if Path(file_path).is_dir():
                subprocess.run(["cp", "-r", file_path, str(deployment_dir)], check=True)
            else:
                subprocess.run(["cp", file_path, str(deployment_dir)], check=True)

    print("✅ Deployment package built")


def deploy_to_server():
    """Push the deployment package to the target environment.

    The transfer mechanism (rsync, Docker image push, cloud CLI, etc.) depends on
    your infrastructure; DEPLOYMENT_TARGET identifies where the package goes.
    """
    target = os.getenv("DEPLOYMENT_TARGET")
    print(f"Deploying package to: {target}")
    # Add your environment-specific deployment commands here.


def run_health_checks():
    """Run post-deployment health checks"""
    print("🏥 Running health checks...")

    # Import after deployment
    from src.agents.memory_agent import MemoryAgent
    from deployment.production_config import get_production_config

    try:
        # Initialize agent with production config
        config = get_production_config()
        agent = MemoryAgent(config)
        agent.initialize()

        # Test basic functionality
        conv_id = agent.create_conversation()
        response = agent.chat("Health check test", conv_id)

        if response and len(response) > 0:
            print("✅ Health check passed")
        else:
            raise Exception("Agent not responding properly")

    except Exception as e:
        raise Exception(f"Health check failed: {e}")


if __name__ == "__main__":
    deploy_agent()
Deployment Strategy Explanation:
Environment Validation: Ensures all required configuration is present before deployment.
Automated Testing: Runs the complete test suite to catch issues before production.
Health Checks: Verifies the agent works correctly in the production environment.
Rollback Capability: If health checks fail, the deployment can be rolled back.
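The script above stops on a failed health check but does not actually restore the previous release. A minimal rollback, assuming only the `deployment_package` directory layout from `build_deployment_package()` (and nothing about your hosting platform), could look like this:

# deployment/rollback.py - minimal rollback sketch for the deployment_package layout
import shutil
from pathlib import Path

CURRENT = Path("deployment_package")
PREVIOUS = Path("deployment_package_previous")

def snapshot_current_release():
    """Call before deploying: keep a copy of the release that is currently live."""
    if CURRENT.exists():
        if PREVIOUS.exists():
            shutil.rmtree(PREVIOUS)
        shutil.copytree(CURRENT, PREVIOUS)

def rollback():
    """Restore the previous release after a failed health check."""
    if not PREVIOUS.exists():
        raise RuntimeError("No previous release to roll back to")
    if CURRENT.exists():
        shutil.rmtree(CURRENT)
    shutil.copytree(PREVIOUS, CURRENT)
    print("Rolled back to previous deployment package")

In `deploy_agent()` you would call `snapshot_current_release()` before `deploy_to_server()` and `rollback()` inside the `except` block, before exiting.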
Step 5: Production Monitoring and Maintenance
Once deployed, your agent needs ongoing monitoring and maintenance.
Production Monitoring
# monitoring/production_monitor.py
import time
import threading
import psutil
import logging
from datetime import datetime, timedelta


class ProductionMonitor:
    """Monitor AI agent in production"""

    def __init__(self, agent):
        self.agent = agent
        self.logger = logging.getLogger("ProductionMonitor")

        # Monitoring metrics
        self.metrics = {
            'uptime': 0,
            'total_conversations': 0,
            'total_messages': 0,
            'average_response_time': 0,
            'error_rate': 0,
            'memory_usage': 0,
            'cpu_usage': 0
        }

        self.start_time = time.time()
        self.start_monitoring()

    def start_monitoring(self):
        """Start production monitoring in a background thread"""
        def monitor_loop():
            while True:
                try:
                    self.collect_metrics()
                    self.check_health()
                    time.sleep(60)  # Monitor every minute
                except Exception as e:
                    self.logger.error(f"Monitoring error: {e}")
                    time.sleep(60)

        monitor_thread = threading.Thread(target=monitor_loop, daemon=True)
        monitor_thread.start()

        self.logger.info("📊 Production monitoring started")

    def collect_metrics(self):
        """Collect system and agent metrics"""
        # System metrics
        self.metrics['uptime'] = time.time() - self.start_time
        self.metrics['memory_usage'] = psutil.virtual_memory().percent
        self.metrics['cpu_usage'] = psutil.cpu_percent()

        # Agent metrics
        if hasattr(self.agent, 'get_stats'):
            agent_stats = self.agent.get_stats()
            self.metrics.update(agent_stats)

        # Log metrics periodically
        if int(self.metrics['uptime']) % 300 == 0:  # Every 5 minutes
            self.logger.info(f"📊 Metrics: {self.metrics}")

    def check_health(self):
        """Check agent health and alert if issues detected"""
        alerts = []

        # Check memory usage
        if self.metrics['memory_usage'] > 90:
            alerts.append(f"High memory usage: {self.metrics['memory_usage']}%")

        # Check CPU usage
        if self.metrics['cpu_usage'] > 80:
            alerts.append(f"High CPU usage: {self.metrics['cpu_usage']}%")

        # Check error rate
        if self.metrics['error_rate'] > 0.1:
            alerts.append(f"High error rate: {self.metrics['error_rate']:.2%}")

        # Send alerts if any issues detected
        if alerts:
            self.send_alerts(alerts)

    def send_alerts(self, alerts):
        """Send alerts to administrators"""
        alert_message = "🚨 AI Agent Health Alert:\n" + "\n".join(f"- {alert}" for alert in alerts)

        self.logger.warning(alert_message)

        # In production, send to Slack, email, or monitoring service
        # For now, just log the alert
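Wiring the monitor into your production entry point takes one line once the agent is initialized (mirroring the health-check setup in `deploy.py`). If you want alerts to reach a chat channel rather than just the log, a webhook call from `send_alerts()` is usually enough; the `requests` usage and `ALERT_WEBHOOK_URL` variable below are illustrative, not part of the earlier code.

# Example: attach the monitor at startup and forward alerts to a webhook (sketch)
import os
import requests  # assumes `requests` is listed in requirements.txt

from src.agents.memory_agent import MemoryAgent
from deployment.production_config import get_production_config
from monitoring.production_monitor import ProductionMonitor

config = get_production_config()
agent = MemoryAgent(config)
agent.initialize()

monitor = ProductionMonitor(agent)  # starts the background monitoring thread

def post_alert_to_webhook(message: str):
    """Optional helper you could call from ProductionMonitor.send_alerts()."""
    webhook_url = os.getenv("ALERT_WEBHOOK_URL")  # e.g. a Slack incoming webhook
    if webhook_url:
        requests.post(webhook_url, json={"text": message}, timeout=5)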
What You've Accomplished
Congratulations! You've built a complete, production-ready AI agent with:
- ✅ Comprehensive Testing: Unit, integration, and performance tests
- ✅ Debugging Tools: Event tracing and error analysis
- ✅ Production Deployment: Validated configuration, automated test runs, and post-deployment health checks
- ✅ Production Monitoring: Metrics collection, health checks, and alerting for ongoing maintenance