Part 7: MCP (Model Context Protocol) Integration

7 June 2025 · netologist · 8 min, 1553 words

Why MCP?

The Model Context Protocol (MCP) standardizes how AI models interact with external tools and data sources. Rather than wiring a bespoke integration for every capability, an MCP server exposes three primitives behind one discoverable interface: tools (actions the model can invoke), resources (data it can read), and prompts (reusable templates). Any MCP-aware client can then consume them without custom glue code.
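
For orientation, here is the message shape the server in this chapter accepts. Note this is a simplified REST-style envelope specific to this implementation; the official MCP spec frames the equivalent messages as JSON-RPC 2.0. The field names match the MCPRequest and MCPResponse models defined below.

# A tools/call payload for the simplified envelope used in this chapter
# (the official MCP spec wraps equivalent messages in JSON-RPC 2.0)
tool_call = {
    "method": "tools/call",
    "params": {
        "name": "chat",
        "arguments": {
            "message": "Hello, how are you?",
            "session_id": "demo_session"
        }
    }
}

# On success the server fills "result"; on failure it fills "error"
# with a JSON-RPC-style code and message
tool_response = {
    "result": {"response": "...", "session_id": "demo_session"},
    "error": None
}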

MCP Server Implementation

# mcp/mcp_server.py
from fastapi import FastAPI, Depends
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel
from typing import List, Dict, Any, Optional
import logging

# MCP Protocol Models
class MCPResource(BaseModel):
    uri: str
    name: str
    description: Optional[str] = None
    mime_type: Optional[str] = None

class MCPTool(BaseModel):
    name: str
    description: str
    input_schema: Dict[str, Any]

class MCPPrompt(BaseModel):
    name: str
    description: str
    arguments: Optional[List[Dict[str, Any]]] = None

class MCPRequest(BaseModel):
    method: str
    params: Optional[Dict[str, Any]] = None

class MCPResponse(BaseModel):
    result: Optional[Any] = None
    error: Optional[Dict[str, Any]] = None

class MCPServer:
    def __init__(self):
        self.app = FastAPI(title="AlexAI MCP Server", version="1.0.0")
        self.security = HTTPBearer()
        
        # Register available tools and resources
        self.tools = {}
        self.resources = {}
        self.prompts = {}
        
        # Setup routes
        self._setup_routes()
        
        # Initialize integrations
        self._setup_integrations()
    
    def _setup_integrations(self):
        """Initialize all system integrations"""
        from models.ollama_manager import OllamaManager
        from rag.vector_store import VectorStore, RAGGenerator
        from memory.memory_manager import MemoryManager
        from voice.speech_processor import VoiceInterface
        from vision.ocr_processor import VisionLanguageProcessor
        
        self.ollama_manager = OllamaManager()
        self.vector_store = VectorStore()
        self.rag_generator = RAGGenerator()
        self.memory_manager = MemoryManager()
        self.voice_interface = VoiceInterface()
        self.vision_processor = VisionLanguageProcessor(self.ollama_manager)
        
        # Register tools
        self._register_tools()
    
    def _register_tools(self):
        """Register all available tools"""
        
        # Chat tool
        self.tools['chat'] = MCPTool(
            name="chat",
            description="Have a conversation with the AI assistant",
            input_schema={
                "type": "object",
                "properties": {
                    "message": {"type": "string", "description": "User message"},
                    "session_id": {"type": "string", "description": "Session identifier"},
                    "use_memory": {"type": "boolean", "default": True},
                    "use_rag": {"type": "boolean", "default": True}
                },
                "required": ["message"]
            }
        )
        
        # Document processing tool
        self.tools['process_document'] = MCPTool(
            name="process_document",
            description="Process and analyze documents using OCR and LLM",
            input_schema={
                "type": "object",
                "properties": {
                    "file_path": {"type": "string", "description": "Path to document image"},
                    "analysis_type": {"type": "string", "enum": ["extract", "analyze", "qa"], "default": "analyze"},
                    "question": {"type": "string", "description": "Question for QA mode"}
                },
                "required": ["file_path"]
            }
        )
        
        # Voice interaction tool
        self.tools['voice_chat'] = MCPTool(
            name="voice_chat",
            description="Voice-based conversation with the assistant",
            input_schema={
                "type": "object",
                "properties": {
                    "mode": {"type": "string", "enum": ["listen", "speak", "conversation"], "default": "conversation"},
                    "text": {"type": "string", "description": "Text to speak (for speak mode)"},
                    "timeout": {"type": "number", "default": 30}
                }
            }
        )
        
        # Memory management tool
        self.tools['manage_memory'] = MCPTool(
            name="manage_memory",
            description="Manage conversation memory and user preferences",
            input_schema={
                "type": "object",
                "properties": {
                    "action": {"type": "string", "enum": ["get_context", "get_preferences", "learn_preference"]},
                    "session_id": {"type": "string"},
                    "category": {"type": "string"},
                    "key": {"type": "string"},
                    "value": {"type": "string"},
                    "confidence": {"type": "number", "default": 0.5}
                },
                "required": ["action"]
            }
        )
        
        # Knowledge base tool
        self.tools['knowledge_base'] = MCPTool(
            name="knowledge_base",
            description="Search and manage knowledge base",
            input_schema={
                "type": "object",
                "properties": {
                    "action": {"type": "string", "enum": ["search", "add", "update", "delete"]},
                    "query": {"type": "string", "description": "Search query or content"},
                    "collection": {"type": "string", "default": "documents"},
                    "metadata": {"type": "object", "description": "Document metadata"}
                },
                "required": ["action"]
            }
        )
    
    def _setup_routes(self):
        """Setup FastAPI routes for MCP protocol"""
        
        @self.app.get("/")
        async def root():
            return {"service": "AlexAI MCP Server", "version": "1.0.0", "status": "running"}
        
        @self.app.post("/mcp/initialize")
        async def initialize(credentials: HTTPAuthorizationCredentials = Depends(self.security)):
            """Initialize MCP session"""
            return MCPResponse(result={
                "protocol_version": "1.0",
                "server_info": {
                    "name": "AlexAI",
                    "version": "1.0.0"
                },
                "capabilities": {
                    "tools": True,
                    "resources": True,
                    "prompts": True
                }
            })
        
        @self.app.get("/mcp/tools")
        async def list_tools():
            """List available tools"""
            return MCPResponse(result={"tools": list(self.tools.values())})
        
        @self.app.post("/mcp/tools/call")
        async def call_tool(request: MCPRequest):
            """Call a specific tool"""
            params = request.params or {}
            tool_name = params.get("name")
            arguments = params.get("arguments", {})
            
            if tool_name not in self.tools:
                return MCPResponse(error={"code": -32601, "message": f"Tool {tool_name} not found"})
            
            try:
                result = await self._execute_tool(tool_name, arguments)
                return MCPResponse(result=result)
            except Exception as e:
                logging.error(f"Tool execution failed: {e}")
                return MCPResponse(error={"code": -32603, "message": str(e)})
        
        @self.app.get("/mcp/resources")
        async def list_resources():
            """List available resources"""
            return MCPResponse(result={"resources": list(self.resources.values())})
        
        @self.app.post("/mcp/resources/read")
        async def read_resource(request: MCPRequest):
            """Read a specific resource"""
            uri = (request.params or {}).get("uri")
            if not uri:
                return MCPResponse(error={"code": -32602, "message": "Missing 'uri' parameter"})
            
            try:
                content = await self._read_resource(uri)
                return MCPResponse(result={"contents": [{"uri": uri, "text": content}]})
            except Exception as e:
                return MCPResponse(error={"code": -32603, "message": str(e)})
    
    async def _execute_tool(self, tool_name: str, arguments: Dict[str, Any]) -> Any:
        """Execute a tool with given arguments"""
        
        if tool_name == "chat":
            return await self._handle_chat(arguments)
        elif tool_name == "process_document":
            return await self._handle_document_processing(arguments)
        elif tool_name == "voice_chat":
            return await self._handle_voice_chat(arguments)
        elif tool_name == "manage_memory":
            return await self._handle_memory_management(arguments)
        elif tool_name == "knowledge_base":
            return await self._handle_knowledge_base(arguments)
        else:
            raise ValueError(f"Unknown tool: {tool_name}")
    
    async def _handle_chat(self, args: Dict[str, Any]) -> Dict[str, Any]:
        """Handle chat tool execution"""
        message = args["message"]
        session_id = args.get("session_id", "default")
        use_memory = args.get("use_memory", True)
        use_rag = args.get("use_rag", True)
        
        # Get conversation context if memory is enabled
        context = ""
        if use_memory:
            conversations = await self.memory_manager.get_conversation_context(
                session_id=session_id,
                query=message
            )
            if conversations:
                context_parts = []
                for conv in conversations[-3:]:  # Last 3 relevant conversations
                    context_parts.append(f"User: {conv['user_message']}")
                    context_parts.append(f"Assistant: {conv['assistant_response']}")
                context = "\n".join(context_parts)
        
        # Generate response
        if use_rag:
            # Use RAG for enhanced responses
            response = await self.rag_generator.generate_with_context(
                query=message,
                system_prompt=f"Previous conversation context:\n{context}" if context else None
            )
        else:
            # Direct LLM response
            prompt = f"{context}\n\nUser: {message}" if context else message
            response = await self.ollama_manager.generate_response(
                prompt=prompt,
                system_prompt="You are AlexAI, a helpful personal assistant."
            )
        
        # Store conversation in memory
        if use_memory:
            await self.memory_manager.store_conversation(
                session_id=session_id,
                user_message=message,
                assistant_response=response
            )
        
        return {
            "response": response,
            "session_id": session_id,
            "used_memory": use_memory,
            "used_rag": use_rag
        }
    
    async def _handle_document_processing(self, args: Dict[str, Any]) -> Dict[str, Any]:
        """Handle document processing tool"""
        file_path = args["file_path"]
        analysis_type = args.get("analysis_type", "analyze")
        question = args.get("question")
        
        if analysis_type == "extract":
            # Simple OCR extraction
            result = self.vision_processor.ocr.extract_text(file_path)
            return {
                "text": result["text"],
                "confidence": result.get("confidence", 0),
                "word_count": result.get("word_count", 0)
            }
        
        elif analysis_type == "analyze":
            # Full document analysis
            analysis = await self.vision_processor.analyze_document(file_path)
            return analysis
        
        elif analysis_type == "qa":
            if not question:
                raise ValueError("Question required for QA mode")
            
            answer = await self.vision_processor.answer_document_questions(file_path, question)
            return {
                "question": question,
                "answer": answer
            }
        
        else:
            raise ValueError(f"Unknown analysis type: {analysis_type}")
    
    async def _handle_voice_chat(self, args: Dict[str, Any]) -> Dict[str, Any]:
        """Handle voice chat tool"""
        mode = args.get("mode", "conversation")
        text = args.get("text")
        timeout = args.get("timeout", 30)
        
        if mode == "speak":
            if not text:
                raise ValueError("Text required for speak mode")
            
            await self.voice_interface.tts.speak_async(text)
            return {"status": "spoken", "text": text}
        
        elif mode == "listen":
            # Listen for wake word
            detected = await self.voice_interface.listen_for_wake_word(timeout)
            return {"wake_word_detected": detected}
        
        elif mode == "conversation":
            # Full voice conversation
            async def chat_callback(user_text: str) -> str:
                # Use chat tool to generate response
                chat_result = await self._handle_chat({
                    "message": user_text,
                    "session_id": "voice_session",
                    "use_memory": True,
                    "use_rag": True
                })
                return chat_result["response"]
            
            response = await self.voice_interface.voice_conversation(chat_callback)
            return {"response": response}
        
        else:
            raise ValueError(f"Unknown voice mode: {mode}")
    
    async def _handle_memory_management(self, args: Dict[str, Any]) -> Dict[str, Any]:
        """Handle memory management tool"""
        action = args["action"]
        
        if action == "get_context":
            session_id = args.get("session_id", "default")
            context = await self.memory_manager.get_conversation_context(session_id)
            return {"context": context}
        
        elif action == "get_preferences":
            category = args.get("category")
            preferences = self.memory_manager.get_user_preferences(category)
            return {"preferences": preferences}
        
        elif action == "learn_preference":
            category = args["category"]
            key = args["key"]
            value = args["value"]
            confidence = args.get("confidence", 0.5)
            
            await self.memory_manager.learn_preference(category, key, value, confidence)
            return {"status": "preference_learned"}
        
        else:
            raise ValueError(f"Unknown memory action: {action}")
    
    async def _handle_knowledge_base(self, args: Dict[str, Any]) -> Dict[str, Any]:
        """Handle knowledge base tool"""
        action = args["action"]
        
        if action == "search":
            query = args["query"]
            collection = args.get("collection", "documents")
            
            results = await self.vector_store.similarity_search(
                query=query,
                collection_name=collection
            )
            return {"results": results}
        
        elif action == "add":
            content = args["query"]  # Using query field for content
            collection = args.get("collection", "documents")
            metadata = args.get("metadata", {})
            
            doc_id = await self.vector_store.add_document(
                content=content,
                metadata=metadata,
                collection_name=collection
            )
            return {"document_id": doc_id}
        
        else:
            raise ValueError(f"Unknown knowledge base action: {action}")
    
    async def _read_resource(self, uri: str) -> str:
        """Read content from a resource URI"""
        # Implement resource reading logic based on URI scheme
        if uri.startswith("file://"):
            file_path = uri[7:]  # Remove file:// prefix
            with open(file_path, 'r', encoding='utf-8') as f:
                return f.read()
        elif uri.startswith("memory://"):
            # Placeholder: a full implementation would map memory:// URIs
            # onto MemoryManager lookups (sessions, preferences, and so on)
            return "Memory resource content"
        else:
            raise ValueError(f"Unsupported resource URI: {uri}")

# MCP Client for connecting to other services
class MCPClient:
    def __init__(self, server_url: str, auth_token: Optional[str] = None):
        self.server_url = server_url
        self.auth_token = auth_token
        self.session = None
    
    async def initialize(self) -> Dict[str, Any]:
        """Initialize connection with MCP server"""
        import aiohttp
        
        headers = {}
        if self.auth_token:
            headers["Authorization"] = f"Bearer {self.auth_token}"
        
        async with aiohttp.ClientSession() as session:
            async with session.post(
                f"{self.server_url}/mcp/initialize",
                headers=headers
            ) as response:
                result = await response.json()
                return result
    
    async def call_tool(self, tool_name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Call a tool on the MCP server"""
        import aiohttp
        
        request_data = {
            "method": "tools/call",
            "params": {
                "name": tool_name,
                "arguments": arguments
            }
        }
        
        headers = {}
        if self.auth_token:
            headers["Authorization"] = f"Bearer {self.auth_token}"
        
        async with aiohttp.ClientSession() as session:
            async with session.post(
                f"{self.server_url}/mcp/tools/call",
                json=request_data,
                headers=headers
            ) as response:
                result = await response.json()
                return result
    
    async def list_tools(self) -> List[Dict[str, Any]]:
        """List available tools"""
        import aiohttp
        
        async with aiohttp.ClientSession() as session:
            async with session.get(f"{self.server_url}/mcp/tools") as response:
                result = await response.json()
                return result.get("result", {}).get("tools", [])

# Usage example
async def demo_mcp_server():
    """Demonstrate MCP server functionality"""
    
    # Start MCP server
    server = MCPServer()
    
    # Test chat tool
    chat_result = await server._execute_tool("chat", {
        "message": "Hello, how are you?",
        "session_id": "demo_session"
    })
    print("Chat result:", chat_result)
    
    # Test knowledge base
    kb_result = await server._execute_tool("knowledge_base", {
        "action": "add",
        "query": "AlexAI is a privacy-focused personal assistant that runs locally.",
        "metadata": {"category": "system_info"}
    })
    print("Knowledge base result:", kb_result)
    
    # Search knowledge base
    search_result = await server._execute_tool("knowledge_base", {
        "action": "search",
        "query": "privacy-focused assistant"
    })
    print("Search result:", search_result)

if __name__ == "__main__":
    import uvicorn
    
    # Create and run MCP server
    server = MCPServer()
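    # Tip: to exercise the tools in-process without HTTP, import asyncio
    # and replace the uvicorn call with asyncio.run(demo_mcp_server())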
    
    # Run with uvicorn
    uvicorn.run(
        server.app,
        host="0.0.0.0",
        port=8000,
        log_level="info"
    )
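
Running the client against a live server

To close the loop, here is a minimal client session against the server above, using the MCPClient class. It is a sketch with two assumptions: the server is reachable at http://localhost:8000 (the uvicorn defaults from the __main__ block), and no bearer token is required; pass auth_token to MCPClient if your deployment enforces one.

# client_demo.py -- illustrative session against the server above
# (assumes http://localhost:8000 is reachable and no auth token is set)
import asyncio

from mcp.mcp_server import MCPClient

async def main():
    client = MCPClient("http://localhost:8000")

    # Discover the tools the server registered
    tools = await client.list_tools()
    print("Available tools:", [tool["name"] for tool in tools])

    # Call the chat tool remotely; the envelope carries result or error
    envelope = await client.call_tool("chat", {
        "message": "What can you help me with?",
        "session_id": "client_demo"
    })
    print("Chat result:", envelope.get("result") or envelope.get("error"))

if __name__ == "__main__":
    asyncio.run(main())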

Why this MCP implementation?