Part 4: Memory System Implementation
Why Persistent Memory?
Memory systems enable AI assistants to:
- Learn preferences: Remember user likes and dislikes
- Maintain context: Continue conversations across sessions
- Build relationships: Develop personalized interactions
- Improve over time: Accumulate knowledge about the user
Database Schema for Memory
```python
# memory/models.py
from datetime import datetime

from sqlalchemy import create_engine, Column, Integer, String, Text, DateTime, Float, JSON
from sqlalchemy.orm import declarative_base, sessionmaker

from config import config

Base = declarative_base()

class ConversationMemory(Base):
    __tablename__ = "conversation_memory"

    id = Column(Integer, primary_key=True)
    session_id = Column(String(255), index=True)
    user_message = Column(Text)
    assistant_response = Column(Text)
    timestamp = Column(DateTime, default=datetime.utcnow)
    importance_score = Column(Float, default=0.5)  # 0-1 scale
    tags = Column(JSON)  # For categorizing memories
    embedding_id = Column(String(255))  # Reference to vector store

class UserPreferences(Base):
    __tablename__ = "user_preferences"

    id = Column(Integer, primary_key=True)
    category = Column(String(100), index=True)  # e.g., "communication_style", "interests"
    key = Column(String(100))
    value = Column(Text)
    confidence = Column(Float, default=0.5)  # How confident we are about this preference
    last_updated = Column(DateTime, default=datetime.utcnow)
    source = Column(String(100))  # How we learned this preference

class LongTermMemory(Base):
    __tablename__ = "long_term_memory"

    id = Column(Integer, primary_key=True)
    memory_type = Column(String(50))  # "fact", "preference", "relationship", etc.
    content = Column(Text)
    importance = Column(Float, default=0.5)
    last_accessed = Column(DateTime, default=datetime.utcnow)
    access_count = Column(Integer, default=1)
    # "metadata" is a reserved attribute on declarative models, so map the
    # column explicitly under a different Python name
    meta = Column("metadata", JSON)

# Database setup
engine = create_engine(config.MEMORY_DB_URL, echo=False)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

def create_tables():
    Base.metadata.create_all(bind=engine)
```
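Before wiring this into the assistant, it's worth a quick smoke test of the schema. A minimal sketch, assuming `config.MEMORY_DB_URL` points at a local database such as `sqlite:///memory.db`; the script itself is illustrative and not part of the project layout:

```python
# Illustrative smoke test -- assumes MEMORY_DB_URL points at a local SQLite file
from memory.models import create_tables, SessionLocal, ConversationMemory

create_tables()  # idempotent: creates only the tables that don't exist yet

db = SessionLocal()
try:
    db.add(ConversationMemory(
        session_id="smoke_test",
        user_message="ping",
        assistant_response="pong",
        tags=["test"],
    ))
    db.commit()
    rows = db.query(ConversationMemory).filter_by(session_id="smoke_test").count()
    print(f"conversation_memory rows for smoke_test: {rows}")
finally:
    db.close()
```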
Memory Manager Implementation
```python
# memory/memory_manager.py
import json
import logging
import re
from datetime import datetime, timedelta
from typing import List, Dict, Any, Optional, Tuple

from sqlalchemy.orm import Session

from .models import SessionLocal, ConversationMemory, UserPreferences, LongTermMemory
from rag.vector_store import VectorStore
from models.ollama_manager import OllamaManager

class MemoryManager:
    def __init__(self):
        self.vector_store = VectorStore()
        self.ollama_manager = OllamaManager()

    def get_db(self) -> Session:
        return SessionLocal()

    async def store_conversation(
        self,
        session_id: str,
        user_message: str,
        assistant_response: str,
        tags: Optional[List[str]] = None
    ) -> str:
        """Store conversation in both SQL and vector databases"""
        db = self.get_db()
        try:
            # Calculate importance score
            importance = await self._calculate_importance(user_message, assistant_response)

            # Store in SQL database
            memory = ConversationMemory(
                session_id=session_id,
                user_message=user_message,
                assistant_response=assistant_response,
                importance_score=importance,
                tags=tags or []
            )
            db.add(memory)
            db.commit()

            # Store in vector database for semantic search
            conversation_text = f"User: {user_message}\nAssistant: {assistant_response}"
            embedding_id = await self.vector_store.add_document(
                content=conversation_text,
                metadata={
                    "type": "conversation",
                    "session_id": session_id,
                    "importance": importance,
                    "tags": tags or []
                },
                collection_name="conversations"
            )

            # Update SQL record with embedding ID
            memory.embedding_id = embedding_id
            db.commit()

            logging.info(f"Stored conversation with importance {importance}")
            return embedding_id
        except Exception as e:
            db.rollback()
            logging.error(f"Failed to store conversation: {e}")
            raise
        finally:
            db.close()

    async def _calculate_importance(self, user_message: str, assistant_response: str) -> float:
        """Calculate importance score for a conversation"""
        # Use the LLM to assess importance
        assessment_prompt = f"""Assess the importance of this conversation on a scale of 0.0 to 1.0:

User: {user_message}
Assistant: {assistant_response}

Consider:
- Is this about personal preferences or information?
- Does it contain important facts or decisions?
- Is it likely to be referenced in future conversations?
- Does it reveal emotional or relationship context?

Respond with only a number between 0.0 and 1.0:"""
        try:
            response = await self.ollama_manager.generate_response(
                assessment_prompt,
                system_prompt="You are an expert at assessing conversation importance. Respond only with a decimal number."
            )

            # Extract the first number in [0, 1] from the response and clamp it
            match = re.search(r'\b(?:0(?:\.\d+)?|1(?:\.0+)?)\b', response)
            if match:
                return max(0.0, min(1.0, float(match.group())))
            return 0.5  # Default importance
        except Exception as e:
            logging.warning(f"Importance calculation failed: {e}")
            return 0.5

    async def get_conversation_context(
        self,
        session_id: str,
        query: Optional[str] = None,
        max_messages: int = 10
    ) -> List[Dict[str, Any]]:
        """Get relevant conversation context"""
        db = self.get_db()
        try:
            if query:
                # Semantic search for relevant conversations
                search_results = await self.vector_store.similarity_search(
                    query=query,
                    collection_name="conversations",
                    n_results=max_messages
                )

                # Get detailed conversation records
                context = []
                for result in search_results:
                    # Find the matching conversation in SQL
                    conversation = db.query(ConversationMemory).filter(
                        ConversationMemory.embedding_id == result['id']
                    ).first()
                    if conversation:
                        context.append({
                            'user_message': conversation.user_message,
                            'assistant_response': conversation.assistant_response,
                            'timestamp': conversation.timestamp,
                            'importance': conversation.importance_score,
                            'similarity': result['similarity_score']
                        })
                return context
            else:
                # Get recent conversations from the current session
                conversations = db.query(ConversationMemory).filter(
                    ConversationMemory.session_id == session_id
                ).order_by(ConversationMemory.timestamp.desc()).limit(max_messages).all()

                return [{
                    'user_message': conv.user_message,
                    'assistant_response': conv.assistant_response,
                    'timestamp': conv.timestamp,
                    'importance': conv.importance_score
                } for conv in reversed(conversations)]
        except Exception as e:
            logging.error(f"Failed to get conversation context: {e}")
            return []
        finally:
            db.close()

    async def learn_preference(
        self,
        category: str,
        key: str,
        value: str,
        confidence: float = 0.5,
        source: str = "conversation"
    ):
        """Learn and store user preference"""
        db = self.get_db()
        try:
            # Check if the preference already exists
            existing = db.query(UserPreferences).filter(
                UserPreferences.category == category,
                UserPreferences.key == key
            ).first()

            if existing:
                # Update the existing preference and reinforce confidence
                existing.value = value
                existing.confidence = min(1.0, existing.confidence + confidence * 0.1)
                existing.last_updated = datetime.utcnow()
                existing.source = source
            else:
                # Create a new preference
                preference = UserPreferences(
                    category=category,
                    key=key,
                    value=value,
                    confidence=confidence,
                    source=source
                )
                db.add(preference)

            db.commit()
            logging.info(f"Learned preference: {category}.{key} = {value}")
        except Exception as e:
            db.rollback()
            logging.error(f"Failed to learn preference: {e}")
        finally:
            db.close()

    def get_user_preferences(self, category: Optional[str] = None) -> Dict[str, Any]:
        """Get user preferences"""
        db = self.get_db()
        try:
            query = db.query(UserPreferences)
            if category:
                query = query.filter(UserPreferences.category == category)
            preferences = query.all()

            result = {}
            for pref in preferences:
                if pref.category not in result:
                    result[pref.category] = {}
                result[pref.category][pref.key] = {
                    'value': pref.value,
                    'confidence': pref.confidence,
                    'last_updated': pref.last_updated
                }
            return result
        except Exception as e:
            logging.error(f"Failed to get preferences: {e}")
            return {}
        finally:
            db.close()

# Usage example
async def demo_memory():
    memory = MemoryManager()

    # Store a conversation
    await memory.store_conversation(
        session_id="demo_session",
        user_message="I prefer coffee over tea in the morning",
        assistant_response="I'll remember that you prefer coffee in the morning. Would you like me to remind you about coffee-related deals or recipes?",
        tags=["preference", "beverages"]
    )

    # Learn a preference
    await memory.learn_preference(
        category="beverages",
        key="morning_preference",
        value="coffee",
        confidence=0.8,
        source="direct_statement"
    )

    # Get conversation context
    context = await memory.get_conversation_context("demo_session")
    print("Context:", context)

    # Get preferences
    preferences = memory.get_user_preferences()
    print("Preferences:", preferences)

if __name__ == "__main__":
    import asyncio
    asyncio.run(demo_memory())
```
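The demo stores and retrieves memories, but the payoff comes when retrieved context actually reaches the model. Here is one possible way to fold memory into a system prompt before each turn; `build_system_prompt` is a hypothetical helper and the prompt layout is an assumption, not part of the `MemoryManager` API:

```python
# Hypothetical glue code -- injects stored memory into the system prompt each turn
from memory.memory_manager import MemoryManager

async def build_system_prompt(memory: MemoryManager, session_id: str, user_message: str) -> str:
    # Pull conversations semantically related to the incoming message
    context = await memory.get_conversation_context(
        session_id, query=user_message, max_messages=3
    )
    prefs = memory.get_user_preferences()

    context_lines = [
        f"- User said: {c['user_message']!r}; you replied: {c['assistant_response']!r}"
        for c in context
    ]
    pref_lines = [
        f"- {category}.{key}: {info['value']} (confidence {info['confidence']:.1f})"
        for category, keys in prefs.items()
        for key, info in keys.items()
    ]
    parts = [
        "You are a personal assistant with long-term memory.",
        "Relevant past exchanges:",
        *(context_lines or ["- none"]),
        "Known user preferences:",
        *(pref_lines or ["- none"]),
    ]
    return "\n".join(parts)
```

Keeping this glue outside `MemoryManager` leaves the memory layer reusable across different prompting styles.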
Why this memory architecture?
- Dual storage: SQL for structured data, vectors for semantic search
- Importance scoring: Prioritizes meaningful conversations (and enables pruning; see the sketch after this list)
- Preference learning: Automatic extraction of user preferences
- Context retrieval: Relevant memory based on current conversation
- Confidence tracking: Measures certainty of learned information
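The importance scores also give you a natural retention policy: low-value memories can age out instead of accumulating forever. A sketch of a maintenance job, assuming a 30-day window and a 0.3 importance floor (both thresholds are illustrative, not taken from the code above):

```python
# Hypothetical maintenance job -- prunes old, low-importance conversations
from datetime import datetime, timedelta

from memory.models import SessionLocal, ConversationMemory

def prune_memories(max_age_days: int = 30, min_importance: float = 0.3) -> int:
    """Delete conversations older than max_age_days whose importance is below min_importance."""
    cutoff = datetime.utcnow() - timedelta(days=max_age_days)
    db = SessionLocal()
    try:
        deleted = db.query(ConversationMemory).filter(
            ConversationMemory.timestamp < cutoff,
            ConversationMemory.importance_score < min_importance,
        ).delete(synchronize_session=False)
        db.commit()
        return deleted  # number of rows removed
    except Exception:
        db.rollback()
        raise
    finally:
        db.close()
```

The matching vector-store entries would need the same cleanup via their `embedding_id` values; otherwise semantic search will keep surfacing conversations that no longer exist in SQL.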