Build Your First AI Agent from Scratch - Part 3: Adding Memory and Context Handling

Tutorial Navigation
- ← Part 2: Creating the Basic Agent Structure
- Part 3: Adding Memory and Context Handling (Current)
- Part 4: Implementing Tool Usage and API Integrations (Coming Soon)
- Part 5: Testing, Debugging, and Deployment (Coming Soon)
Welcome to Part 3 of our AI agent tutorial series! In the previous parts, we built a solid foundation with environment setup and a basic agent structure. Now we'll add memory and context handling so your agent can keep long conversations within its token budget and recall relevant information from past sessions.
What You'll Build in This Tutorial
By the end of this tutorial, you'll have:
- ✅ Persistent conversation storage with SQLite
- ✅ Vector-based semantic memory using embeddings
- ✅ Intelligent context window management
- ✅ Message summarization for long conversations
- ✅ Semantic search and retrieval of past conversations
- ✅ Memory consolidation and cleanup strategies
- ✅ Advanced context injection techniques
Estimated Time: 30-35 minutes
Understanding AI Agent Memory
Before diving into implementation, let's understand the different types of memory an AI agent needs (a short sketch after this list shows how they map onto the components we'll build):
1. Working Memory (Short-term)
- Current conversation context
- Recent messages within token limits
- Temporary state and variables
2. Episodic Memory (Medium-term)
- Complete conversation histories
- Session-based interactions
- Contextual relationships between conversations
3. Semantic Memory (Long-term)
- Learned facts and knowledge
- User preferences and patterns
- Summarized insights from past interactions
4. Procedural Memory
- Learned behaviors and skills
- Tool usage patterns
- Problem-solving strategies
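To make the taxonomy concrete, here is a minimal sketch (illustrative only; the enum below is ours and not part of the project code) of how the four layers map onto what we build in this tutorial:

from enum import Enum

class MemoryLayer(Enum):
    """Hypothetical taxonomy -- names are ours, not from any library."""
    WORKING = "working"        # recent messages kept within the token budget
    EPISODIC = "episodic"      # full conversation/message rows in SQLite
    SEMANTIC = "semantic"      # embedded Memory entries searched by similarity
    PROCEDURAL = "procedural"  # tool-usage patterns (covered in Part 4)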
Step 1: Setting Up Persistent Storage
First, let's create a robust database system for storing conversations and memory.
Install Additional Dependencies
Add these to your requirements.txt:
# Database and Storage
sqlalchemy==2.0.23
alembic==1.13.1
# sqlite3 ships with Python's standard library -- no pip install needed
# Vector Operations and Embeddings
numpy==1.24.3
scikit-learn==1.3.2
faiss-cpu==1.7.4
# Text Processing
tiktoken==0.5.2
nltk==3.8.1
Install the new dependencies:
pip install -r requirements.txt
Create Database Models
Create src/memory/models.py:
"""
Database models for AI Agent memory system
"""
from datetime import datetime
from typing import Optional, List, Dict, Any
import json
import uuid
from sqlalchemy import Column, String, DateTime, Text, Integer, Float, Boolean, ForeignKey
from sqlalchemy.orm import declarative_base, relationship
from sqlalchemy.dialects.sqlite import JSON
Base = declarative_base()
class Conversation(Base):
"""Represents a conversation session"""
__tablename__ = "conversations"
id = Column(String, primary_key=True, default=lambda: str(uuid.uuid4()))
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
title = Column(String(200))
summary = Column(Text)
meta = Column("metadata", JSON, default=dict)  # "metadata" is reserved on declarative models, so the attribute is named meta
is_active = Column(Boolean, default=True)
message_count = Column(Integer, default=0)
# Relationships
messages = relationship("Message", back_populates="conversation", cascade="all, delete-orphan")
memories = relationship("Memory", back_populates="conversation", cascade="all, delete-orphan")
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary"""
return {
"id": self.id,
"created_at": self.created_at.isoformat() if self.created_at else None,
"updated_at": self.updated_at.isoformat() if self.updated_at else None,
"title": self.title,
"summary": self.summary,
"metadata": self.metadata or {},
"is_active": self.is_active,
"message_count": self.message_count
}
class Message(Base):
"""Represents a single message in a conversation"""
__tablename__ = "messages"
id = Column(String, primary_key=True, default=lambda: str(uuid.uuid4()))
conversation_id = Column(String, ForeignKey("conversations.id"), nullable=False)
role = Column(String(20), nullable=False) # user, assistant, system
content = Column(Text, nullable=False)
timestamp = Column(DateTime, default=datetime.utcnow)
token_count = Column(Integer)
meta = Column("metadata", JSON, default=dict)  # see note on Conversation.meta
# Relationships
conversation = relationship("Conversation", back_populates="messages")
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary"""
return {
"id": self.id,
"conversation_id": self.conversation_id,
"role": self.role,
"content": self.content,
"timestamp": self.timestamp.isoformat() if self.timestamp else None,
"token_count": self.token_count,
"metadata": self.metadata or {}
}
class Memory(Base):
"""Represents a semantic memory entry"""
__tablename__ = "memories"
id = Column(String, primary_key=True, default=lambda: str(uuid.uuid4()))
conversation_id = Column(String, ForeignKey("conversations.id"), nullable=True)
memory_type = Column(String(50), nullable=False) # fact, preference, skill, etc.
content = Column(Text, nullable=False)
importance = Column(Float, default=0.5) # 0.0 to 1.0
created_at = Column(DateTime, default=datetime.utcnow)
last_accessed = Column(DateTime, default=datetime.utcnow)
access_count = Column(Integer, default=0)
embedding = Column(Text) # JSON serialized vector
meta = Column("metadata", JSON, default=dict)  # see note on Conversation.meta
# Relationships
conversation = relationship("Conversation", back_populates="memories")
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary"""
return {
"id": self.id,
"conversation_id": self.conversation_id,
"memory_type": self.memory_type,
"content": self.content,
"importance": self.importance,
"created_at": self.created_at.isoformat() if self.created_at else None,
"last_accessed": self.last_accessed.isoformat() if self.last_accessed else None,
"access_count": self.access_count,
"metadata": self.metadata or {}
}
def get_embedding(self) -> Optional[List[float]]:
"""Get embedding as list of floats"""
if self.embedding:
return json.loads(self.embedding)
return None
def set_embedding(self, embedding: List[float]) -> None:
"""Set embedding from list of floats"""
self.embedding = json.dumps(embedding)
class ConversationSummary(Base):
"""Represents a summarized version of conversation segments"""
__tablename__ = "conversation_summaries"
id = Column(String, primary_key=True, default=lambda: str(uuid.uuid4()))
conversation_id = Column(String, ForeignKey("conversations.id"), nullable=False)
start_message_id = Column(String, nullable=False)
end_message_id = Column(String, nullable=False)
summary = Column(Text, nullable=False)
message_count = Column(Integer, nullable=False)
created_at = Column(DateTime, default=datetime.utcnow)
embedding = Column(Text) # JSON serialized vector
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary"""
return {
"id": self.id,
"conversation_id": self.conversation_id,
"start_message_id": self.start_message_id,
"end_message_id": self.end_message_id,
"summary": self.summary,
"message_count": self.message_count,
"created_at": self.created_at.isoformat() if self.created_at else None
}
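The embedding helpers on these models can be exercised without a database session. A tiny round-trip check (toy vector, illustrative only):

from src.memory.models import Memory

m = Memory(memory_type="fact", content="User lives in San Francisco")
m.set_embedding([0.1, 0.2, 0.3])             # stored as a JSON string
assert m.get_embedding() == [0.1, 0.2, 0.3]  # decoded back to floats
print(m.embedding)                           # '[0.1, 0.2, 0.3]'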
Create Database Manager
Create src/memory/database.py:
"""
Database management for AI Agent memory system
"""
import json
from typing import Optional, List, Dict, Any
from contextlib import contextmanager
from pathlib import Path
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker, Session
from sqlalchemy.exc import SQLAlchemyError
from .models import Base, Conversation, Message, Memory, ConversationSummary
from ..utils.logger import get_logger
class DatabaseManager:
"""Manages database connections and operations"""
def __init__(self, database_url: Optional[str] = None):
"""
Initialize database manager
Args:
database_url: Database connection URL (defaults to SQLite)
"""
self.logger = get_logger()
if database_url is None:
# Default to SQLite in data directory
data_dir = Path("data")
data_dir.mkdir(exist_ok=True)
database_url = f"sqlite:///{data_dir}/agent_memory.db"
self.database_url = database_url
self.engine = create_engine(
database_url,
echo=False, # Set to True for SQL debugging
pool_pre_ping=True
)
self.SessionLocal = sessionmaker(
autocommit=False,
autoflush=False,
expire_on_commit=False,  # keep attributes readable after the session closes
bind=self.engine
)
# Create tables
self.create_tables()
self.logger.info(f"Database initialized: {database_url}")
def create_tables(self):
"""Create database tables"""
try:
Base.metadata.create_all(bind=self.engine)
self.logger.info("Database tables created successfully")
except SQLAlchemyError as e:
self.logger.error(f"Failed to create database tables: {e}")
raise
@contextmanager
def get_session(self):
"""Get database session with automatic cleanup"""
session = self.SessionLocal()
try:
yield session
session.commit()
except Exception as e:
session.rollback()
self.logger.error(f"Database session error: {e}")
raise
finally:
session.close()
def health_check(self) -> bool:
"""Check database connectivity"""
try:
with self.get_session() as session:
session.execute(text("SELECT 1"))
return True
except Exception as e:
self.logger.error(f"Database health check failed: {e}")
return False
# Conversation operations
def create_conversation(self, conversation_id: str, title: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None) -> Conversation:
"""Create a new conversation"""
with self.get_session() as session:
conversation = Conversation(
id=conversation_id,
title=title,
meta=metadata or {}
)
session.add(conversation)
session.flush()
return conversation
def get_conversation(self, conversation_id: str) -> Optional[Conversation]:
"""Get conversation by ID"""
with self.get_session() as session:
return session.query(Conversation).filter(
Conversation.id == conversation_id
).first()
def update_conversation(self, conversation_id: str, **kwargs) -> bool:
"""Update conversation fields"""
with self.get_session() as session:
result = session.query(Conversation).filter(
Conversation.id == conversation_id
).update(kwargs)
return result > 0
def list_conversations(self, limit: int = 50, offset: int = 0,
active_only: bool = True) -> List[Conversation]:
"""List conversations with pagination"""
with self.get_session() as session:
query = session.query(Conversation)
if active_only:
query = query.filter(Conversation.is_active == True)
return query.order_by(
Conversation.updated_at.desc()
).offset(offset).limit(limit).all()
# Message operations
def add_message(self, conversation_id: str, role: str, content: str,
token_count: Optional[int] = None,
metadata: Optional[Dict[str, Any]] = None) -> Message:
"""Add a message to a conversation"""
with self.get_session() as session:
message = Message(
conversation_id=conversation_id,
role=role,
content=content,
token_count=token_count,
meta=metadata or {}
)
session.add(message)
session.flush()  # populate defaults (id, timestamp) before they're used below
# Update conversation message count and freshness
session.query(Conversation).filter(
Conversation.id == conversation_id
).update({
"message_count": Conversation.message_count + 1,
"updated_at": message.timestamp
})
return message
def get_messages(self, conversation_id: str, limit: Optional[int] = None,
offset: int = 0) -> List[Message]:
"""Get messages for a conversation"""
with self.get_session() as session:
query = session.query(Message).filter(
Message.conversation_id == conversation_id
).order_by(Message.timestamp.asc())
if limit:
query = query.offset(offset).limit(limit)
return query.all()
def get_recent_messages(self, conversation_id: str, count: int = 10) -> List[Message]:
"""Get the most recent messages from a conversation"""
with self.get_session() as session:
return session.query(Message).filter(
Message.conversation_id == conversation_id
).order_by(Message.timestamp.desc()).limit(count).all()[::-1] # Reverse to chronological order
# Memory operations
def add_memory(self, content: str, memory_type: str, importance: float = 0.5,
conversation_id: Optional[str] = None, embedding: Optional[List[float]] = None,
metadata: Optional[Dict[str, Any]] = None) -> Memory:
"""Add a memory entry"""
with self.get_session() as session:
memory = Memory(
conversation_id=conversation_id,
memory_type=memory_type,
content=content,
importance=importance,
meta=metadata or {}
)
if embedding:
memory.set_embedding(embedding)
session.add(memory)
session.flush()
return memory
def search_memories(self, memory_type: Optional[str] = None,
min_importance: float = 0.0, limit: int = 50) -> List[Memory]:
"""Search memories by type and importance"""
with self.get_session() as session:
query = session.query(Memory).filter(
Memory.importance >= min_importance
)
if memory_type:
query = query.filter(Memory.memory_type == memory_type)
return query.order_by(
Memory.importance.desc(),
Memory.last_accessed.desc()
).limit(limit).all()
def update_memory_access(self, memory_id: str) -> bool:
"""Update memory access statistics"""
with self.get_session() as session:
from datetime import datetime
result = session.query(Memory).filter(
Memory.id == memory_id
).update({
"last_accessed": datetime.utcnow(),
"access_count": Memory.access_count + 1
})
return result > 0
# Summary operations
def add_summary(self, conversation_id: str, start_message_id: str,
end_message_id: str, summary: str, message_count: int,
embedding: Optional[List[float]] = None) -> ConversationSummary:
"""Add a conversation summary"""
with self.get_session() as session:
summary_obj = ConversationSummary(
conversation_id=conversation_id,
start_message_id=start_message_id,
end_message_id=end_message_id,
summary=summary,
message_count=message_count
)
if embedding:
summary_obj.embedding = json.dumps(embedding)
session.add(summary_obj)
session.flush()
return summary_obj
def get_summaries(self, conversation_id: str) -> List[ConversationSummary]:
"""Get summaries for a conversation"""
with self.get_session() as session:
return session.query(ConversationSummary).filter(
ConversationSummary.conversation_id == conversation_id
).order_by(ConversationSummary.created_at.asc()).all()
def cleanup_old_data(self, days_old: int = 30) -> Dict[str, int]:
"""Clean up old inactive conversations and memories"""
from datetime import datetime, timedelta
cutoff_date = datetime.utcnow() - timedelta(days=days_old)
with self.get_session() as session:
# Count items to be deleted
old_conversations = session.query(Conversation).filter(
Conversation.updated_at < cutoff_date,
Conversation.is_active == False
).count()
old_memories = session.query(Memory).filter(
Memory.last_accessed < cutoff_date,
Memory.importance < 0.3
).count()
# Delete old conversations one by one so the ORM cascade
# also removes their messages and memories
for conv in session.query(Conversation).filter(
Conversation.updated_at < cutoff_date,
Conversation.is_active == False
).all():
session.delete(conv)
# Delete low-importance old memories
session.query(Memory).filter(
Memory.last_accessed < cutoff_date,
Memory.importance < 0.3
).delete()
return {
"conversations_deleted": old_conversations,
"memories_deleted": old_memories
}
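Before moving on, it's worth sanity-checking the storage layer in isolation. A minimal smoke test, assuming the src package layout from the earlier parts is importable from your project root:

import uuid
from src.memory.database import DatabaseManager

db = DatabaseManager()  # defaults to sqlite:///data/agent_memory.db
conv_id = str(uuid.uuid4())
db.create_conversation(conv_id, title="Smoke test")
db.add_message(conv_id, role="user", content="Hello, agent!")
print(db.health_check())                              # True if SQLite is reachable
print([m.content for m in db.get_messages(conv_id)])  # ['Hello, agent!']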
Step 2: Implementing Vector-Based Semantic Memory
Now let's create a semantic memory system using embeddings for intelligent context retrieval.
Create Embedding Manager
Create src/memory/embeddings.py:
"""
Embedding and vector operations for semantic memory
"""
import json
import numpy as np
from typing import List, Dict, Any, Optional, Tuple
from sklearn.metrics.pairwise import cosine_similarity
import tiktoken
from openai import OpenAI
from ..utils.config import AgentConfig
from ..utils.logger import get_logger
class EmbeddingManager:
"""Manages text embeddings for semantic search and memory"""
def __init__(self, config: AgentConfig):
"""
Initialize embedding manager
Args:
config: Agent configuration
"""
self.config = config
self.logger = get_logger()
self.client = OpenAI(api_key=config.openai_api_key)
# Initialize tokenizer for token counting
try:
self.tokenizer = tiktoken.encoding_for_model(config.openai_model)
except KeyError:
# Fallback to a common encoding
self.tokenizer = tiktoken.get_encoding("cl100k_base")
self.logger.info("Embedding manager initialized")
def count_tokens(self, text: str) -> int:
"""Count tokens in text"""
return len(self.tokenizer.encode(text))
def get_embedding(self, text: str, model: str = "text-embedding-3-small") -> List[float]:
"""
Get embedding for text
Args:
text: Text to embed
model: Embedding model to use
Returns:
Embedding vector
"""
try:
# Clean and truncate text if necessary
text = text.strip()
if not text:
raise ValueError("Empty text provided for embedding")
# Truncate if too long (embedding models have token limits)
max_tokens = 8000 # Conservative limit for embedding models
if self.count_tokens(text) > max_tokens:
tokens = self.tokenizer.encode(text)[:max_tokens]
text = self.tokenizer.decode(tokens)
self.logger.warning(f"Text truncated to {max_tokens} tokens for embedding")
response = self.client.embeddings.create(
input=text,
model=model
)
embedding = response.data[0].embedding
self.logger.debug(f"Generated embedding of dimension {len(embedding)}")
return embedding
except Exception as e:
self.logger.error(f"Failed to generate embedding: {e}")
raise
def get_embeddings_batch(self, texts: List[str],
model: str = "text-embedding-3-small") -> List[List[float]]:
"""
Get embeddings for multiple texts in batch
Args:
texts: List of texts to embed
model: Embedding model to use
Returns:
List of embedding vectors
"""
try:
if not texts:
return []
# Clean and validate texts
cleaned_texts = []
for text in texts:
text = text.strip()
if text:
# Truncate if necessary
if self.count_tokens(text) > 8000:
tokens = self.tokenizer.encode(text)[:8000]
text = self.tokenizer.decode(tokens)
cleaned_texts.append(text)
if not cleaned_texts:
return []
response = self.client.embeddings.create(
input=cleaned_texts,
model=model
)
embeddings = [data.embedding for data in response.data]
self.logger.debug(f"Generated {len(embeddings)} embeddings")
return embeddings
except Exception as e:
self.logger.error(f"Failed to generate batch embeddings: {e}")
raise
def calculate_similarity(self, embedding1: List[float],
embedding2: List[float]) -> float:
"""
Calculate cosine similarity between two embeddings
Args:
embedding1: First embedding vector
embedding2: Second embedding vector
Returns:
Similarity score (0-1)
"""
try:
# Convert to numpy arrays
vec1 = np.array(embedding1).reshape(1, -1)
vec2 = np.array(embedding2).reshape(1, -1)
# Calculate cosine similarity
similarity = cosine_similarity(vec1, vec2)[0][0]
# Convert to 0-1 range (cosine similarity is -1 to 1)
return (similarity + 1) / 2
except Exception as e:
self.logger.error(f"Failed to calculate similarity: {e}")
return 0.0
def find_most_similar(self, query_embedding: List[float],
candidate_embeddings: List[List[float]],
top_k: int = 5) -> List[Tuple[int, float]]:
"""
Find most similar embeddings to query
Args:
query_embedding: Query embedding vector
candidate_embeddings: List of candidate embedding vectors
top_k: Number of top results to return
Returns:
List of (index, similarity_score) tuples
"""
try:
if not candidate_embeddings:
return []
query_vec = np.array(query_embedding).reshape(1, -1)
candidate_matrix = np.array(candidate_embeddings)
# Calculate similarities
similarities = cosine_similarity(query_vec, candidate_matrix)[0]
# Convert to 0-1 range and get top-k
similarities = (similarities + 1) / 2
top_indices = np.argsort(similarities)[::-1][:top_k]
results = [(int(idx), float(similarities[idx])) for idx in top_indices]
self.logger.debug(f"Found {len(results)} similar embeddings")
return results
except Exception as e:
self.logger.error(f"Failed to find similar embeddings: {e}")
return []
def cluster_embeddings(self, embeddings: List[List[float]],
n_clusters: int = 5) -> List[int]:
"""
Cluster embeddings using K-means
Args:
embeddings: List of embedding vectors
n_clusters: Number of clusters
Returns:
List of cluster labels
"""
try:
from sklearn.cluster import KMeans
if len(embeddings) < n_clusters:
return list(range(len(embeddings)))
embedding_matrix = np.array(embeddings)
kmeans = KMeans(n_clusters=n_clusters, random_state=42)
cluster_labels = kmeans.fit_predict(embedding_matrix)
self.logger.debug(f"Clustered {len(embeddings)} embeddings into {n_clusters} clusters")
return cluster_labels.tolist()
except Exception as e:
self.logger.error(f"Failed to cluster embeddings: {e}")
return [0] * len(embeddings) # Return all in one cluster as fallback
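The similarity helpers can be exercised without calling the OpenAI API, which makes them a cheap sanity check of the math. A quick sketch with made-up vectors (assumes get_config() from the earlier parts supplies a valid key for the constructor; no request is actually sent):

from src.memory.embeddings import EmbeddingManager
from src.utils.config import get_config

em = EmbeddingManager(get_config())
a, b = [1.0, 0.0, 0.0], [0.0, 1.0, 0.0]         # orthogonal toy vectors
print(em.calculate_similarity(a, a))             # 1.0 (identical)
print(em.calculate_similarity(a, b))             # 0.5 (raw cosine 0.0, rescaled to 0-1)
print(em.find_most_similar(a, [a, b], top_k=1))  # [(0, 1.0)]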
Step 3: Creating the Memory Manager
Now let's create a comprehensive memory management system that ties everything together.
Create Memory Manager
Create src/memory/memory_manager.py:
"""
Comprehensive memory management for AI Agent
"""
import json
from typing import List, Dict, Any, Optional, Tuple
from datetime import datetime, timedelta
from dataclasses import dataclass
from .database import DatabaseManager
from .embeddings import EmbeddingManager
from .models import Conversation, Message, Memory, ConversationSummary
from ..utils.config import AgentConfig
from ..utils.logger import get_logger
@dataclass
class MemorySearchResult:
"""Result from memory search"""
memory_id: str
content: str
memory_type: str
importance: float
similarity_score: float
last_accessed: datetime
metadata: Dict[str, Any]
@dataclass
class ContextWindow:
"""Represents the current context window"""
messages: List[Dict[str, Any]]
summaries: List[str]
memories: List[MemorySearchResult]
total_tokens: int
max_tokens: int
class MemoryManager:
"""Manages all aspects of agent memory and context"""
def __init__(self, config: AgentConfig, database_url: Optional[str] = None):
"""
Initialize memory manager
Args:
config: Agent configuration
database_url: Optional database URL
"""
self.config = config
self.logger = get_logger()
# Initialize components
self.db = DatabaseManager(database_url)
self.embeddings = EmbeddingManager(config)
# Memory configuration
self.max_context_tokens = config.max_tokens * 3 # Allow more context than response
self.summary_threshold = 20 # Messages before summarization
self.memory_importance_threshold = 0.3
self.similarity_threshold = 0.7
self.logger.info("Memory manager initialized")
def create_conversation(self, conversation_id: str, title: Optional[str] = None) -> str:
"""Create a new conversation with memory tracking"""
try:
# Create conversation in database
conversation = self.db.create_conversation(
conversation_id=conversation_id,
title=title or f"Conversation {datetime.now().strftime('%Y-%m-%d %H:%M')}"
)
self.logger.info(f"Created conversation with memory: {conversation_id}")
return conversation_id
except Exception as e:
self.logger.error(f"Failed to create conversation: {e}")
raise
def add_message(self, conversation_id: str, role: str, content: str,
extract_memories: bool = True) -> str:
"""
Add a message and extract memories if needed
Args:
conversation_id: Conversation ID
role: Message role (user, assistant, system)
content: Message content
extract_memories: Whether to extract memories from the message
Returns:
Message ID
"""
try:
# Count tokens
token_count = self.embeddings.count_tokens(content)
# Add message to database
message = self.db.add_message(
conversation_id=conversation_id,
role=role,
content=content,
token_count=token_count
)
# Extract memories from user messages and important assistant responses
if extract_memories and (role == "user" or (role == "assistant" and len(content) > 100)):
self._extract_memories_from_message(conversation_id, content, role)
# Check if conversation needs summarization
self._check_summarization_needed(conversation_id)
self.logger.debug(f"Added message to conversation {conversation_id}")
return message.id
except Exception as e:
self.logger.error(f"Failed to add message: {e}")
raise
def get_context_window(self, conversation_id: str, query: Optional[str] = None) -> ContextWindow:
"""
Build an intelligent context window for the conversation
Args:
conversation_id: Conversation ID
query: Optional query for semantic search
Returns:
Context window with messages, summaries, and relevant memories
"""
try:
# Get recent messages
recent_messages = self.db.get_recent_messages(conversation_id, count=50)
# Convert to dict format and calculate tokens
messages = []
total_tokens = 0
for msg in reversed(recent_messages): # Most recent first for token counting
msg_dict = {
"role": msg.role,
"content": msg.content,
"timestamp": msg.timestamp.isoformat()
}
msg_tokens = msg.token_count or self.embeddings.count_tokens(msg.content)
if total_tokens + msg_tokens <= self.max_context_tokens:
messages.insert(0, msg_dict) # Insert at beginning to maintain order
total_tokens += msg_tokens
else:
break
# Get conversation summaries if we're truncating messages
summaries = []
if len(messages) < len(recent_messages):
summary_objs = self.db.get_summaries(conversation_id)
summaries = [s.summary for s in summary_objs]
# Get relevant memories
memories = []
if query:
memories = self.search_memories(query, limit=5)
return ContextWindow(
messages=messages,
summaries=summaries,
memories=memories,
total_tokens=total_tokens,
max_tokens=self.max_context_tokens
)
except Exception as e:
self.logger.error(f"Failed to build context window: {e}")
# Return minimal context on error
return ContextWindow(
messages=[],
summaries=[],
memories=[],
total_tokens=0,
max_tokens=self.max_context_tokens
)
def search_memories(self, query: str, memory_types: Optional[List[str]] = None,
limit: int = 10) -> List[MemorySearchResult]:
"""
Search memories using semantic similarity
Args:
query: Search query
memory_types: Optional list of memory types to filter by
limit: Maximum number of results
Returns:
List of memory search results
"""
try:
# Get query embedding
query_embedding = self.embeddings.get_embedding(query)
# Get candidate memories
memories = self.db.search_memories(
memory_type=memory_types[0] if memory_types and len(memory_types) == 1 else None,
min_importance=self.memory_importance_threshold,
limit=limit * 3 # Get more candidates for better filtering
)
if not memories:
return []
# Filter memories with embeddings
candidates = []
candidate_embeddings = []
for memory in memories:
embedding = memory.get_embedding()
if embedding:
candidates.append(memory)
candidate_embeddings.append(embedding)
if not candidate_embeddings:
return []
# Find most similar memories
similar_indices = self.embeddings.find_most_similar(
query_embedding, candidate_embeddings, top_k=limit
)
# Build results
results = []
for idx, similarity_score in similar_indices:
if similarity_score >= self.similarity_threshold:
memory = candidates[idx]
# Update access statistics
self.db.update_memory_access(memory.id)
results.append(MemorySearchResult(
memory_id=memory.id,
content=memory.content,
memory_type=memory.memory_type,
importance=memory.importance,
similarity_score=similarity_score,
last_accessed=memory.last_accessed,
metadata=memory.meta or {}
))
self.logger.debug(f"Found {len(results)} relevant memories for query")
return results
except Exception as e:
self.logger.error(f"Failed to search memories: {e}")
return []
def _extract_memories_from_message(self, conversation_id: str, content: str, role: str):
"""Extract and store memories from a message"""
try:
# Use LLM to extract structured information
extraction_prompt = f"""
Analyze the following {role} message and extract any important information that should be remembered for future conversations. Focus on:
1. Facts about the user (preferences, background, goals)
2. Important decisions or conclusions
3. Specific requirements or constraints
4. Skills or knowledge demonstrated
Message: {content}
Return a JSON list of memories, each with:
- "content": The memory content
- "type": One of "fact", "preference", "goal", "skill", "decision"
- "importance": Float between 0.0 and 1.0
Only extract truly important information. Return empty list if nothing significant.
"""
# The prompt above illustrates the LLM-based approach; to keep this
# tutorial self-contained we use simple rule-based extraction instead
memories_to_store = self._simple_memory_extraction(content, role)
# Store extracted memories
for memory_data in memories_to_store:
embedding = self.embeddings.get_embedding(memory_data["content"])
self.db.add_memory(
content=memory_data["content"],
memory_type=memory_data["type"],
importance=memory_data["importance"],
conversation_id=conversation_id,
embedding=embedding
)
if memories_to_store:
self.logger.debug(f"Extracted {len(memories_to_store)} memories from message")
except Exception as e:
self.logger.error(f"Failed to extract memories: {e}")
def _simple_memory_extraction(self, content: str, role: str) -> List[Dict[str, Any]]:
"""Simple rule-based memory extraction (placeholder for LLM-based extraction)"""
memories = []
# Simple heuristics for demonstration
if role == "user":
# Extract preferences
if "i like" in content.lower() or "i prefer" in content.lower():
memories.append({
"content": f"User preference: {content}",
"type": "preference",
"importance": 0.7
})
# Extract goals
if "i want to" in content.lower() or "my goal" in content.lower():
memories.append({
"content": f"User goal: {content}",
"type": "goal",
"importance": 0.8
})
return memories
def _check_summarization_needed(self, conversation_id: str):
"""Check if conversation needs summarization"""
try:
conversation = self.db.get_conversation(conversation_id)
if not conversation:
return
# Check if we need to summarize
if conversation.message_count >= self.summary_threshold:
messages = self.db.get_messages(conversation_id, limit=self.summary_threshold)
if len(messages) >= self.summary_threshold:
self._create_summary(conversation_id, messages)
except Exception as e:
self.logger.error(f"Failed to check summarization: {e}")
def _create_summary(self, conversation_id: str, messages: List[Message]):
"""Create a summary of conversation messages"""
try:
# Prepare messages for summarization
message_texts = []
for msg in messages[:-5]: # Don't summarize the most recent messages
message_texts.append(f"{msg.role}: {msg.content}")
conversation_text = "\n".join(message_texts)
# Create a placeholder summary (in production, ask the LLM to summarize conversation_text)
summary = f"Summary of {len(message_texts)} messages: {conversation_text[:200]}"
# Generate embedding for summary
summary_embedding = self.embeddings.get_embedding(summary)
# Store summary
self.db.add_summary(
conversation_id=conversation_id,
start_message_id=messages[0].id,
end_message_id=messages[-6].id, # End before recent messages
summary=summary,
message_count=len(message_texts),
embedding=summary_embedding
)
self.logger.info(f"Created summary for conversation {conversation_id}")
except Exception as e:
self.logger.error(f"Failed to create summary: {e}")
def cleanup_memory(self, days_old: int = 30) -> Dict[str, int]:
"""Clean up old memory data"""
try:
result = self.db.cleanup_old_data(days_old)
self.logger.info(f"Memory cleanup completed: {result}")
return result
except Exception as e:
self.logger.error(f"Failed to cleanup memory: {e}")
return {"conversations_deleted": 0, "memories_deleted": 0}
def get_memory_stats(self) -> Dict[str, Any]:
"""Get memory system statistics"""
try:
conversations = self.db.list_conversations(limit=1000)
total_conversations = len(conversations)
active_conversations = len([c for c in conversations if c.is_active])
total_messages = sum(c.message_count for c in conversations)
# Get memory counts by type
memories = self.db.search_memories(limit=1000)
memory_by_type = {}
for memory in memories:
memory_by_type[memory.memory_type] = memory_by_type.get(memory.memory_type, 0) + 1
return {
"total_conversations": total_conversations,
"active_conversations": active_conversations,
"total_messages": total_messages,
"total_memories": len(memories),
"memory_by_type": memory_by_type,
"database_health": self.db.health_check()
}
except Exception as e:
self.logger.error(f"Failed to get memory stats: {e}")
return {}
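Here's how the pieces fit together in practice. A minimal usage sketch (assumes get_config() from the earlier parts and a valid OpenAI key, since memory extraction and search call the embeddings API):

import uuid
from src.memory.memory_manager import MemoryManager
from src.utils.config import get_config

mm = MemoryManager(get_config())
conv_id = mm.create_conversation(str(uuid.uuid4()), title="Demo")
mm.add_message(conv_id, "user", "I prefer Python over JavaScript.")
ctx = mm.get_context_window(conv_id, query="What languages do I like?")
print(len(ctx.messages), ctx.total_tokens)   # messages kept + token usage
for m in mm.search_memories("programming language preference"):
    print(m.content, m.similarity_score)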
Step 4: Integrating Memory with Your Agent
Now let's update your base agent to use the new memory system.
Update Base Agent
Create src/agents/memory_agent.py:
"""
Memory-enhanced AI Agent
"""
from typing import Optional, Dict, Any, List
from datetime import datetime
from .base_agent import BaseAgent, ConversationContext
from ..memory.memory_manager import MemoryManager, ContextWindow
from ..utils.config import AgentConfig
from ..utils.logger import get_logger
class MemoryAgent(BaseAgent):
"""AI Agent with advanced memory and context handling"""
def __init__(self, config: AgentConfig, database_url: Optional[str] = None):
"""
Initialize memory-enhanced agent
Args:
config: Agent configuration
database_url: Optional database URL for memory storage
"""
super().__init__(config)
# Initialize memory manager
self.memory = MemoryManager(config, database_url)
self.logger.info("Memory-enhanced agent initialized")
def create_conversation(self, conversation_id: Optional[str] = None) -> str:
"""Create a new conversation with memory tracking"""
# Create conversation in base agent
conv_id = super().create_conversation(conversation_id)
# Create conversation in memory system
self.memory.create_conversation(conv_id)
return conv_id
def chat(self, message: str, conversation_id: Optional[str] = None) -> str:
"""Enhanced chat with memory integration"""
if not self.is_initialized:
self.initialize()
# Create or get conversation
if conversation_id is None:
conversation_id = self.create_conversation()
try:
# Add user message to memory
self.memory.add_message(conversation_id, "user", message)
# Get intelligent context window
context_window = self.memory.get_context_window(conversation_id, query=message)
# Build enhanced context for LLM
enhanced_context = self._build_enhanced_context(context_window)
# Get response using enhanced context
response = self._get_completion_with_memory(enhanced_context, message)
# Add assistant response to memory
self.memory.add_message(conversation_id, "assistant", response)
# Update base agent conversation (for compatibility)
base_context = self.get_conversation(conversation_id)
if base_context:
base_context.add_message("user", message)
base_context.add_message("assistant", response)
self.logger.info(f"Memory-enhanced chat completed for {conversation_id}")
return response
except Exception as e:
self.logger.error(f"Error in memory-enhanced chat: {e}")
# Fallback to base agent behavior
return super().chat(message, conversation_id)
def _build_enhanced_context(self, context_window: ContextWindow) -> List[Dict[str, str]]:
"""Build enhanced context with memories and summaries"""
messages = []
# Add system message with memory context
system_content = self.config.system_prompt
# Add relevant memories to system prompt
if context_window.memories:
memory_context = "\n\nRelevant memories from past conversations:\n"
for memory in context_window.memories[:3]: # Top 3 most relevant
memory_context += f"- {memory.content} (importance: {memory.importance:.2f})\n"
system_content += memory_context
# Add conversation summaries
if context_window.summaries:
summary_context = "\n\nConversation history summary:\n"
for summary in context_window.summaries[-2:]: # Last 2 summaries
summary_context += f"- {summary}\n"
system_content += summary_context
messages.append({"role": "system", "content": system_content})
# Add recent messages
for msg in context_window.messages:
if msg["role"] != "system": # Skip system messages from history
messages.append({
"role": msg["role"],
"content": msg["content"]
})
return messages
def _get_completion_with_memory(self, messages: List[Dict[str, str]], query: str) -> str:
"""Get completion using memory-enhanced context"""
try:
response = self.client.chat.completions.create(
model=self.config.openai_model,
messages=messages,
max_tokens=self.config.max_tokens,
temperature=self.config.temperature
)
return response.choices[0].message.content.strip()
except Exception as e:
self.logger.error(f"Error getting completion with memory: {e}")
raise
def search_conversation_history(self, query: str, limit: int = 5) -> List[Dict[str, Any]]:
"""Search conversation history using semantic search"""
try:
memories = self.memory.search_memories(query, limit=limit)
results = []
for memory in memories:
results.append({
"content": memory.content,
"type": memory.memory_type,
"importance": memory.importance,
"similarity": memory.similarity_score,
"last_accessed": memory.last_accessed.isoformat()
})
return results
except Exception as e:
self.logger.error(f"Error searching conversation history: {e}")
return []
def get_memory_stats(self) -> Dict[str, Any]:
"""Get comprehensive memory statistics"""
base_stats = super().get_stats()
memory_stats = self.memory.get_memory_stats()
return {
**base_stats,
"memory": memory_stats
}
def cleanup_old_conversations(self, days_old: int = 30) -> Dict[str, int]:
"""Clean up old conversation data"""
return self.memory.cleanup_memory(days_old)
Step 5: Testing Your Memory-Enhanced Agent
Let's create tests and examples for the new memory system.
Create Memory Tests
Create tests/test_memory_agent.py:
"""
Tests for memory-enhanced agent
"""
import pytest
from unittest.mock import Mock, patch
import tempfile
import os
from src.agents.memory_agent import MemoryAgent
from src.memory.memory_manager import MemoryManager
from src.utils.config import AgentConfig
@pytest.fixture
def temp_db():
"""Create temporary database for testing"""
with tempfile.NamedTemporaryFile(suffix='.db', delete=False) as f:
db_path = f.name
yield f"sqlite:///{db_path}"
# Cleanup
if os.path.exists(db_path):
os.unlink(db_path)
@pytest.fixture
def memory_config():
"""Mock configuration for memory agent"""
return AgentConfig(
openai_api_key="sk-test-key-123",
openai_model="gpt-3.5-turbo",
max_tokens=1000,
temperature=0.7,
agent_name="Memory Test Agent",
agent_description="A test agent with memory",
system_prompt="You are a helpful assistant with memory.",
log_level="DEBUG",
debug=True
)
class TestMemoryAgent:
"""Test memory-enhanced agent functionality"""
@patch('src.agents.base_agent.OpenAI')
@patch('src.agents.base_agent.AsyncOpenAI')
@patch('src.memory.embeddings.OpenAI')
def test_memory_agent_initialization(self, mock_embed_openai, mock_async_openai,
mock_openai, memory_config, temp_db):
"""Test memory agent initialization"""
agent = MemoryAgent(memory_config, temp_db)
assert agent.config == memory_config
assert agent.memory is not None
assert isinstance(agent.memory, MemoryManager)
@patch('src.agents.base_agent.OpenAI')
@patch('src.agents.base_agent.AsyncOpenAI')
@patch('src.memory.embeddings.OpenAI')
def test_conversation_creation_with_memory(self, mock_embed_openai, mock_async_openai,
mock_openai, memory_config, temp_db):
"""Test conversation creation with memory tracking"""
agent = MemoryAgent(memory_config, temp_db)
conv_id = agent.create_conversation()
# Check that conversation exists in both base agent and memory system
assert conv_id in agent.conversations
# Check memory system has the conversation
memory_conv = agent.memory.db.get_conversation(conv_id)
assert memory_conv is not None
assert memory_conv.id == conv_id
@patch('src.agents.base_agent.OpenAI')
@patch('src.agents.base_agent.AsyncOpenAI')
@patch('src.memory.embeddings.OpenAI')
def test_memory_stats(self, mock_embed_openai, mock_async_openai,
mock_openai, memory_config, temp_db):
"""Test memory statistics"""
agent = MemoryAgent(memory_config, temp_db)
stats = agent.get_memory_stats()
assert "memory" in stats
assert "total_conversations" in stats["memory"]
assert "database_health" in stats["memory"]
Create Example Usage
Create examples/memory_agent_example.py:
"""
Example usage of memory-enhanced AI agent
"""
import sys
from pathlib import Path
# Add the project root to the path so "src" imports resolve
sys.path.insert(0, str(Path(__file__).parent.parent))
from src.agents.memory_agent import MemoryAgent
from src.utils.config import get_config
from src.utils.logger import get_logger
def main():
"""Demonstrate memory-enhanced agent capabilities"""
# Initialize agent with memory
config = get_config()
agent = MemoryAgent(config)
logger = get_logger()
print("🧠 Memory-Enhanced AI Agent Demo")
print("=" * 40)
try:
# Initialize agent
agent.initialize()
# Create conversation
conv_id = agent.create_conversation()
print(f"Created conversation: {conv_id[:8]}...")
# Simulate conversation with memory extraction
messages = [
"Hi! I'm John, a software engineer from San Francisco. I love Python programming.",
"I'm working on a machine learning project using TensorFlow.",
"My goal is to build an AI agent that can help with code reviews.",
"I prefer clean, well-documented code with good test coverage."
]
print("\n📝 Conversation with Memory Extraction:")
for i, message in enumerate(messages, 1):
print(f"\nUser: {message}")
response = agent.chat(message, conv_id)
print(f"Agent: {response}")
# Show memory stats after each message
if i % 2 == 0:
stats = agent.get_memory_stats()
print(f"💾 Memory: {stats['memory']['total_memories']} memories stored")
# Demonstrate memory search
print("\n🔍 Memory Search Demo:")
search_queries = [
"What programming language does the user prefer?",
"What is the user's goal?",
"Where is the user from?"
]
for query in search_queries:
print(f"\nQuery: {query}")
results = agent.search_conversation_history(query, limit=3)
for result in results:
print(f" - {result['content']} (similarity: {result['similarity']:.2f})")
# Show final statistics
print("\n📊 Final Memory Statistics:")
final_stats = agent.get_memory_stats()
memory_stats = final_stats['memory']
print(f" Total Conversations: {memory_stats['total_conversations']}")
print(f" Total Messages: {memory_stats['total_messages']}")
print(f" Total Memories: {memory_stats['total_memories']}")
print(f" Memory Types: {memory_stats['memory_by_type']}")
# Demonstrate context window
print("\n🪟 Context Window Demo:")
context = agent.memory.get_context_window(conv_id, "Tell me about my preferences")
print(f" Messages in context: {len(context.messages)}")
print(f" Relevant memories: {len(context.memories)}")
print(f" Token usage: {context.total_tokens}/{context.max_tokens}")
except Exception as e:
logger.error(f"Demo failed: {e}")
print(f"❌ Error: {e}")
if __name__ == "__main__":
main()
Step 6: Advanced Memory Features
Memory Consolidation
Create src/memory/consolidation.py:
"""
Memory consolidation and optimization
"""
from .memory_manager import MemoryManager
from .models import Memory
from ..utils.logger import get_logger
class MemoryConsolidator:
"""Handles memory consolidation and optimization"""
def __init__(self, memory_manager: MemoryManager):
self.memory = memory_manager
self.logger = get_logger()
def consolidate_similar_memories(self, similarity_threshold: float = 0.9) -> int:
"""Consolidate very similar memories"""
try:
memories = self.memory.db.search_memories(limit=1000)
consolidated_count = 0
# Group memories by type
by_type = {}
for memory in memories:
if memory.memory_type not in by_type:
by_type[memory.memory_type] = []
by_type[memory.memory_type].append(memory)
# Find and consolidate similar memories within each type
for memory_type, type_memories in by_type.items():
if len(type_memories) < 2:
continue
# Get embeddings for all memories of this type
embeddings = []
valid_memories = []
for memory in type_memories:
embedding = memory.get_embedding()
if embedding:
embeddings.append(embedding)
valid_memories.append(memory)
if len(embeddings) < 2:
continue
# Find similar pairs
for i in range(len(embeddings)):
for j in range(i + 1, len(embeddings)):
similarity = self.memory.embeddings.calculate_similarity(
embeddings[i], embeddings[j]
)
if similarity >= similarity_threshold:
# Consolidate these memories
memory1, memory2 = valid_memories[i], valid_memories[j]
# Create consolidated memory
consolidated_content = f"{memory1.content} | {memory2.content}"
consolidated_importance = max(memory1.importance, memory2.importance)
# Add consolidated memory
new_embedding = self.memory.embeddings.get_embedding(consolidated_content)
self.memory.db.add_memory(
content=consolidated_content,
memory_type=memory_type,
importance=consolidated_importance,
embedding=new_embedding,
metadata={"consolidated": True, "source_ids": [memory1.id, memory2.id]}
)
# Simplified: we only count the pair here; production code would
# delete or mark the original memories as consolidated
consolidated_count += 2
self.logger.info(f"Consolidated {consolidated_count} similar memories")
return consolidated_count
except Exception as e:
self.logger.error(f"Failed to consolidate memories: {e}")
return 0
def promote_important_memories(self, access_threshold: int = 5) -> int:
"""Promote frequently accessed memories"""
try:
# Find memories that are accessed frequently
memories = self.memory.db.search_memories(limit=1000)
promoted_count = 0
for memory in memories:
if memory.access_count >= access_threshold and memory.importance < 0.8:
# Increase importance based on access frequency
new_importance = min(0.9, memory.importance + (memory.access_count * 0.05))
# Update memory importance
with self.memory.db.get_session() as session:
session.query(Memory).filter(Memory.id == memory.id).update({
"importance": new_importance
})
promoted_count += 1
self.logger.info(f"Promoted {promoted_count} frequently accessed memories")
return promoted_count
except Exception as e:
self.logger.error(f"Failed to promote memories: {e}")
return 0
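These passes are meant to run periodically rather than per message. A sketch of a nightly maintenance job (thresholds are illustrative; tune them against your data):

from src.memory.consolidation import MemoryConsolidator
from src.memory.memory_manager import MemoryManager
from src.utils.config import get_config

mm = MemoryManager(get_config())
consolidator = MemoryConsolidator(mm)
merged = consolidator.consolidate_similar_memories(similarity_threshold=0.9)
promoted = consolidator.promote_important_memories(access_threshold=5)
removed = mm.cleanup_memory(days_old=30)
print(f"merged={merged}, promoted={promoted}, removed={removed}")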
What You've Accomplished
Congratulations! You've built a sophisticated memory system for your AI agent:
- ✅ Persistent Storage - SQLite database with proper relationships
- ✅ Semantic Memory - Vector embeddings for intelligent retrieval
- ✅ Context Management - Smart context window with token limits
- ✅ Memory Extraction - Automatic extraction from conversations
- ✅ Summarization - Conversation summarization for long chats
- ✅ Memory Search - Semantic search across conversation history
- ✅ Memory Consolidation - Advanced memory optimization
- ✅ Comprehensive Testing - Unit tests and examples
Key Features Implemented:
- Multi-layered Memory Architecture
- Intelligent Context Window Management
- Semantic Search and Retrieval
- Automatic Memory Extraction
- Conversation Summarization
- Memory Consolidation and Cleanup
- Comprehensive Statistics and Monitoring
What's Next?
In Part 4: Implementing Tool Usage and API Integrations, you'll learn:
- Creating custom tools for your agent
- API integration patterns
- Function calling with OpenAI
- Tool result processing and chaining
- Error handling for external services
- Building a tool registry system
Quick Reference Commands
# Run memory agent example
python examples/memory_agent_example.py
# Run memory tests
pytest tests/test_memory_agent.py -v
# Check memory database
sqlite3 data/agent_memory.db ".tables"
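If you want to peek at what the agent has stored, the tables from models.py can be queried directly (table and column names as defined above):
# Count stored memories by type
sqlite3 data/agent_memory.db "SELECT memory_type, COUNT(*) FROM memories GROUP BY memory_type;"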
Additional Resources
- SQLAlchemy Documentation: docs.sqlalchemy.org
- OpenAI Embeddings Guide: platform.openai.com/docs/guides/embeddings
- Vector Similarity Search: scikit-learn.org/stable/modules/metrics.html
- Memory Systems in AI: Research papers on episodic and semantic memory
Ready to add tool usage capabilities to your agent? Continue to Part 4: Implementing Tool Usage and API Integrations to make your agent truly powerful!
This tutorial is part of our comprehensive AI Agent Development series. The memory system you've built provides the foundation for creating truly intelligent agents that learn and remember from every interaction.