ai-agentstutorialpythonmemorycontext

Build Your First AI Agent from Scratch - Part 3: Adding Memory and Context Handling

By AgentForge Hub1/8/202526 min read
Intermediate
Build Your First AI Agent from Scratch - Part 3: Adding Memory and Context Handling

Ad Space

Tutorial Navigation


Build Your First AI Agent from Scratch - Part 3: Adding Memory and Context Handling

Welcome to Part 3 of our AI agent tutorial series! In the previous parts, we built a solid foundation with environment setup and basic agent structure. Now we'll add sophisticated memory and context handling capabilities that will make your agent truly intelligent and capable of maintaining long-term conversations.

What You'll Build in This Tutorial

By the end of this tutorial, you'll have:

  • ✅ Persistent conversation storage with SQLite
  • ✅ Vector-based semantic memory using embeddings
  • ✅ Intelligent context window management
  • ✅ Message summarization for long conversations
  • ✅ Semantic search and retrieval of past conversations
  • ✅ Memory consolidation and cleanup strategies
  • ✅ Advanced context injection techniques

Estimated Time: 30-35 minutes


Understanding AI Agent Memory

Before diving into implementation, let's understand the different types of memory an AI agent needs:

1. Working Memory (Short-term)

  • Current conversation context
  • Recent messages within token limits
  • Temporary state and variables

2. Episodic Memory (Medium-term)

  • Complete conversation histories
  • Session-based interactions
  • Contextual relationships between conversations

3. Semantic Memory (Long-term)

  • Learned facts and knowledge
  • User preferences and patterns
  • Summarized insights from past interactions

4. Procedural Memory

  • Learned behaviors and skills
  • Tool usage patterns
  • Problem-solving strategies

Step 1: Setting Up Persistent Storage

First, let's create a robust database system for storing conversations and memory.

Install Additional Dependencies

Add these to your requirements.txt:

# Database and Storage
sqlalchemy==2.0.23
alembic==1.13.1
sqlite3  # Built into Python

# Vector Operations and Embeddings
numpy==1.24.3
scikit-learn==1.3.2
faiss-cpu==1.7.4

# Text Processing
tiktoken==0.5.2
nltk==3.8.1

Install the new dependencies:

pip install -r requirements.txt

Create Database Models

Create src/memory/models.py:

"""
Database models for AI Agent memory system
"""

from datetime import datetime
from typing import Optional, List, Dict, Any
import json
import uuid

from sqlalchemy import create_engine, Column, String, DateTime, Text, Integer, Float, Boolean, ForeignKey
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, relationship, Session
from sqlalchemy.dialects.sqlite import JSON

Base = declarative_base()

class Conversation(Base):
    """Represents a conversation session"""
    __tablename__ = "conversations"
    
    id = Column(String, primary_key=True, default=lambda: str(uuid.uuid4()))
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
    title = Column(String(200))
    summary = Column(Text)
    metadata = Column(JSON, default=dict)
    is_active = Column(Boolean, default=True)
    message_count = Column(Integer, default=0)
    
    # Relationships
    messages = relationship("Message", back_populates="conversation", cascade="all, delete-orphan")
    memories = relationship("Memory", back_populates="conversation", cascade="all, delete-orphan")
    
    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary"""
        return {
            "id": self.id,
            "created_at": self.created_at.isoformat() if self.created_at else None,
            "updated_at": self.updated_at.isoformat() if self.updated_at else None,
            "title": self.title,
            "summary": self.summary,
            "metadata": self.metadata or {},
            "is_active": self.is_active,
            "message_count": self.message_count
        }

class Message(Base):
    """Represents a single message in a conversation"""
    __tablename__ = "messages"
    
    id = Column(String, primary_key=True, default=lambda: str(uuid.uuid4()))
    conversation_id = Column(String, ForeignKey("conversations.id"), nullable=False)
    role = Column(String(20), nullable=False)  # user, assistant, system
    content = Column(Text, nullable=False)
    timestamp = Column(DateTime, default=datetime.utcnow)
    token_count = Column(Integer)
    metadata = Column(JSON, default=dict)
    
    # Relationships
    conversation = relationship("Conversation", back_populates="messages")
    
    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary"""
        return {
            "id": self.id,
            "conversation_id": self.conversation_id,
            "role": self.role,
            "content": self.content,
            "timestamp": self.timestamp.isoformat() if self.timestamp else None,
            "token_count": self.token_count,
            "metadata": self.metadata or {}
        }

class Memory(Base):
    """Represents a semantic memory entry"""
    __tablename__ = "memories"
    
    id = Column(String, primary_key=True, default=lambda: str(uuid.uuid4()))
    conversation_id = Column(String, ForeignKey("conversations.id"), nullable=True)
    memory_type = Column(String(50), nullable=False)  # fact, preference, skill, etc.
    content = Column(Text, nullable=False)
    importance = Column(Float, default=0.5)  # 0.0 to 1.0
    created_at = Column(DateTime, default=datetime.utcnow)
    last_accessed = Column(DateTime, default=datetime.utcnow)
    access_count = Column(Integer, default=0)
    embedding = Column(Text)  # JSON serialized vector
    metadata = Column(JSON, default=dict)
    
    # Relationships
    conversation = relationship("Conversation", back_populates="memories")
    
    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary"""
        return {
            "id": self.id,
            "conversation_id": self.conversation_id,
            "memory_type": self.memory_type,
            "content": self.content,
            "importance": self.importance,
            "created_at": self.created_at.isoformat() if self.created_at else None,
            "last_accessed": self.last_accessed.isoformat() if self.last_accessed else None,
            "access_count": self.access_count,
            "metadata": self.metadata or {}
        }
    
    def get_embedding(self) -> Optional[List[float]]:
        """Get embedding as list of floats"""
        if self.embedding:
            return json.loads(self.embedding)
        return None
    
    def set_embedding(self, embedding: List[float]) -> None:
        """Set embedding from list of floats"""
        self.embedding = json.dumps(embedding)

class ConversationSummary(Base):
    """Represents a summarized version of conversation segments"""
    __tablename__ = "conversation_summaries"
    
    id = Column(String, primary_key=True, default=lambda: str(uuid.uuid4()))
    conversation_id = Column(String, ForeignKey("conversations.id"), nullable=False)
    start_message_id = Column(String, nullable=False)
    end_message_id = Column(String, nullable=False)
    summary = Column(Text, nullable=False)
    message_count = Column(Integer, nullable=False)
    created_at = Column(DateTime, default=datetime.utcnow)
    embedding = Column(Text)  # JSON serialized vector
    
    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary"""
        return {
            "id": self.id,
            "conversation_id": self.conversation_id,
            "start_message_id": self.start_message_id,
            "end_message_id": self.end_message_id,
            "summary": self.summary,
            "message_count": self.message_count,
            "created_at": self.created_at.isoformat() if self.created_at else None
        }

Create Database Manager

Create src/memory/database.py:

"""
Database management for AI Agent memory system
"""

import os
from typing import Optional, List, Dict, Any
from contextlib import contextmanager
from pathlib import Path

from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker, Session
from sqlalchemy.exc import SQLAlchemyError

from .models import Base, Conversation, Message, Memory, ConversationSummary
from ..utils.logger import get_logger

class DatabaseManager:
    """Manages database connections and operations"""
    
    def __init__(self, database_url: Optional[str] = None):
        """
        Initialize database manager
        
        Args:
            database_url: Database connection URL (defaults to SQLite)
        """
        self.logger = get_logger()
        
        if database_url is None:
            # Default to SQLite in data directory
            data_dir = Path("data")
            data_dir.mkdir(exist_ok=True)
            database_url = f"sqlite:///{data_dir}/agent_memory.db"
        
        self.database_url = database_url
        self.engine = create_engine(
            database_url,
            echo=False,  # Set to True for SQL debugging
            pool_pre_ping=True
        )
        
        self.SessionLocal = sessionmaker(
            autocommit=False,
            autoflush=False,
            bind=self.engine
        )
        
        # Create tables
        self.create_tables()
        
        self.logger.info(f"Database initialized: {database_url}")
    
    def create_tables(self):
        """Create database tables"""
        try:
            Base.metadata.create_all(bind=self.engine)
            self.logger.info("Database tables created successfully")
        except SQLAlchemyError as e:
            self.logger.error(f"Failed to create database tables: {e}")
            raise
    
    @contextmanager
    def get_session(self):
        """Get database session with automatic cleanup"""
        session = self.SessionLocal()
        try:
            yield session
            session.commit()
        except Exception as e:
            session.rollback()
            self.logger.error(f"Database session error: {e}")
            raise
        finally:
            session.close()
    
    def health_check(self) -> bool:
        """Check database connectivity"""
        try:
            with self.get_session() as session:
                session.execute(text("SELECT 1"))
            return True
        except Exception as e:
            self.logger.error(f"Database health check failed: {e}")
            return False
    
    # Conversation operations
    def create_conversation(self, conversation_id: str, title: Optional[str] = None, 
                          metadata: Optional[Dict[str, Any]] = None) -> Conversation:
        """Create a new conversation"""
        with self.get_session() as session:
            conversation = Conversation(
                id=conversation_id,
                title=title,
                metadata=metadata or {}
            )
            session.add(conversation)
            session.flush()
            return conversation
    
    def get_conversation(self, conversation_id: str) -> Optional[Conversation]:
        """Get conversation by ID"""
        with self.get_session() as session:
            return session.query(Conversation).filter(
                Conversation.id == conversation_id
            ).first()
    
    def update_conversation(self, conversation_id: str, **kwargs) -> bool:
        """Update conversation fields"""
        with self.get_session() as session:
            result = session.query(Conversation).filter(
                Conversation.id == conversation_id
            ).update(kwargs)
            return result > 0
    
    def list_conversations(self, limit: int = 50, offset: int = 0, 
                          active_only: bool = True) -> List[Conversation]:
        """List conversations with pagination"""
        with self.get_session() as session:
            query = session.query(Conversation)
            
            if active_only:
                query = query.filter(Conversation.is_active == True)
            
            return query.order_by(
                Conversation.updated_at.desc()
            ).offset(offset).limit(limit).all()
    
    # Message operations
    def add_message(self, conversation_id: str, role: str, content: str,
                   token_count: Optional[int] = None, 
                   metadata: Optional[Dict[str, Any]] = None) -> Message:
        """Add a message to a conversation"""
        with self.get_session() as session:
            message = Message(
                conversation_id=conversation_id,
                role=role,
                content=content,
                token_count=token_count,
                metadata=metadata or {}
            )
            session.add(message)
            
            # Update conversation message count
            session.query(Conversation).filter(
                Conversation.id == conversation_id
            ).update({
                "message_count": Conversation.message_count + 1,
                "updated_at": message.timestamp
            })
            
            session.flush()
            return message
    
    def get_messages(self, conversation_id: str, limit: Optional[int] = None,
                    offset: int = 0) -> List[Message]:
        """Get messages for a conversation"""
        with self.get_session() as session:
            query = session.query(Message).filter(
                Message.conversation_id == conversation_id
            ).order_by(Message.timestamp.asc())
            
            if limit:
                query = query.offset(offset).limit(limit)
            
            return query.all()
    
    def get_recent_messages(self, conversation_id: str, count: int = 10) -> List[Message]:
        """Get the most recent messages from a conversation"""
        with self.get_session() as session:
            return session.query(Message).filter(
                Message.conversation_id == conversation_id
            ).order_by(Message.timestamp.desc()).limit(count).all()[::-1]  # Reverse to chronological order
    
    # Memory operations
    def add_memory(self, content: str, memory_type: str, importance: float = 0.5,
                  conversation_id: Optional[str] = None, embedding: Optional[List[float]] = None,
                  metadata: Optional[Dict[str, Any]] = None) -> Memory:
        """Add a memory entry"""
        with self.get_session() as session:
            memory = Memory(
                conversation_id=conversation_id,
                memory_type=memory_type,
                content=content,
                importance=importance,
                metadata=metadata or {}
            )
            
            if embedding:
                memory.set_embedding(embedding)
            
            session.add(memory)
            session.flush()
            return memory
    
    def search_memories(self, memory_type: Optional[str] = None, 
                       min_importance: float = 0.0, limit: int = 50) -> List[Memory]:
        """Search memories by type and importance"""
        with self.get_session() as session:
            query = session.query(Memory).filter(
                Memory.importance >= min_importance
            )
            
            if memory_type:
                query = query.filter(Memory.memory_type == memory_type)
            
            return query.order_by(
                Memory.importance.desc(),
                Memory.last_accessed.desc()
            ).limit(limit).all()
    
    def update_memory_access(self, memory_id: str) -> bool:
        """Update memory access statistics"""
        with self.get_session() as session:
            from datetime import datetime
            result = session.query(Memory).filter(
                Memory.id == memory_id
            ).update({
                "last_accessed": datetime.utcnow(),
                "access_count": Memory.access_count + 1
            })
            return result > 0
    
    # Summary operations
    def add_summary(self, conversation_id: str, start_message_id: str, 
                   end_message_id: str, summary: str, message_count: int,
                   embedding: Optional[List[float]] = None) -> ConversationSummary:
        """Add a conversation summary"""
        with self.get_session() as session:
            summary_obj = ConversationSummary(
                conversation_id=conversation_id,
                start_message_id=start_message_id,
                end_message_id=end_message_id,
                summary=summary,
                message_count=message_count
            )
            
            if embedding:
                summary_obj.embedding = json.dumps(embedding)
            
            session.add(summary_obj)
            session.flush()
            return summary_obj
    
    def get_summaries(self, conversation_id: str) -> List[ConversationSummary]:
        """Get summaries for a conversation"""
        with self.get_session() as session:
            return session.query(ConversationSummary).filter(
                ConversationSummary.conversation_id == conversation_id
            ).order_by(ConversationSummary.created_at.asc()).all()
    
    def cleanup_old_data(self, days_old: int = 30) -> Dict[str, int]:
        """Clean up old inactive conversations and memories"""
        from datetime import datetime, timedelta
        
        cutoff_date = datetime.utcnow() - timedelta(days=days_old)
        
        with self.get_session() as session:
            # Count items to be deleted
            old_conversations = session.query(Conversation).filter(
                Conversation.updated_at < cutoff_date,
                Conversation.is_active == False
            ).count()
            
            old_memories = session.query(Memory).filter(
                Memory.last_accessed < cutoff_date,
                Memory.importance < 0.3
            ).count()
            
            # Delete old conversations (cascades to messages)
            session.query(Conversation).filter(
                Conversation.updated_at < cutoff_date,
                Conversation.is_active == False
            ).delete()
            
            # Delete low-importance old memories
            session.query(Memory).filter(
                Memory.last_accessed < cutoff_date,
                Memory.importance < 0.3
            ).delete()
            
            return {
                "conversations_deleted": old_conversations,
                "memories_deleted": old_memories
            }

Step 2: Implementing Vector-Based Semantic Memory

Now let's create a semantic memory system using embeddings for intelligent context retrieval.

Create Embedding Manager

Create src/memory/embeddings.py:

"""
Embedding and vector operations for semantic memory
"""

import json
import numpy as np
from typing import List, Dict, Any, Optional, Tuple
from sklearn.metrics.pairwise import cosine_similarity
import tiktoken

from openai import OpenAI
from ..utils.config import AgentConfig
from ..utils.logger import get_logger

class EmbeddingManager:
    """Manages text embeddings for semantic search and memory"""
    
    def __init__(self, config: AgentConfig):
        """
        Initialize embedding manager
        
        Args:
            config: Agent configuration
        """
        self.config = config
        self.logger = get_logger()
        self.client = OpenAI(api_key=config.openai_api_key)
        
        # Initialize tokenizer for token counting
        try:
            self.tokenizer = tiktoken.encoding_for_model(config.openai_model)
        except KeyError:
            # Fallback to a common encoding
            self.tokenizer = tiktoken.get_encoding("cl100k_base")
        
        self.logger.info("Embedding manager initialized")
    
    def count_tokens(self, text: str) -> int:
        """Count tokens in text"""
        return len(self.tokenizer.encode(text))
    
    def get_embedding(self, text: str, model: str = "text-embedding-3-small") -> List[float]:
        """
        Get embedding for text
        
        Args:
            text: Text to embed
            model: Embedding model to use
            
        Returns:
            Embedding vector
        """
        try:
            # Clean and truncate text if necessary
            text = text.strip()
            if not text:
                raise ValueError("Empty text provided for embedding")
            
            # Truncate if too long (embedding models have token limits)
            max_tokens = 8000  # Conservative limit for embedding models
            if self.count_tokens(text) > max_tokens:
                tokens = self.tokenizer.encode(text)[:max_tokens]
                text = self.tokenizer.decode(tokens)
                self.logger.warning(f"Text truncated to {max_tokens} tokens for embedding")
            
            response = self.client.embeddings.create(
                input=text,
                model=model
            )
            
            embedding = response.data[0].embedding
            self.logger.debug(f"Generated embedding of dimension {len(embedding)}")
            
            return embedding
            
        except Exception as e:
            self.logger.error(f"Failed to generate embedding: {e}")
            raise
    
    def get_embeddings_batch(self, texts: List[str], 
                           model: str = "text-embedding-3-small") -> List[List[float]]:
        """
        Get embeddings for multiple texts in batch
        
        Args:
            texts: List of texts to embed
            model: Embedding model to use
            
        Returns:
            List of embedding vectors
        """
        try:
            if not texts:
                return []
            
            # Clean and validate texts
            cleaned_texts = []
            for text in texts:
                text = text.strip()
                if text:
                    # Truncate if necessary
                    if self.count_tokens(text) > 8000:
                        tokens = self.tokenizer.encode(text)[:8000]
                        text = self.tokenizer.decode(tokens)
                    cleaned_texts.append(text)
            
            if not cleaned_texts:
                return []
            
            response = self.client.embeddings.create(
                input=cleaned_texts,
                model=model
            )
            
            embeddings = [data.embedding for data in response.data]
            self.logger.debug(f"Generated {len(embeddings)} embeddings")
            
            return embeddings
            
        except Exception as e:
            self.logger.error(f"Failed to generate batch embeddings: {e}")
            raise
    
    def calculate_similarity(self, embedding1: List[float], 
                           embedding2: List[float]) -> float:
        """
        Calculate cosine similarity between two embeddings
        
        Args:
            embedding1: First embedding vector
            embedding2: Second embedding vector
            
        Returns:
            Similarity score (0-1)
        """
        try:
            # Convert to numpy arrays
            vec1 = np.array(embedding1).reshape(1, -1)
            vec2 = np.array(embedding2).reshape(1, -1)
            
            # Calculate cosine similarity
            similarity = cosine_similarity(vec1, vec2)[0][0]
            
            # Convert to 0-1 range (cosine similarity is -1 to 1)
            return (similarity + 1) / 2
            
        except Exception as e:
            self.logger.error(f"Failed to calculate similarity: {e}")
            return 0.0
    
    def find_most_similar(self, query_embedding: List[float], 
                         candidate_embeddings: List[List[float]],
                         top_k: int = 5) -> List[Tuple[int, float]]:
        """
        Find most similar embeddings to query
        
        Args:
            query_embedding: Query embedding vector
            candidate_embeddings: List of candidate embedding vectors
            top_k: Number of top results to return
            
        Returns:
            List of (index, similarity_score) tuples
        """
        try:
            if not candidate_embeddings:
                return []
            
            query_vec = np.array(query_embedding).reshape(1, -1)
            candidate_matrix = np.array(candidate_embeddings)
            
            # Calculate similarities
            similarities = cosine_similarity(query_vec, candidate_matrix)[0]
            
            # Convert to 0-1 range and get top-k
            similarities = (similarities + 1) / 2
            top_indices = np.argsort(similarities)[::-1][:top_k]
            
            results = [(int(idx), float(similarities[idx])) for idx in top_indices]
            
            self.logger.debug(f"Found {len(results)} similar embeddings")
            return results
            
        except Exception as e:
            self.logger.error(f"Failed to find similar embeddings: {e}")
            return []
    
    def cluster_embeddings(self, embeddings: List[List[float]], 
                          n_clusters: int = 5) -> List[int]:
        """
        Cluster embeddings using K-means
        
        Args:
            embeddings: List of embedding vectors
            n_clusters: Number of clusters
            
        Returns:
            List of cluster labels
        """
        try:
            from sklearn.cluster import KMeans
            
            if len(embeddings) < n_clusters:
                return list(range(len(embeddings)))
            
            embedding_matrix = np.array(embeddings)
            kmeans = KMeans(n_clusters=n_clusters, random_state=42)
            cluster_labels = kmeans.fit_predict(embedding_matrix)
            
            self.logger.debug(f"Clustered {len(embeddings)} embeddings into {n_clusters} clusters")
            return cluster_labels.tolist()
            
        except Exception as e:
            self.logger.error(f"Failed to cluster embeddings: {e}")
            return [0] * len(embeddings)  # Return all in one cluster as fallback

Step 3: Creating the Memory Manager

Now let's create a comprehensive memory management system that ties everything together.

Create Memory Manager

Create src/memory/memory_manager.py:

"""
Comprehensive memory management for AI Agent
"""

import json
from typing import List, Dict, Any, Optional, Tuple
from datetime import datetime, timedelta
from dataclasses import dataclass

from .database import DatabaseManager
from .embeddings import EmbeddingManager
from .models import Conversation, Message, Memory, ConversationSummary
from ..utils.config import AgentConfig
from ..utils.logger import get_logger

@dataclass
class MemorySearchResult:
    """Result from memory search"""
    memory_id: str
    content: str
    memory_type: str
    importance: float
    similarity_score: float
    last_accessed: datetime
    metadata: Dict[str, Any]

@dataclass
class ContextWindow:
    """Represents the current context window"""
    messages: List[Dict[str, Any]]
    summaries: List[str]
    memories: List[MemorySearchResult]
    total_tokens: int
    max_tokens: int

class MemoryManager:
    """Manages all aspects of agent memory and context"""
    
    def __init__(self, config: AgentConfig, database_url: Optional[str] = None):
        """
        Initialize memory manager
        
        Args:
            config: Agent configuration
            database_url: Optional database URL
        """
        self.config = config
        self.logger = get_logger()
        
        # Initialize components
        self.db = DatabaseManager(database_url)
        self.embeddings = EmbeddingManager(config)
        
        # Memory configuration
        self.max_context_tokens = config.max_tokens * 3  # Allow more context than response
        self.summary_threshold = 20  # Messages before summarization
        self.memory_importance_threshold = 0.3
        self.similarity_threshold = 0.7
        
        self.logger.info("Memory manager initialized")
    
    def create_conversation(self, conversation_id: str, title: Optional[str] = None) -> str:
        """Create a new conversation with memory tracking"""
        try:
            # Create conversation in database
            conversation = self.db.create_conversation(
                conversation_id=conversation_id,
                title=title or f"Conversation {datetime.now().strftime('%Y-%m-%d %H:%M')}"
            )
            
            self.logger.info(f"Created conversation with memory: {conversation_id}")
            return conversation_id
            
        except Exception as e:
            self.logger.error(f"Failed to create conversation: {e}")
            raise
    
    def add_message(self, conversation_id: str, role: str, content: str,
                   extract_memories: bool = True) -> str:
        """
        Add a message and extract memories if needed
        
        Args:
            conversation_id: Conversation ID
            role: Message role (user, assistant, system)
            content: Message content
            extract_memories: Whether to extract memories from the message
            
        Returns:
            Message ID
        """
        try:
            # Count tokens
            token_count = self.embeddings.count_tokens(content)
            
            # Add message to database
            message = self.db.add_message(
                conversation_id=conversation_id,
                role=role,
                content=content,
                token_count=token_count
            )
            
            # Extract memories from user messages and important assistant responses
            if extract_memories and (role == "user" or (role == "assistant" and len(content) > 100)):
                self._extract_memories_from_message(conversation_id, content, role)
            
            # Check if conversation needs summarization
            self._check_summarization_needed(conversation_id)
            
            self.logger.debug(f"Added message to conversation {conversation_id}")
            return message.id
            
        except Exception as e:
            self.logger.error(f"Failed to add message: {e}")
            raise
    
    def get_context_window(self, conversation_id: str, query: Optional[str] = None) -> ContextWindow:
        """
        Build an intelligent context window for the conversation
        
        Args:
            conversation_id: Conversation ID
            query: Optional query for semantic search
            
        Returns:
            Context window with messages, summaries, and relevant memories
        """
        try:
            # Get recent messages
            recent_messages = self.db.get_recent_messages(conversation_id, count=50)
            
            # Convert to dict format and calculate tokens
            messages = []
            total_tokens = 0
            
            for msg in reversed(recent_messages):  # Most recent first for token counting
                msg_dict = {
                    "role": msg.role,
                    "content": msg.content,
                    "timestamp": msg.timestamp.isoformat()
                }
                msg_tokens = msg.token_count or self.embeddings.count_tokens(msg.content)
                
                if total_tokens + msg_tokens <= self.max_context_tokens:
                    messages.insert(0, msg_dict)  # Insert at beginning to maintain order
                    total_tokens += msg_tokens
                else:
                    break
            
            # Get conversation summaries if we're truncating messages
            summaries = []
            if len(messages) < len(recent_messages):
                summary_objs = self.db.get_summaries(conversation_id)
                summaries = [s.summary for s in summary_objs]
            
            # Get relevant memories
            memories = []
            if query:
                memories = self.search_memories(query, limit=5)
            
            return ContextWindow(
                messages=messages,
                summaries=summaries,
                memories=memories,
                total_tokens=total_tokens,
                max_tokens=self.max_context_tokens
            )
            
        except Exception as e:
            self.logger.error(f"Failed to build context window: {e}")
            # Return minimal context on error
            return ContextWindow(
                messages=[],
                summaries=[],
                memories=[],
                total_tokens=0,
                max_tokens=self.max_context_tokens
            )
    
    def search_memories(self, query: str, memory_types: Optional[List[str]] = None,
                       limit: int = 10) -> List[MemorySearchResult]:
        """
        Search memories using semantic similarity
        
        Args:
            query: Search query
            memory_types: Optional list of memory types to filter by
            limit: Maximum number of results
            
        Returns:
            List of memory search results
        """
        try:
            # Get query embedding
            query_embedding = self.embeddings.get_embedding(query)
            
            # Get candidate memories
            memories = self.db.search_memories(
                memory_type=memory_types[0] if memory_types and len(memory_types) == 1 else None,
                min_importance=self.memory_importance_threshold,
                limit=limit * 3  # Get more candidates for better filtering
            )
            
            if not memories:
                return []
            
            # Filter memories with embeddings
            candidates = []
            candidate_embeddings = []
            
            for memory in memories:
                embedding = memory.get_embedding()
                if embedding:
                    candidates.append(memory)
                    candidate_embeddings.append(embedding)
            
            if not candidate_embeddings:
                return []
            
            # Find most similar memories
            similar_indices = self.embeddings.find_most_similar(
                query_embedding, candidate_embeddings, top_k=limit
            )
            
            # Build results
            results = []
            for idx, similarity_score in similar_indices:
                if similarity_score >= self.similarity_threshold:
                    memory = candidates[idx]
                    
                    # Update access statistics
                    self.db.update_memory_access(memory.id)
                    
                    results.append(MemorySearchResult(
                        memory_id=memory.id,
                        content=memory.content,
                        memory_type=memory.memory_type,
                        importance=memory.importance,
                        similarity_score=similarity_score,
                        last_accessed=memory.last_accessed,
                        metadata=memory.metadata or {}
                    ))
            
            self.logger.debug(f"Found {len(results)} relevant memories for query")
            return results
            
        except Exception as e:
            self.logger.error(f"Failed to search memories: {e}")
            return []
    
    def _extract_memories_from_message(self, conversation_id: str, content: str, role: str):
        """Extract and store memories from a message"""
        try:
            # Use LLM to extract structured information
            extraction_prompt = f"""
            Analyze the following {role} message and extract any important information that should be remembered for future conversations. Focus on:
            
            1. Facts about the user (preferences, background, goals)
            2. Important decisions or conclusions
            3. Specific requirements or constraints
            4. Skills or knowledge demonstrated
            
            Message: {content}
            
            Return a JSON list of memories, each with:
            - "content": The memory content
            - "type": One of "fact", "preference", "goal", "skill", "decision"
            - "importance": Float between 0.0 and 1.0
            
            Only extract truly important information. Return empty list if nothing significant.
            """
            
            # Get extraction from LLM (simplified for tutorial)
            # In production, you'd use a more sophisticated extraction method
            memories_to_store = self._simple_memory_extraction(content, role)
            
            # Store extracted memories
            for memory_data in memories_to_store:
                embedding = self.embeddings.get_embedding(memory_data["content"])
                
                self.db.add_memory(
                    content=memory_data["content"],
                    memory_type=memory_data["type"],
                    importance=memory_data["importance"],
                    conversation_id=conversation_id,
                    embedding=embedding
                )
            
            if memories_to_store:
                self.logger.debug(f"Extracted {len(memories_to_store)} memories from message")
                
        except Exception as e:
            self.logger.error(f"Failed to extract memories: {e}")
    
    def _simple_memory_extraction(self, content: str, role: str) -> List[Dict[str, Any]]:
        """Simple rule-based memory extraction (placeholder for LLM-based extraction)"""
        memories = []
        
        # Simple heuristics for demonstration
        if role == "user":
            # Extract preferences
            if "i like" in content.lower() or "i prefer" in content.lower():
                memories.append({
                    "content": f"User preference: {content}",
                    "type": "preference",
                    "importance": 0.7
                })
            
            # Extract goals
            if "i want to" in content.lower() or "my goal" in content.lower():
                memories.append({
                    "content": f"User goal: {content}",
                    "type": "goal",
                    "importance": 0.8
                })
        
        return memories
    
    def _check_summarization_needed(self, conversation_id: str):
        """Check if conversation needs summarization"""
        try:
            conversation = self.db.get_conversation(conversation_id)
            if not conversation:
                return
            
            # Check if we need to summarize
            if conversation.message_count >= self.summary_threshold:
                messages = self.db.get_messages(conversation_id, limit=self.summary_threshold)
                
                if len(messages) >= self.summary_threshold:
                    self._create_summary(conversation_id, messages)
                    
        except Exception as e:
            self.logger.error(f"Failed to check summarization: {e}")
    
    def _create_summary(self, conversation_id: str, messages: List[Message]):
        """Create a summary of conversation messages"""
        try:
            # Prepare messages for summarization
            message_texts = []
            for msg in messages[:-5]:  # Don't summarize the most recent messages
                message_texts.append(f"{msg.role}: {msg.content}")
            
            conversation_text = "\n".join(message_texts)
            
            # Create summary (simplified - in production use LLM)
            summary = f"Summary of {len(message_texts)} messages from conversation"
            
            # Generate embedding for summary
            summary_embedding = self.embeddings.get_embedding(summary)
            
            # Store summary
            self.db.add_summary(
                conversation_id=conversation_id,
                start_message_id=messages[0].id,
                end_message_id=messages[-6].id,  # End before recent messages
                summary=summary,
                message_count=len(message_texts),
                embedding=summary_embedding
            )
            
            self.logger.info(f"Created summary for conversation {conversation_id}")
            
        except Exception as e:
            self.logger.error(f"Failed to create summary: {e}")
    
    def cleanup_memory(self, days_old: int = 30) -> Dict[str, int]:
        """Clean up old memory data"""
        try:
            result = self.db.cleanup_old_data(days_old)
            self.logger.info(f"Memory cleanup completed: {result}")
            return result
            
        except Exception as e:
            self.logger.error(f"Failed to cleanup memory: {e}")
            return {"conversations_deleted": 0, "memories_deleted": 0}
    
    def get_memory_stats(self) -> Dict[str, Any]:
        """Get memory system statistics"""
        try:
            conversations = self.db.list_conversations(limit=1000)
            
            total_conversations = len(conversations)
            active_conversations = len([c for c in conversations if c.is_active])
            total_messages = sum(c.message_count for c in conversations)
            
            # Get memory counts by type
            memories = self.db.search_memories(limit=1000)
            memory_by_type = {}
            for memory in memories:
                memory_by_type[memory.memory_type] = memory_by_type.get(memory.memory_type, 0) + 1
            
            return {
                "total_conversations": total_conversations,
                "active_conversations": active_conversations,
                "total_messages": total_messages,
                "total_memories": len(memories),
                "memory_by_type": memory_by_type,
                "database_health": self.db.health_check()
            }
            
        except Exception as e:
            self.logger.error(f"Failed to get memory stats: {e}")
            return {}

Step 4: Integrating Memory with Your Agent

Now let's update your base agent to use the new memory system.

Update Base Agent

Create src/agents/memory_agent.py:

"""
Memory-enhanced AI Agent
"""

from typing import Optional, Dict, Any, List
from datetime import datetime

from .base_agent import BaseAgent, ConversationContext
from ..memory.memory_manager import MemoryManager, ContextWindow
from ..utils.config import AgentConfig
from ..utils.logger import get_logger

class MemoryAgent(BaseAgent):
    """AI Agent with advanced memory and context handling"""
    
    def __init__(self, config: AgentConfig, database_url: Optional[str] = None):
        """
        Initialize memory-enhanced agent
        
        Args:
            config: Agent configuration
            database_url: Optional database URL for memory storage
        """
        super().__init__(config)
        
        # Initialize memory manager
        self.memory = MemoryManager(config, database_url)
        
        self.logger.info("Memory-enhanced agent initialized")
    
    def create_conversation(self, conversation_id: Optional[str] = None) -> str:
        """Create a new conversation with memory tracking"""
        # Create conversation in base agent
        conv_id = super().create_conversation(conversation_id)
        
        # Create conversation in memory system
        self.memory.create_conversation(conv_id)
        
        return conv_id
    
    def chat(self, message: str, conversation_id: Optional[str] = None) -> str:
        """Enhanced chat with memory integration"""
        if not self.is_initialized:
            self.initialize()
        
        # Create or get conversation
        if conversation_id is None:
            conversation_id = self.create_conversation()
        
        try:
            # Add user message to memory
            self.memory.add_message(conversation_id, "user", message)
            
            # Get intelligent context window
            context_window = self.memory.get_context_window(conversation_id, query=message)
            
            # Build enhanced context for LLM
            enhanced_context = self._build_enhanced_context(context_window)
            
            # Get response using enhanced context
            response = self._get_completion_with_memory(enhanced_context, message)
            
            # Add assistant response to memory
            self.memory.add_message(conversation_id, "assistant", response)
            
            # Update base agent conversation (for compatibility)
            base_context = self.get_conversation(conversation_id)
            if base_context:
                base_context.add_message("user", message)
                base_context.add_message("assistant", response)
            
            self.logger.info(f"Memory-enhanced chat completed for {conversation_id}")
            return response
            
        except Exception as e:
            self.logger.error(f"Error in memory-enhanced chat: {e}")
            # Fallback to base agent behavior
            return super().chat(message, conversation_id)
    
    def _build_enhanced_context(self, context_window: ContextWindow) -> List[Dict[str, str]]:
        """Build enhanced context with memories and summaries"""
        messages = []
        
        # Add system message with memory context
        system_content = self.config.system_prompt
        
        # Add relevant memories to system prompt
        if context_window.memories:
            memory_context = "\n\nRelevant memories from past conversations:\n"
            for memory in context_window.memories[:3]:  # Top 3 most relevant
                memory_context += f"- {memory.content} (importance: {memory.importance:.2f})\n"
            system_content += memory_context
        
        # Add conversation summaries
        if context_window.summaries:
            summary_context = "\n\nConversation history summary:\n"
            for summary in context_window.summaries[-2:]:  # Last 2 summaries
                summary_context += f"- {summary}\n"
            system_content += summary_context
        
        messages.append({"role": "system", "content": system_content})
        
        # Add recent messages
        for msg in context_window.messages:
            if msg["role"] != "system":  # Skip system messages from history
                messages.append({
                    "role": msg["role"],
                    "content": msg["content"]
                })
        
        return messages
    
    def _get_completion_with_memory(self, messages: List[Dict[str, str]], query: str) -> str:
        """Get completion using memory-enhanced context"""
        try:
            response = self.client.chat.completions.create(
                model=self.config.openai_model,
                messages=messages,
                max_tokens=self.config.max_tokens,
                temperature=self.config.temperature
            )
            
            return response.choices[0].message.content.strip()
            
        except Exception as e:
            self.logger.error(f"Error getting completion with memory: {e}")
            raise
    
    def search_conversation_history(self, query: str, limit: int = 5) -> List[Dict[str, Any]]:
        """Search conversation history using semantic search"""
        try:
            memories = self.memory.search_memories(query, limit=limit)
            
            results = []
            for memory in memories:
                results.append({
                    "content": memory.content,
                    "type": memory.memory_type,
                    "importance": memory.importance,
                    "similarity": memory.similarity_score,
                    "last_accessed": memory.last_accessed.isoformat()
                })
            
            return results
            
        except Exception as e:
            self.logger.error(f"Error searching conversation history: {e}")
            return []
    
    def get_memory_stats(self) -> Dict[str, Any]:
        """Get comprehensive memory statistics"""
        base_stats = super().get_stats()
        memory_stats = self.memory.get_memory_stats()
        
        return {
            **base_stats,
            "memory": memory_stats
        }
    
    def cleanup_old_conversations(self, days_old: int = 30) -> Dict[str, int]:
        """Clean up old conversation data"""
        return self.memory.cleanup_memory(days_old)

Step 5: Testing Your Memory-Enhanced Agent

Let's create tests and examples for the new memory system.

Create Memory Tests

Create tests/test_memory_agent.py:

"""
Tests for memory-enhanced agent
"""

import pytest
from unittest.mock import Mock, patch
import tempfile
import os

from src.agents.memory_agent import MemoryAgent
from src.memory.memory_manager import MemoryManager
from src.utils.config import AgentConfig

@pytest.fixture
def temp_db():
    """Create temporary database for testing"""
    with tempfile.NamedTemporaryFile(suffix='.db', delete=False) as f:
        db_path = f.name
    
    yield f"sqlite:///{db_path}"
    
    # Cleanup
    if os.path.exists(db_path):
        os.unlink(db_path)

@pytest.fixture
def memory_config():
    """Mock configuration for memory agent"""
    return AgentConfig(
        openai_api_key="sk-test-key-123",
        openai_model="gpt-3.5-turbo",
        max_tokens=1000,
        temperature=0.7,
        agent_name="Memory Test Agent",
        agent_description="A test agent with memory",
        system_prompt="You are a helpful assistant with memory.",
        log_level="DEBUG",
        debug=True
    )

class TestMemoryAgent:
    """Test memory-enhanced agent functionality"""
    
    @patch('src.agents.base_agent.OpenAI')
    @patch('src.agents.base_agent.AsyncOpenAI')
    @patch('src.memory.embeddings.OpenAI')
    def test_memory_agent_initialization(self, mock_embed_openai, mock_async_openai, 
                                       mock_openai, memory_config, temp_db):
        """Test memory agent initialization"""
        agent = MemoryAgent(memory_config, temp_db)
        
        assert agent.config == memory_config
        assert agent.memory is not None
        assert isinstance(agent.memory, MemoryManager)
    
    @patch('src.agents.base_agent.OpenAI')
    @patch('src.agents.base_agent.AsyncOpenAI')
    @patch('src.memory.embeddings.OpenAI')
    def test_conversation_creation_with_memory(self, mock_embed_openai, mock_async_openai,
                                             mock_openai, memory_config, temp_db):
        """Test conversation creation with memory tracking"""
        agent = MemoryAgent(memory_config, temp_db)
        
        conv_id = agent.create_conversation()
        
        # Check that conversation exists in both base agent and memory system
        assert conv_id in agent.conversations
        
        # Check memory system has the conversation
        memory_conv = agent.memory.db.get_conversation(conv_id)
        assert memory_conv is not None
        assert memory_conv.id == conv_id
    
    @patch('src.agents.base_agent.OpenAI')
    @patch('src.agents.base_agent.AsyncOpenAI')
    @patch('src.memory.embeddings.OpenAI')
    def test_memory_stats(self, mock_embed_openai, mock_async_openai,
                         mock_openai, memory_config, temp_db):
        """Test memory statistics"""
        agent = MemoryAgent(memory_config, temp_db)
        
        stats = agent.get_memory_stats()
        
        assert "memory" in stats
        assert "total_conversations" in stats["memory"]
        assert "database_health" in stats["memory"]

Create Example Usage

Create examples/memory_agent_example.py:

"""
Example usage of memory-enhanced AI agent
"""

import sys
from pathlib import Path

# Add src to path
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))

from src.agents.memory_agent import MemoryAgent
from src.utils.config import get_config
from src.utils.logger import get_logger

def main():
    """Demonstrate memory-enhanced agent capabilities"""
    
    # Initialize agent with memory
    config = get_config()
    agent = MemoryAgent(config)
    logger = get_logger()
    
    print("🧠 Memory-Enhanced AI Agent Demo")
    print("=" * 40)
    
    try:
        # Initialize agent
        agent.initialize()
        
        # Create conversation
        conv_id = agent.create_conversation()
        print(f"Created conversation: {conv_id[:8]}...")
        
        # Simulate conversation with memory extraction
        messages = [
            "Hi! I'm John, a software engineer from San Francisco. I love Python programming.",
            "I'm working on a machine learning project using TensorFlow.",
            "My goal is to build an AI agent that can help with code reviews.",
            "I prefer clean, well-documented code with good test coverage."
        ]
        
        print("\n📝 Conversation with Memory Extraction:")
        for i, message in enumerate(messages, 1):
            print(f"\nUser: {message}")
            
            response = agent.chat(message, conv_id)
            print(f"Agent: {response}")
            
            # Show memory stats after each message
            if i % 2 == 0:
                stats = agent.get_memory_stats()
                print(f"💾 Memory: {stats['memory']['total_memories']} memories stored")
        
        # Demonstrate memory search
        print("\n🔍 Memory Search Demo:")
        search_queries = [
            "What programming language does the user prefer?",
            "What is the user's goal?",
            "Where is the user from?"
        ]
        
        for query in search_queries:
            print(f"\nQuery: {query}")
            results = agent.search_conversation_history(query, limit=3)
            
            for result in results:
                print(f"  - {result['content']} (similarity: {result['similarity']:.2f})")
        
        # Show final statistics
        print("\n📊 Final Memory Statistics:")
        final_stats = agent.get_memory_stats()
        memory_stats = final_stats['memory']
        
        print(f"  Total Conversations: {memory_stats['total_conversations']}")
        print(f"  Total Messages: {memory_stats['total_messages']}")
        print(f"  Total Memories: {memory_stats['total_memories']}")
        print(f"  Memory Types: {memory_stats['memory_by_type']}")
        
        # Demonstrate context window
        print("\n🪟 Context Window Demo:")
        context = agent.memory.get_context_window(conv_id, "Tell me about my preferences")
        print(f"  Messages in context: {len(context.messages)}")
        print(f"  Relevant memories: {len(context.memories)}")
        print(f"  Token usage: {context.total_tokens}/{context.max_tokens}")
        
    except Exception as e:
        logger.error(f"Demo failed: {e}")
        print(f"❌ Error: {e}")

if __name__ == "__main__":
    main()

Step 6: Advanced Memory Features

Memory Consolidation

Create src/memory/consolidation.py:

"""
Memory consolidation and optimization
"""

from typing import List, Dict, Any, Optional
from datetime import datetime, timedelta
import json

from .memory_manager import MemoryManager
from .models import Memory
from ..utils.logger import get_logger

class MemoryConsolidator:
    """Handles memory consolidation and optimization"""
    
    def __init__(self, memory_manager: MemoryManager):
        self.memory = memory_manager
        self.logger = get_logger()
    
    def consolidate_similar_memories(self, similarity_threshold: float = 0.9) -> int:
        """Consolidate very similar memories"""
        try:
            memories = self.memory.db.search_memories(limit=1000)
            consolidated_count = 0
            
            # Group memories by type
            by_type = {}
            for memory in memories:
                if memory.memory_type not in by_type:
                    by_type[memory.memory_type] = []
                by_type[memory.memory_type].append(memory)
            
            # Find and consolidate similar memories within each type
            for memory_type, type_memories in by_type.items():
                if len(type_memories) < 2:
                    continue
                
                # Get embeddings for all memories of this type
                embeddings = []
                valid_memories = []
                
                for memory in type_memories:
                    embedding = memory.get_embedding()
                    if embedding:
                        embeddings.append(embedding)
                        valid_memories.append(memory)
                
                if len(embeddings) < 2:
                    continue
                
                # Find similar pairs
                for i in range(len(embeddings)):
                    for j in range(i + 1, len(embeddings)):
                        similarity = self.memory.embeddings.calculate_similarity(
                            embeddings[i], embeddings[j]
                        )
                        
                        if similarity >= similarity_threshold:
                            # Consolidate these memories
                            memory1, memory2 = valid_memories[i], valid_memories[j]
                            
                            # Create consolidated memory
                            consolidated_content = f"{memory1.content} | {memory2.content}"
                            consolidated_importance = max(memory1.importance, memory2.importance)
                            
                            # Add consolidated memory
                            new_embedding = self.memory.embeddings.get_embedding(consolidated_content)
                            self.memory.db.add_memory(
                                content=consolidated_content,
                                memory_type=memory_type,
                                importance=consolidated_importance,
                                embedding=new_embedding,
                                metadata={"consolidated": True, "source_ids": [memory1.id, memory2.id]}
                            )
                            
                            # Remove original memories (simplified - in production, mark as consolidated)
                            consolidated_count += 2
            
            self.logger.info(f"Consolidated {consolidated_count} similar memories")
            return consolidated_count
            
        except Exception as e:
            self.logger.error(f"Failed to consolidate memories: {e}")
            return 0
    
    def promote_important_memories(self, access_threshold: int = 5) -> int:
        """Promote frequently accessed memories"""
        try:
            # Find memories that are accessed frequently
            memories = self.memory.db.search_memories(limit=1000)
            promoted_count = 0
            
            for memory in memories:
                if memory.access_count >= access_threshold and memory.importance < 0.8:
                    # Increase importance based on access frequency
                    new_importance = min(0.9, memory.importance + (memory.access_count * 0.05))
                    
                    # Update memory importance
                    with self.memory.db.get_session() as session:
                        session.query(Memory).filter(Memory.id == memory.id).update({
                            "importance": new_importance
                        })
                    
                    promoted_count += 1
            
            self.logger.info(f"Promoted {promoted_count} frequently accessed memories")
            return promoted_count
            
        except Exception as e:
            self.logger.error(f"Failed to promote memories: {e}")
            return 0

What You've Accomplished

Congratulations! You've built a sophisticated memory system for your AI agent:

  • Persistent Storage - SQLite database with proper relationships
  • Semantic Memory - Vector embeddings for intelligent retrieval
  • Context Management - Smart context window with token limits
  • Memory Extraction - Automatic extraction from conversations
  • Summarization - Conversation summarization for long chats
  • Memory Search - Semantic search across conversation history
  • Memory Consolidation - Advanced memory optimization
  • Comprehensive Testing - Unit tests and examples

Key Features Implemented:

  1. Multi-layered Memory Architecture
  2. Intelligent Context Window Management
  3. Semantic Search and Retrieval
  4. Automatic Memory Extraction
  5. Conversation Summarization
  6. Memory Consolidation and Cleanup
  7. Comprehensive Statistics and Monitoring

What's Next?

In Part 4: Implementing Tool Usage and API Integrations, you'll learn:

  • Creating custom tools for your agent
  • API integration patterns
  • Function calling with OpenAI
  • Tool result processing and chaining
  • Error handling for external services
  • Building a tool registry system

Quick Reference Commands

# Run memory agent example
python examples/memory_agent_example.py

# Run memory tests
pytest tests/test_memory_agent.py -v

# Check memory database
sqlite3 data/agent_memory.db ".tables"

Additional Resources

Ready to add tool usage capabilities to your agent? Continue to Part 4: Implementing Tool Usage and API Integrations to make your agent truly powerful!


This tutorial is part of our comprehensive AI Agent Development series. The memory system you've built provides the foundation for creating truly intelligent agents that learn and remember from every interaction.

Ad Space

Recommended Tools & Resources

* This section contains affiliate links. We may earn a commission when you purchase through these links at no additional cost to you.

OpenAI API

AI Platform

Access GPT-4 and other powerful AI models for your agent development.

Pay-per-use

LangChain Plus

Framework

Advanced framework for building applications with large language models.

Free + Paid

Pinecone Vector Database

Database

High-performance vector database for AI applications and semantic search.

Free tier available

AI Agent Development Course

Education

Complete course on building production-ready AI agents from scratch.

$199

💡 Pro Tip

Start with the free tiers of these tools to experiment, then upgrade as your AI agent projects grow. Most successful developers use a combination of 2-3 core tools rather than trying everything at once.

🚀 Join the AgentForge Community

Get weekly insights, tutorials, and the latest AI agent developments delivered to your inbox.

No spam, ever. Unsubscribe at any time.

Loading conversations...