Part 9: Complete Pydantic AI Integration

9 June 2025 · netologist · 8 min, 1698 words ·

Why Pydantic AI?

Pydantic AI provides type-safe AI application development: requests, responses, and tool arguments are declared as Pydantic models, so invalid data is rejected before it ever reaches a model, and results are validated against the types you declare.
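
As a small illustration (a sketch only, assuming the same Agent / result_type API used in the listing below), a typed agent returns a validated object instead of a raw string:

from pydantic import BaseModel
from pydantic_ai import Agent

class CityInfo(BaseModel):
    city: str
    country: str

# result_type makes the agent coerce and validate its answer into CityInfo
agent = Agent('ollama:llama3.1:8b', result_type=CityInfo)
result = agent.run_sync('Which city hosted the 2012 Summer Olympics?')
print(result.data.city, result.data.country)  # typed attributes, no string parsing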

Main Application with Pydantic AI

# main.py - Complete AlexAI Assistant
from pydantic_ai import Agent, RunContext
from pydantic import BaseModel, Field, ValidationError
from typing import List, Dict, Any, Optional, Union, Literal
import asyncio
import logging
from datetime import datetime
import sys
from pathlib import Path

# Import all our components
from config import config
from models.ollama_manager import OllamaManager
from rag.vector_store import VectorStore, RAGGenerator
from memory.memory_manager import MemoryManager, create_tables
from voice.speech_processor import VoiceInterface
from vision.ocr_processor import VisionLanguageProcessor
from mcp.mcp_server import MCPServer
from a2a.protocol import AgentCoordinator

# Pydantic Models for type safety
class ChatRequest(BaseModel):
    message: str = Field(..., description="User message", min_length=1, max_length=10000)
    session_id: str = Field(default="default", description="Session identifier")
    use_memory: bool = Field(default=True, description="Whether to use conversation memory")
    use_rag: bool = Field(default=True, description="Whether to use RAG for enhanced responses")
    voice_response: bool = Field(default=False, description="Whether to respond with voice")

class ChatResponse(BaseModel):
    response: str = Field(..., description="Assistant response")
    session_id: str = Field(..., description="Session identifier")
    used_memory: bool = Field(..., description="Whether memory was used")
    used_rag: bool = Field(..., description="Whether RAG was used")
    response_time: float = Field(..., description="Response time in seconds")
    confidence_score: Optional[float] = Field(None, description="Response confidence (0-1)")

class DocumentProcessRequest(BaseModel):
    file_path: str = Field(..., description="Path to document file")
    analysis_type: Literal["extract", "analyze", "qa"] = Field(default="analyze")
    question: Optional[str] = Field(None, description="Question for QA analysis")
    enhancement_type: Literal["default", "scan", "photo", "screenshot"] = Field(default="default")

class DocumentProcessResponse(BaseModel):
    text: Optional[str] = Field(None, description="Extracted text")
    analysis: Optional[str] = Field(None, description="Document analysis")
    confidence: float = Field(..., description="OCR confidence score")
    word_count: int = Field(..., description="Number of words extracted")
    processing_time: float = Field(..., description="Processing time in seconds")

class VoiceRequest(BaseModel):
    mode: Literal["listen", "speak", "conversation"] = Field(default="conversation")
    text: Optional[str] = Field(None, description="Text to speak (for speak mode)")
    timeout: float = Field(default=30.0, description="Timeout in seconds")
    wake_word_detection: bool = Field(default=True, description="Enable wake word detection")

class VoiceResponse(BaseModel):
    status: str = Field(..., description="Operation status")
    transcribed_text: Optional[str] = Field(None, description="Transcribed speech")
    response_text: Optional[str] = Field(None, description="Assistant response")
    wake_word_detected: Optional[bool] = Field(None, description="Wake word detection result")

class KnowledgeRequest(BaseModel):
    action: Literal["search", "add", "update", "delete"] = Field(..., description="Knowledge base action")
    query: str = Field(..., description="Search query or content")
    collection: str = Field(default="documents", description="Collection name")
    metadata: Optional[Dict[str, Any]] = Field(default_factory=dict, description="Document metadata")

class KnowledgeResponse(BaseModel):
    results: Optional[List[Dict[str, Any]]] = Field(None, description="Search results")
    document_id: Optional[str] = Field(None, description="Added document ID")
    status: str = Field(..., description="Operation status")

class SystemStatus(BaseModel):
    service: str = Field(..., description="Service name")
    version: str = Field(..., description="Service version")
    status: str = Field(..., description="Service status")
    uptime: float = Field(..., description="Uptime in seconds")
    memory_usage: Dict[str, Any] = Field(..., description="Memory usage statistics")
    active_sessions: int = Field(..., description="Number of active sessions")
    total_requests: int = Field(..., description="Total requests processed")
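
# Example: the constraints above are enforced the moment a request object is
# constructed, so malformed input never reaches a tool (illustrative only):
#
#   try:
#       ChatRequest(message="")        # violates min_length=1
#   except ValidationError as exc:
#       print(exc)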

# Context for passing data between agent functions
class AlexAIContext(RunContext):
    def __init__(self):
        # RunContext's own initializer expects per-run arguments, so it is not
        # called here; this class is only used as a shared-state container that
        # gets handed to the tool functions.
        self.start_time = datetime.now()
        self.total_requests = 0
        self.active_sessions = set()
        
        # Initialize all components
        self.ollama_manager = OllamaManager()
        self.vector_store = VectorStore()
        self.rag_generator = RAGGenerator()
        self.memory_manager = MemoryManager()
        self.voice_interface = VoiceInterface()
        self.vision_processor = VisionLanguageProcessor(self.ollama_manager)
        self.mcp_server = MCPServer()
        self.agent_coordinator = AgentCoordinator(config.AGENT_ID)
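
# Note: in idiomatic Pydantic AI this shared state would normally be supplied as a
# dependency rather than by subclassing RunContext. A rough sketch of that pattern
# (an assumption about the deps API, not part of the original listing):
#
#   agent = Agent('ollama:llama3.1:8b', deps_type=AlexAIContext)
#
#   @agent.tool
#   async def search_notes(ctx: RunContext[AlexAIContext], query: str) -> str:
#       results = await ctx.deps.vector_store.similarity_search(query=query)
#       return str(results)
#
#   result = await agent.run("find my notes on RAG", deps=AlexAIContext())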

# Create Pydantic AI Agent
alexai_agent = Agent(
    'ollama:llama3.1:8b',  # Primary model
    result_type=str,       # Default result type
    system_prompt="""You are AlexAI, an advanced personal assistant that prioritizes user privacy and runs entirely locally. 

You have access to:
- Local language models (Ollama)
- Vector search and RAG capabilities
- Persistent memory and learning
- Voice interaction (speech-to-text and text-to-speech)
- Document processing with OCR
- Agent-to-agent communication
- Comprehensive tool integration via MCP

Your core principles:
1. Privacy first - all processing happens locally
2. Helpful and accurate responses
3. Learn from interactions to improve over time
4. Be transparent about your capabilities and limitations
5. Provide actionable and contextual assistance

Always be friendly, professional, and focused on helping the user achieve their goals."""
)
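
# With the tools registered below, the assistant can also be driven through the
# agent itself, e.g. (sketch; assumes the async run API and the .data attribute
# matching result_type):
#
#   result = await alexai_agent.run("Summarize my last conversation")
#   print(result.data)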

# Agent tool functions
@alexai_agent.tool
async def chat_with_memory_and_rag(
    ctx: AlexAIContext, 
    request: ChatRequest
) -> ChatResponse:
    """Enhanced chat with memory and RAG capabilities"""
    
    start_time = datetime.now()
    
    try:
        # Update session tracking
        ctx.active_sessions.add(request.session_id)
        ctx.total_requests += 1
        
        # Get conversation context if memory enabled
        context = ""
        if request.use_memory:
            conversations = await ctx.memory_manager.get_conversation_context(
                session_id=request.session_id,
                query=request.message
            )
            if conversations:
                context_parts = []
                for conv in conversations[-3:]:
                    context_parts.append(f"User: {conv['user_message']}")
                    context_parts.append(f"Assistant: {conv['assistant_response']}")
                context = "\n".join(context_parts)
        
        # Generate response
        if request.use_rag:
            response = await ctx.rag_generator.generate_with_context(
                query=request.message,
                system_prompt=f"Previous conversation context:\n{context}" if context else None
            )
        else:
            prompt = f"{context}\n\nUser: {request.message}" if context else request.message
            response = await ctx.ollama_manager.generate_response(
                prompt=prompt,
                system_prompt="You are AlexAI, a helpful personal assistant."
            )
        
        # Store conversation in memory
        if request.use_memory:
            await ctx.memory_manager.store_conversation(
                session_id=request.session_id,
                user_message=request.message,
                assistant_response=response
            )
        
        # Voice response if requested
        if request.voice_response:
            await ctx.voice_interface.tts.speak_async(response)
        
        response_time = (datetime.now() - start_time).total_seconds()
        
        return ChatResponse(
            response=response,
            session_id=request.session_id,
            used_memory=request.use_memory,
            used_rag=request.use_rag,
            response_time=response_time,
            confidence_score=0.9  # Could be calculated based on various factors
        )
        
    except Exception as e:
        logging.error(f"Chat error: {e}")
        raise

@alexai_agent.tool
async def process_document(
    ctx: AlexAIContext,
    request: DocumentProcessRequest
) -> DocumentProcessResponse:
    """Process documents with OCR and analysis"""
    
    start_time = datetime.now()
    
    try:
        if request.analysis_type == "extract":
            result = ctx.vision_processor.ocr.extract_text(
                request.file_path,
                enhancement_type=request.enhancement_type
            )
            return DocumentProcessResponse(
                text=result["text"],
                confidence=result.get("confidence", 0),
                word_count=result.get("word_count", 0),
                processing_time=(datetime.now() - start_time).total_seconds()
            )
        
        elif request.analysis_type == "analyze":
            analysis = await ctx.vision_processor.analyze_document(request.file_path)
            return DocumentProcessResponse(
                text=analysis["ocr_result"]["text"],
                analysis=analysis["analysis"],
                confidence=analysis["confidence"],
                word_count=analysis["word_count"],
                processing_time=(datetime.now() - start_time).total_seconds()
            )
        
        elif request.analysis_type == "qa":
            if not request.question:
                raise ValueError("Question required for QA analysis")
            
            answer = await ctx.vision_processor.answer_document_questions(
                request.file_path, 
                request.question
            )
            
            # Also extract text for metadata
            ocr_result = ctx.vision_processor.ocr.extract_text(request.file_path)
            
            return DocumentProcessResponse(
                text=answer,
                analysis=f"Q: {request.question}\nA: {answer}",
                confidence=ocr_result.get("confidence", 0),
                word_count=len(answer.split()),
                processing_time=(datetime.now() - start_time).total_seconds()
            )
        
    except Exception as e:
        logging.error(f"Document processing error: {e}")
        raise

@alexai_agent.tool
async def voice_interaction(
    ctx: AlexAIContext,
    request: VoiceRequest
) -> VoiceResponse:
    """Handle voice interactions"""
    
    try:
        if request.mode == "speak":
            if not request.text:
                raise ValueError("Text required for speak mode")
            
            await ctx.voice_interface.tts.speak_async(request.text)
            return VoiceResponse(
                status="spoken",
                response_text=request.text
            )
        
        elif request.mode == "listen":
            if request.wake_word_detection:
                detected = await ctx.voice_interface.listen_for_wake_word(request.timeout)
                return VoiceResponse(
                    status="listening_completed",
                    wake_word_detected=detected
                )
            else:
                # Direct listening without wake word
                return VoiceResponse(
                    status="listening_ready"
                )
        
        elif request.mode == "conversation":
            async def chat_callback(user_text: str) -> str:
                chat_request = ChatRequest(
                    message=user_text,
                    session_id="voice_session",
                    use_memory=True,
                    use_rag=True
                )
                chat_response = await chat_with_memory_and_rag(ctx, chat_request)
                return chat_response.response
            
            if request.wake_word_detection:
                wake_detected = await ctx.voice_interface.listen_for_wake_word(request.timeout)
                if not wake_detected:
                    return VoiceResponse(
                        status="no_wake_word",
                        wake_word_detected=False
                    )
            
            response_text = await ctx.voice_interface.voice_conversation(chat_callback)
            return VoiceResponse(
                status="conversation_completed",
                response_text=response_text,
                wake_word_detected=request.wake_word_detection
            )
        
    except Exception as e:
        logging.error(f"Voice interaction error: {e}")
        raise

@alexai_agent.tool
async def manage_knowledge_base(
    ctx: AlexAIContext,
    request: KnowledgeRequest
) -> KnowledgeResponse:
    """Manage knowledge base operations"""
    
    try:
        if request.action == "search":
            results = await ctx.vector_store.similarity_search(
                query=request.query,
                collection_name=request.collection
            )
            return KnowledgeResponse(
                results=results,
                status="search_completed"
            )
        
        elif request.action == "add":
            doc_id = await ctx.vector_store.add_document(
                content=request.query,
                metadata=request.metadata,
                collection_name=request.collection
            )
            return KnowledgeResponse(
                document_id=doc_id,
                status="document_added"
            )
        
        else:
            raise ValueError(f"Unsupported action: {request.action}")
        
    except Exception as e:
        logging.error(f"Knowledge base error: {e}")
        raise

@alexai_agent.tool
async def get_system_status(ctx: AlexAIContext) -> SystemStatus:
    """Get system status and statistics"""
    
    uptime = (datetime.now() - ctx.start_time).total_seconds()
    
    # Simple memory usage (sys is already imported at module level; psutil would
    # give richer statistics)
    memory_usage = {
        "python_objects": sys.getsizeof(ctx),
        "active_sessions": len(ctx.active_sessions)
    }
    
    return SystemStatus(
        service="AlexAI",
        version="1.0.0",
        status="running",
        uptime=uptime,
        memory_usage=memory_usage,
        active_sessions=len(ctx.active_sessions),
        total_requests=ctx.total_requests
    )

# Main application class
class AlexAIAssistant:
    def __init__(self):
        self.context = AlexAIContext()
        self.is_running = False
        
    async def initialize(self):
        """Initialize all components"""
        logging.info("Initializing AlexAI Assistant...")
        
        # Create database tables
        create_tables()
        
        # Initialize Ollama models
        await self.context.ollama_manager.ensure_model_available(config.PRIMARY_MODEL)
        await self.context.ollama_manager.ensure_model_available(config.EMBEDDING_MODEL)
        
        # Start agent coordination
        await self.context.agent_coordinator.start()
        
        logging.info("AlexAI Assistant initialized successfully")
    
    async def shutdown(self):
        """Shutdown assistant gracefully"""
        logging.info("Shutting down AlexAI Assistant...")
        
        # Stop agent coordination
        await self.context.agent_coordinator.stop()
        
        # Clean up voice interface
        self.context.voice_interface.cleanup()
        
        logging.info("AlexAI Assistant shutdown complete")
    
    async def run_interactive_mode(self):
        """Run in interactive command-line mode"""
        print("AlexAI Assistant - Interactive Mode")
        print("Commands: chat, voice, document, knowledge, status, quit")
        print("=" * 50)
        
        self.is_running = True
        
        while self.is_running:
            try:
                command = input("\nAlexAI> ").strip().lower()
                
                if command == "quit":
                    self.is_running = False
                    break
                
                elif command == "chat":
                    await self._interactive_chat()
                
                elif command == "voice":
                    await self._interactive_voice()
                
                elif command == "document":
                    await self._interactive_document()
                
                elif command == "knowledge":
                    await self._interactive_knowledge()
                
                elif command == "status":
                    await self._interactive_status()
                
                else:
                    print("Unknown command. Available: chat, voice, document, knowledge, status, quit")
                    
            except KeyboardInterrupt:
                print("\nUse 'quit' to exit gracefully")
            except Exception as e:
                print(f"Error: {e}")
    
    async def _interactive_chat(self):
        """Interactive chat mode"""
        print("\nChat Mode (type 'back' to return)")
        
        while True:
            try:
                message = input("You: ").strip()
                if message.lower() == "back":
                    break
                if not message:
                    continue
                
                request = ChatRequest(message=message)
                response = await chat_with_memory_and_rag(self.context, request)
                
                print(f"AlexAI: {response.response}")
                print(f"(Response time: {response.response_time:.2f}s)")
                
            except Exception as e:
                print(f"Chat error: {e}")
    
    async def _interactive_voice(self):
        """Interactive voice mode"""
        print("\nVoice Mode")
        print("1. Speak text")
        print("2. Listen for wake word")
        print("3. Voice conversation")
        
        choice = input("Choose option (1-3): ").strip()
        
        try:
            if choice == "1":
                text = input("Enter text to speak: ")
                request = VoiceRequest(mode="speak", text=text)
                response = await voice_interaction(self.context, request)
                print(f"Status: {response.status}")
            
            elif choice == "2":
                print("Listening for wake word...")
                request = VoiceRequest(mode="listen", timeout=10)
                response = await voice_interaction(self.context, request)
                print(f"Wake word detected: {response.wake_word_detected}")
            
            elif choice == "3":
                print("Starting voice conversation...")
                request = VoiceRequest(mode="conversation")
                response = await voice_interaction(self.context, request)
                print(f"Conversation completed: {response.response_text}")
        
        except Exception as e:
            print(f"Voice error: {e}")
    
    async def _interactive_document(self):
        """Interactive document processing"""
        file_path = input("Enter document file path: ").strip()
        
        if not Path(file_path).exists():
            print("File not found")
            return
        
        print("Analysis types: extract, analyze, qa")
        analysis_type = input("Choose analysis type: ").strip()
        
        question = None
        if analysis_type == "qa":
            question = input("Enter your question: ").strip()

        try:
            request = DocumentProcessRequest(
                file_path=file_path,
                analysis_type=analysis_type,
                question=question
            )
            response = await process_document(self.context, request)
            print(f"Result:\n{response.analysis or response.text}")
            print(f"(Confidence: {response.confidence:.1f}, time: {response.processing_time:.2f}s)")
        except Exception as e:
            print(f"Document error: {e}")
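
    # --- The published listing breaks off here. The two helpers below and the
    # --- entry point are a minimal sketch (an assumption, not the original code),
    # --- reusing the request models and tools defined above.

    async def _interactive_knowledge(self):
        """Interactive knowledge base access (sketch)"""
        action = input("Action (search/add): ").strip().lower()
        query = input("Query or content: ").strip()
        try:
            request = KnowledgeRequest(action=action, query=query)
            response = await manage_knowledge_base(self.context, request)
            print(response.model_dump_json(indent=2))
        except Exception as e:
            print(f"Knowledge base error: {e}")

    async def _interactive_status(self):
        """Show system status (sketch)"""
        status = await get_system_status(self.context)
        print(status.model_dump_json(indent=2))

async def main():
    assistant = AlexAIAssistant()
    await assistant.initialize()
    try:
        await assistant.run_interactive_mode()
    finally:
        await assistant.shutdown()

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    asyncio.run(main())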

# Building an AI-Powered Personal Assistant: A Complete Guide

## Series Overview: The Ultimate AI Assistant Stack

In this comprehensive tutorial series, we'll build **"AlexAI"** - a sophisticated personal assistant that combines the power of local AI models, vector databases, memory systems, and multimodal capabilities. Our assistant will be able to:

- Process voice commands and respond with natural speech
- Remember conversations and learn from interactions
- Extract text from images and documents (OCR)
- Manage your calendar, emails, and tasks
- Search through your personal knowledge base
- Communicate with other AI agents using A2A protocols

### Why This Technology Stack?

**Scenario**: Imagine you're a busy professional who needs an AI assistant that:
- Works offline for privacy and speed
- Remembers your preferences and past conversations
- Can read documents, screenshots, and handwritten notes
- Integrates with your existing tools and workflows
- Shares information with other AI systems securely

### Technologies We'll Use and Why

1. **Ollama**: Local LLM hosting for privacy and speed
2. **MCP (Model Context Protocol)**: Standardized tool integration
3. **RAG (Retrieval Augmented Generation)**: Smart knowledge retrieval
4. **Memory Systems**: Persistent conversation history and learning
5. **Local Embeddings**: Fast, private vector search
6. **Multi-Model Support**: Different models for different tasks
7. **Voice-to-Text/Text-to-Voice**: Natural interaction
8. **OCR & Image Processing**: Visual document understanding
9. **A2A Protocols**: Agent-to-agent communication
10. **Pydantic AI**: Type-safe AI application development