Part 4: Memory System Implementation

4 June 2025 · netologist · 5 min, 948 words

Why Persistent Memory?

Memory systems enable AI assistants to:

- Recall earlier conversations across sessions instead of starting cold
- Learn user preferences and grow more confident in them over time
- Retain important long-term facts and resurface them when relevant

The three tables below map directly onto those goals.

Database Schema for Memory

# memory/models.py
from sqlalchemy import create_engine, Column, Integer, String, Text, DateTime, Float, JSON
from sqlalchemy.orm import declarative_base, sessionmaker  # declarative_base lives in sqlalchemy.orm since 1.4
from datetime import datetime
from config import config

Base = declarative_base()

class ConversationMemory(Base):
    __tablename__ = "conversation_memory"
    
    id = Column(Integer, primary_key=True)
    session_id = Column(String(255), index=True)
    user_message = Column(Text)
    assistant_response = Column(Text)
    timestamp = Column(DateTime, default=datetime.utcnow)
    importance_score = Column(Float, default=0.5)  # 0-1 scale
    tags = Column(JSON)  # For categorizing memories
    embedding_id = Column(String(255))  # Reference to vector store

class UserPreferences(Base):
    __tablename__ = "user_preferences"
    
    id = Column(Integer, primary_key=True)
    category = Column(String(100), index=True)  # e.g., "communication_style", "interests"
    key = Column(String(100))
    value = Column(Text)
    confidence = Column(Float, default=0.5)  # How confident we are about this preference
    last_updated = Column(DateTime, default=datetime.utcnow)
    source = Column(String(100))  # How we learned this preference

class LongTermMemory(Base):
    __tablename__ = "long_term_memory"
    
    id = Column(Integer, primary_key=True)
    memory_type = Column(String(50))  # "fact", "preference", "relationship", etc.
    content = Column(Text)
    importance = Column(Float, default=0.5)
    last_accessed = Column(DateTime, default=datetime.utcnow)
    access_count = Column(Integer, default=1)
    # "metadata" is a reserved attribute name in SQLAlchemy's declarative API,
    # so map the column to a different Python attribute while keeping the DB name
    meta = Column("metadata", JSON)

# Database setup
engine = create_engine(config.MEMORY_DB_URL, echo=False)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

def create_tables():
    Base.metadata.create_all(bind=engine)
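
The models import a shared config object built up in the earlier parts of this series. If you are following along standalone, a minimal sketch like the one below is enough; MEMORY_DB_URL is the only setting this part touches, and the SQLite default here is an assumption, not the series' actual value.

# config.py (minimal sketch, assumed values)
import os

class Config:
    # Assumed default: a local SQLite file; point this at Postgres etc. if preferred
    MEMORY_DB_URL = os.getenv("MEMORY_DB_URL", "sqlite:///memory.db")

config = Config()

Call create_tables() once at startup so all three tables exist before the memory manager touches them.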

Memory Manager Implementation

# memory/memory_manager.py
from typing import List, Dict, Any, Optional, Tuple
from sqlalchemy.orm import Session
from datetime import datetime, timedelta
import re
import logging
from .models import SessionLocal, ConversationMemory, UserPreferences, LongTermMemory, create_tables
from rag.vector_store import VectorStore
from models.ollama_manager import OllamaManager

class MemoryManager:
    def __init__(self):
        self.vector_store = VectorStore()
        self.ollama_manager = OllamaManager()
        
    def get_db(self) -> Session:
        return SessionLocal()
    
    async def store_conversation(
        self,
        session_id: str,
        user_message: str,
        assistant_response: str,
        tags: Optional[List[str]] = None
    ) -> str:
        """Store conversation in both SQL and vector databases"""
        
        db = self.get_db()
        try:
            # Calculate importance score
            importance = await self._calculate_importance(user_message, assistant_response)
            
            # Store in SQL database
            memory = ConversationMemory(
                session_id=session_id,
                user_message=user_message,
                assistant_response=assistant_response,
                importance_score=importance,
                tags=tags or []
            )
            
            db.add(memory)
            db.commit()
            
            # Store in vector database for semantic search
            conversation_text = f"User: {user_message}\nAssistant: {assistant_response}"
            embedding_id = await self.vector_store.add_document(
                content=conversation_text,
                metadata={
                    "type": "conversation",
                    "session_id": session_id,
                    "importance": importance,
                    "tags": tags or []
                },
                collection_name="conversations"
            )
            
            # Update SQL record with embedding ID
            memory.embedding_id = embedding_id
            db.commit()
            
            logging.info(f"Stored conversation with importance {importance}")
            return embedding_id
            
        except Exception as e:
            db.rollback()
            logging.error(f"Failed to store conversation: {e}")
            raise
        finally:
            db.close()
    
    async def _calculate_importance(self, user_message: str, assistant_response: str) -> float:
        """Calculate importance score for a conversation"""
        
        # Use LLM to assess importance
        assessment_prompt = f"""Assess the importance of this conversation on a scale of 0.0 to 1.0:

User: {user_message}
Assistant: {assistant_response}

Consider:
- Is this about personal preferences or information?
- Does it contain important facts or decisions?
- Is it likely to be referenced in future conversations?
- Does it reveal emotional or relationship context?

Respond with only a number between 0.0 and 1.0:"""

        try:
            response = await self.ollama_manager.generate_response(
                assessment_prompt,
                system_prompt="You are an expert at assessing conversation importance. Respond only with a decimal number."
            )
            
            # Extract the first number from the response and clamp it to [0, 1]
            match = re.search(r"\d*\.?\d+", response)
            if match:
                return max(0.0, min(1.0, float(match.group())))
            return 0.5  # Default importance when no number is found
                
        except Exception as e:
            logging.warning(f"Importance calculation failed: {e}")
            return 0.5
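
    # Alternative sketch (an assumption, not part of the original series): an
    # LLM call per stored message adds latency, so a cheap keyword heuristic
    # can stand in for _calculate_importance when speed matters more than nuance.
    def _heuristic_importance(self, user_message: str) -> float:
        signal_words = {"prefer", "always", "never", "remember", "hate", "love"}
        text = user_message.lower()
        hits = sum(1 for word in signal_words if word in text)
        return min(1.0, 0.3 + 0.2 * hits)  # baseline 0.3, +0.2 per signal word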
    
    async def get_conversation_context(
        self,
        session_id: str,
        query: Optional[str] = None,
        max_messages: int = 10
    ) -> List[Dict[str, Any]]:
        """Get relevant conversation context"""
        
        db = self.get_db()
        try:
            if query:
                # Semantic search for relevant conversations
                search_results = await self.vector_store.similarity_search(
                    query=query,
                    collection_name="conversations",
                    n_results=max_messages
                )
                
                # Get detailed conversation records
                context = []
                for result in search_results:
                    # Find matching conversation in SQL
                    conversation = db.query(ConversationMemory).filter(
                        ConversationMemory.embedding_id == result['id']
                    ).first()
                    
                    if conversation:
                        context.append({
                            'user_message': conversation.user_message,
                            'assistant_response': conversation.assistant_response,
                            'timestamp': conversation.timestamp,
                            'importance': conversation.importance_score,
                            'similarity': result['similarity_score']
                        })
                
                return context
            else:
                # Get recent conversations from current session
                conversations = db.query(ConversationMemory).filter(
                    ConversationMemory.session_id == session_id
                ).order_by(ConversationMemory.timestamp.desc()).limit(max_messages).all()
                
                return [{
                    'user_message': conv.user_message,
                    'assistant_response': conv.assistant_response,
                    'timestamp': conv.timestamp,
                    'importance': conv.importance_score
                } for conv in reversed(conversations)]
                
        except Exception as e:
            logging.error(f"Failed to get conversation context: {e}")
            return []
        finally:
            db.close()
    
    async def learn_preference(
        self,
        category: str,
        key: str,
        value: str,
        confidence: float = 0.5,
        source: str = "conversation"
    ):
        """Learn and store user preference"""
        
        db = self.get_db()
        try:
            # Check if preference already exists
            existing = db.query(UserPreferences).filter(
                UserPreferences.category == category,
                UserPreferences.key == key
            ).first()
            
            if existing:
                # Update existing preference
                existing.value = value
                existing.confidence = min(1.0, existing.confidence + confidence * 0.1)
                existing.last_updated = datetime.utcnow()
                existing.source = source
            else:
                # Create new preference
                preference = UserPreferences(
                    category=category,
                    key=key,
                    value=value,
                    confidence=confidence,
                    source=source
                )
                db.add(preference)
            
            db.commit()
            logging.info(f"Learned preference: {category}.{key} = {value}")
            
        except Exception as e:
            db.rollback()
            logging.error(f"Failed to learn preference: {e}")
        finally:
            db.close()
    
    def get_user_preferences(self, category: Optional[str] = None) -> Dict[str, Any]:
        """Get user preferences"""
        
        db = self.get_db()
        try:
            query = db.query(UserPreferences)
            if category:
                query = query.filter(UserPreferences.category == category)
            
            preferences = query.all()
            
            result = {}
            for pref in preferences:
                if pref.category not in result:
                    result[pref.category] = {}
                result[pref.category][pref.key] = {
                    'value': pref.value,
                    'confidence': pref.confidence,
                    'last_updated': pref.last_updated
                }
            
            return result
            
        except Exception as e:
            logging.error(f"Failed to get preferences: {e}")
            return {}
        finally:
            db.close()
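
    # Hypothetical convenience helper (not in the original code): fold stored
    # context and learned preferences into a system prompt for the next model
    # call, which is how the pieces above are meant to be used together.
    async def build_system_context(self, session_id: str, query: str) -> str:
        lines = []
        context = await self.get_conversation_context(session_id, query=query, max_messages=5)
        if context:
            lines.append("Relevant past conversation:")
            for turn in context:
                lines.append(f"User: {turn['user_message']}")
                lines.append(f"Assistant: {turn['assistant_response']}")
        preferences = self.get_user_preferences()
        if preferences:
            lines.append("Known user preferences:")
            for category, items in preferences.items():
                for key, info in items.items():
                    lines.append(f"- {category}.{key}: {info['value']} (confidence {info['confidence']:.2f})")
        return "\n".join(lines)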

# Usage example
async def demo_memory():
    create_tables()  # Make sure the tables exist before first use
    memory = MemoryManager()
    
    # Store a conversation
    await memory.store_conversation(
        session_id="demo_session",
        user_message="I prefer coffee over tea in the morning",
        assistant_response="I'll remember that you prefer coffee in the morning. Would you like me to remind you about coffee-related deals or recipes?",
        tags=["preference", "beverages"]
    )
    
    # Learn a preference
    await memory.learn_preference(
        category="beverages",
        key="morning_preference",
        value="coffee",
        confidence=0.8,
        source="direct_statement"
    )
    
    # Get conversation context
    context = await memory.get_conversation_context("demo_session")
    print("Context:", context)
    
    # Get preferences
    preferences = memory.get_user_preferences()
    print("Preferences:", preferences)

if __name__ == "__main__":
    import asyncio
    asyncio.run(demo_memory())
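
MemoryManager leans on two classes from earlier parts: the VectorStore from the RAG part (only add_document and similarity_search are used here, with results carrying an id and a similarity_score) and the OllamaManager from the model-management part (only generate_response). If you want to run the demo before those pieces are wired up, in-memory stubs matching the assumed interfaces are enough. These are sketches for offline testing, not the real implementations.

# stubs.py (hypothetical test doubles, swap in for the real imports)
import uuid
from typing import Any, Dict, List

class VectorStore:
    """In-memory stand-in for rag.vector_store.VectorStore."""

    def __init__(self):
        self._collections: Dict[str, List[Dict[str, Any]]] = {}

    async def add_document(self, content: str, metadata: Dict[str, Any], collection_name: str) -> str:
        doc_id = str(uuid.uuid4())
        self._collections.setdefault(collection_name, []).append(
            {"id": doc_id, "content": content, "metadata": metadata}
        )
        return doc_id

    async def similarity_search(self, query: str, collection_name: str, n_results: int = 5) -> List[Dict[str, Any]]:
        # Crude word-overlap scoring instead of real embeddings
        query_words = set(query.lower().split())
        scored = [
            {
                "id": doc["id"],
                "similarity_score": len(query_words & set(doc["content"].lower().split())) / max(len(query_words), 1),
            }
            for doc in self._collections.get(collection_name, [])
        ]
        scored.sort(key=lambda d: d["similarity_score"], reverse=True)
        return scored[:n_results]

class OllamaManager:
    """Stand-in for models.ollama_manager.OllamaManager."""

    async def generate_response(self, prompt: str, system_prompt: str = "") -> str:
        return "0.5"  # fixed importance score for offline runs

With the stubs in place, demo_memory() runs end to end against a local SQLite file and prints the stored context and preferences.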

Why this memory architecture?