Part 7: MCP (Model Context Protocol) Integration
Why MCP?
The Model Context Protocol standardizes how AI models interact with external tools and data sources:
- Standardized interface: Consistent way to connect tools across different AI systems (see the request/response sketch after this list)
- Security: Controlled access to external resources
- Extensibility: Easy addition of new capabilities
- Interoperability: Tools work across different AI platforms
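To make that interface concrete, here is the general shape of a tool call as the server below models it, shown as plain Python dicts. This is a simplified sketch: the full MCP wire format frames these messages as JSON-RPC 2.0 (adding jsonrpc and id fields), while this chapter's server reuses the same method/params request shape and result/error response envelope over plain HTTP.
# Simplified shape of an MCP-style tool call (full MCP wraps this in JSON-RPC 2.0)
request = {
    "method": "tools/call",
    "params": {
        "name": "chat",
        "arguments": {"message": "Hello!", "session_id": "demo"}
    }
}

# A success response carries `result`; a failure carries `error` with a
# JSON-RPC-style code and message
response = {"result": {"response": "Hi! How can I help?", "session_id": "demo"}}
error_response = {"error": {"code": -32601, "message": "Tool not found"}}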
MCP Server Implementation
# mcp/mcp_server.py
from fastapi import FastAPI, HTTPException, Depends
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel, Field
from typing import List, Dict, Any, Optional, Union
import asyncio
import logging
from datetime import datetime
import json
# MCP Protocol Models
class MCPResource(BaseModel):
uri: str
name: str
description: Optional[str] = None
mime_type: Optional[str] = None
class MCPTool(BaseModel):
name: str
description: str
input_schema: Dict[str, Any]
class MCPPrompt(BaseModel):
name: str
description: str
arguments: Optional[List[Dict[str, Any]]] = None
class MCPRequest(BaseModel):
method: str
params: Optional[Dict[str, Any]] = None
class MCPResponse(BaseModel):
result: Optional[Any] = None
error: Optional[Dict[str, Any]] = None
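# MCPResponse mirrors the JSON-RPC 2.0 result/error envelope that MCP builds on:
# exactly one of `result` or `error` should be populated per response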
class MCPServer:
def __init__(self):
self.app = FastAPI(title="AlexAI MCP Server", version="1.0.0")
self.security = HTTPBearer()
# Register available tools and resources
self.tools = {}
self.resources = {}
self.prompts = {}
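        # Note: the capabilities advertised in /mcp/initialize include prompts,
        # but none are registered yet; populate self.prompts with MCPPrompt entries to use them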
# Setup routes
self._setup_routes()
# Initialize integrations
self._setup_integrations()
def _setup_integrations(self):
"""Initialize all system integrations"""
from models.ollama_manager import OllamaManager
from rag.vector_store import VectorStore, RAGGenerator
from memory.memory_manager import MemoryManager
from voice.speech_processor import VoiceInterface
from vision.ocr_processor import VisionLanguageProcessor
self.ollama_manager = OllamaManager()
self.vector_store = VectorStore()
self.rag_generator = RAGGenerator()
self.memory_manager = MemoryManager()
self.voice_interface = VoiceInterface()
self.vision_processor = VisionLanguageProcessor(self.ollama_manager)
# Register tools
self._register_tools()
def _register_tools(self):
"""Register all available tools"""
# Chat tool
self.tools['chat'] = MCPTool(
name="chat",
description="Have a conversation with the AI assistant",
input_schema={
"type": "object",
"properties": {
"message": {"type": "string", "description": "User message"},
"session_id": {"type": "string", "description": "Session identifier"},
"use_memory": {"type": "boolean", "default": True},
"use_rag": {"type": "boolean", "default": True}
},
"required": ["message"]
}
)
# Document processing tool
self.tools['process_document'] = MCPTool(
name="process_document",
description="Process and analyze documents using OCR and LLM",
input_schema={
"type": "object",
"properties": {
"file_path": {"type": "string", "description": "Path to document image"},
"analysis_type": {"type": "string", "enum": ["extract", "analyze", "qa"], "default": "analyze"},
"question": {"type": "string", "description": "Question for QA mode"}
},
"required": ["file_path"]
}
)
# Voice interaction tool
self.tools['voice_chat'] = MCPTool(
name="voice_chat",
description="Voice-based conversation with the assistant",
input_schema={
"type": "object",
"properties": {
"mode": {"type": "string", "enum": ["listen", "speak", "conversation"], "default": "conversation"},
"text": {"type": "string", "description": "Text to speak (for speak mode)"},
"timeout": {"type": "number", "default": 30}
}
}
)
# Memory management tool
self.tools['manage_memory'] = MCPTool(
name="manage_memory",
description="Manage conversation memory and user preferences",
input_schema={
"type": "object",
"properties": {
"action": {"type": "string", "enum": ["get_context", "get_preferences", "learn_preference"]},
"session_id": {"type": "string"},
"category": {"type": "string"},
"key": {"type": "string"},
"value": {"type": "string"},
"confidence": {"type": "number", "default": 0.5}
},
"required": ["action"]
}
)
# Knowledge base tool
self.tools['knowledge_base'] = MCPTool(
name="knowledge_base",
description="Search and manage knowledge base",
input_schema={
"type": "object",
"properties": {
"action": {"type": "string", "enum": ["search", "add", "update", "delete"]},
"query": {"type": "string", "description": "Search query or content"},
"collection": {"type": "string", "default": "documents"},
"metadata": {"type": "object", "description": "Document metadata"}
},
"required": ["action"]
}
)
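    # To add a new tool: register its MCPTool schema above and add a matching
    # dispatch branch in _execute_tool below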
def _setup_routes(self):
"""Setup FastAPI routes for MCP protocol"""
@self.app.get("/")
async def root():
return {"service": "AlexAI MCP Server", "version": "1.0.0", "status": "running"}
@self.app.post("/mcp/initialize")
async def initialize(credentials: HTTPAuthorizationCredentials = Depends(self.security)):
"""Initialize MCP session"""
return MCPResponse(result={
"protocol_version": "1.0",
"server_info": {
"name": "AlexAI",
"version": "1.0.0"
},
"capabilities": {
"tools": True,
"resources": True,
"prompts": True
}
})
@self.app.get("/mcp/tools")
async def list_tools():
"""List available tools"""
return MCPResponse(result={"tools": list(self.tools.values())})
@self.app.post("/mcp/tools/call")
async def call_tool(request: MCPRequest):
"""Call a specific tool"""
            # Guard against a missing params object before reading tool arguments
            params = request.params or {}
            tool_name = params.get("name")
            arguments = params.get("arguments", {})
if tool_name not in self.tools:
return MCPResponse(error={"code": -32601, "message": f"Tool {tool_name} not found"})
try:
result = await self._execute_tool(tool_name, arguments)
return MCPResponse(result=result)
except Exception as e:
logging.error(f"Tool execution failed: {e}")
return MCPResponse(error={"code": -32603, "message": str(e)})
@self.app.get("/mcp/resources")
async def list_resources():
"""List available resources"""
return MCPResponse(result={"resources": list(self.resources.values())})
@self.app.post("/mcp/resources/read")
async def read_resource(request: MCPRequest):
"""Read a specific resource"""
            uri = (request.params or {}).get("uri")
try:
content = await self._read_resource(uri)
return MCPResponse(result={"contents": [{"uri": uri, "text": content}]})
except Exception as e:
return MCPResponse(error={"code": -32603, "message": str(e)})
async def _execute_tool(self, tool_name: str, arguments: Dict[str, Any]) -> Any:
"""Execute a tool with given arguments"""
if tool_name == "chat":
return await self._handle_chat(arguments)
elif tool_name == "process_document":
return await self._handle_document_processing(arguments)
elif tool_name == "voice_chat":
return await self._handle_voice_chat(arguments)
elif tool_name == "manage_memory":
return await self._handle_memory_management(arguments)
elif tool_name == "knowledge_base":
return await self._handle_knowledge_base(arguments)
else:
raise ValueError(f"Unknown tool: {tool_name}")
async def _handle_chat(self, args: Dict[str, Any]) -> Dict[str, Any]:
"""Handle chat tool execution"""
message = args["message"]
session_id = args.get("session_id", "default")
use_memory = args.get("use_memory", True)
use_rag = args.get("use_rag", True)
# Get conversation context if memory is enabled
context = ""
if use_memory:
conversations = await self.memory_manager.get_conversation_context(
session_id=session_id,
query=message
)
if conversations:
context_parts = []
for conv in conversations[-3:]: # Last 3 relevant conversations
context_parts.append(f"User: {conv['user_message']}")
context_parts.append(f"Assistant: {conv['assistant_response']}")
context = "\n".join(context_parts)
# Generate response
if use_rag:
# Use RAG for enhanced responses
response = await self.rag_generator.generate_with_context(
query=message,
system_prompt=f"Previous conversation context:\n{context}" if context else None
)
else:
# Direct LLM response
prompt = f"{context}\n\nUser: {message}" if context else message
response = await self.ollama_manager.generate_response(
prompt=prompt,
system_prompt="You are AlexAI, a helpful personal assistant."
)
# Store conversation in memory
if use_memory:
await self.memory_manager.store_conversation(
session_id=session_id,
user_message=message,
assistant_response=response
)
return {
"response": response,
"session_id": session_id,
"used_memory": use_memory,
"used_rag": use_rag
}
async def _handle_document_processing(self, args: Dict[str, Any]) -> Dict[str, Any]:
"""Handle document processing tool"""
file_path = args["file_path"]
analysis_type = args.get("analysis_type", "analyze")
question = args.get("question")
if analysis_type == "extract":
# Simple OCR extraction
result = self.vision_processor.ocr.extract_text(file_path)
return {
"text": result["text"],
"confidence": result.get("confidence", 0),
"word_count": result.get("word_count", 0)
}
elif analysis_type == "analyze":
# Full document analysis
analysis = await self.vision_processor.analyze_document(file_path)
return analysis
elif analysis_type == "qa":
if not question:
raise ValueError("Question required for QA mode")
answer = await self.vision_processor.answer_document_questions(file_path, question)
return {
"question": question,
"answer": answer
}
else:
raise ValueError(f"Unknown analysis type: {analysis_type}")
async def _handle_voice_chat(self, args: Dict[str, Any]) -> Dict[str, Any]:
"""Handle voice chat tool"""
mode = args.get("mode", "conversation")
text = args.get("text")
timeout = args.get("timeout", 30)
if mode == "speak":
if not text:
raise ValueError("Text required for speak mode")
await self.voice_interface.tts.speak_async(text)
return {"status": "spoken", "text": text}
elif mode == "listen":
# Listen for wake word
detected = await self.voice_interface.listen_for_wake_word(timeout)
return {"wake_word_detected": detected}
elif mode == "conversation":
# Full voice conversation
async def chat_callback(user_text: str) -> str:
# Use chat tool to generate response
chat_result = await self._handle_chat({
"message": user_text,
"session_id": "voice_session",
"use_memory": True,
"use_rag": True
})
return chat_result["response"]
response = await self.voice_interface.voice_conversation(chat_callback)
return {"response": response}
else:
raise ValueError(f"Unknown voice mode: {mode}")
async def _handle_memory_management(self, args: Dict[str, Any]) -> Dict[str, Any]:
"""Handle memory management tool"""
action = args["action"]
if action == "get_context":
session_id = args.get("session_id", "default")
context = await self.memory_manager.get_conversation_context(session_id)
return {"context": context}
elif action == "get_preferences":
category = args.get("category")
preferences = self.memory_manager.get_user_preferences(category)
return {"preferences": preferences}
elif action == "learn_preference":
category = args["category"]
key = args["key"]
value = args["value"]
confidence = args.get("confidence", 0.5)
await self.memory_manager.learn_preference(category, key, value, confidence)
return {"status": "preference_learned"}
else:
raise ValueError(f"Unknown memory action: {action}")
async def _handle_knowledge_base(self, args: Dict[str, Any]) -> Dict[str, Any]:
"""Handle knowledge base tool"""
action = args["action"]
if action == "search":
query = args["query"]
collection = args.get("collection", "documents")
results = await self.vector_store.similarity_search(
query=query,
collection_name=collection
)
return {"results": results}
elif action == "add":
content = args["query"] # Using query field for content
collection = args.get("collection", "documents")
metadata = args.get("metadata", {})
doc_id = await self.vector_store.add_document(
content=content,
metadata=metadata,
collection_name=collection
)
return {"document_id": doc_id}
else:
raise ValueError(f"Unknown knowledge base action: {action}")
async def _read_resource(self, uri: str) -> str:
"""Read content from a resource URI"""
# Implement resource reading logic based on URI scheme
if uri.startswith("file://"):
file_path = uri[7:] # Remove file:// prefix
with open(file_path, 'r', encoding='utf-8') as f:
return f.read()
elif uri.startswith("memory://"):
# Read from memory system
# Implementation depends on specific memory resource format
return "Memory resource content"
else:
raise ValueError(f"Unsupported resource URI: {uri}")
# MCP Client for connecting to other services
class MCPClient:
    def __init__(self, server_url: str, auth_token: Optional[str] = None):
        self.server_url = server_url.rstrip("/")
        self.auth_token = auth_token
        # Sessions are created per request; no persistent connection state is kept
async def initialize(self) -> Dict[str, Any]:
"""Initialize connection with MCP server"""
import aiohttp
headers = {}
if self.auth_token:
headers["Authorization"] = f"Bearer {self.auth_token}"
async with aiohttp.ClientSession() as session:
async with session.post(
f"{self.server_url}/mcp/initialize",
headers=headers
) as response:
result = await response.json()
return result
async def call_tool(self, tool_name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Call a tool on the MCP server"""
import aiohttp
request_data = {
"method": "tools/call",
"params": {
"name": tool_name,
"arguments": arguments
}
}
headers = {}
if self.auth_token:
headers["Authorization"] = f"Bearer {self.auth_token}"
async with aiohttp.ClientSession() as session:
async with session.post(
f"{self.server_url}/mcp/tools/call",
json=request_data,
headers=headers
) as response:
result = await response.json()
return result
    async def list_tools(self) -> List[Dict[str, Any]]:
        """List available tools"""
        import aiohttp
        # Send the auth header here too, for consistency with the other calls
        headers = {}
        if self.auth_token:
            headers["Authorization"] = f"Bearer {self.auth_token}"
        async with aiohttp.ClientSession() as session:
            async with session.get(f"{self.server_url}/mcp/tools", headers=headers) as response:
                result = await response.json()
                return result.get("result", {}).get("tools", [])
# Usage example
async def demo_mcp_server():
"""Demonstrate MCP server functionality"""
# Start MCP server
server = MCPServer()
# Test chat tool
chat_result = await server._execute_tool("chat", {
"message": "Hello, how are you?",
"session_id": "demo_session"
})
print("Chat result:", chat_result)
# Test knowledge base
kb_result = await server._execute_tool("knowledge_base", {
"action": "add",
"query": "AlexAI is a privacy-focused personal assistant that runs locally.",
"metadata": {"category": "system_info"}
})
print("Knowledge base result:", kb_result)
# Search knowledge base
search_result = await server._execute_tool("knowledge_base", {
"action": "search",
"query": "privacy-focused assistant"
})
print("Search result:", search_result)
if __name__ == "__main__":
import uvicorn
# Create and run MCP server
server = MCPServer()
# Run with uvicorn
uvicorn.run(
server.app,
host="0.0.0.0",
port=8000,
log_level="info"
)
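With the server running, the MCPClient defined above can exercise the same tools over HTTP from another process. A minimal sketch, assuming the server is listening on localhost:8000 with no auth token configured:
# mcp/demo_client.py
import asyncio

from mcp.mcp_server import MCPClient

async def demo_mcp_client():
    client = MCPClient("http://localhost:8000")

    # Discover what the server exposes
    tools = await client.list_tools()
    print("Available tools:", [tool["name"] for tool in tools])

    # Call the chat tool remotely; the reply arrives in the `result` envelope
    reply = await client.call_tool("chat", {
        "message": "Summarize what you can do.",
        "session_id": "remote_demo"
    })
    print("Chat response:", reply.get("result"))

if __name__ == "__main__":
    asyncio.run(demo_mcp_client())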
Why this MCP implementation?
- MCP-style design: Mirrors MCP's core concepts of tools, resources, and prompts, with JSON-RPC-style error codes, over a simple HTTP transport
- Tool integration: Unified interface for chat, document, voice, memory, and knowledge base capabilities
- Security: Bearer-token authentication hooks via FastAPI's HTTPBearer
- Async operations: Non-blocking tool execution
- Error handling: Structured error responses and logging
- Extensibility: New tools need only a schema registration and a handler branch (sketched below)
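As a sketch of that extensibility point: a hypothetical web_search tool (an illustrative name, not part of the code above) would need only a schema registration inside _register_tools and a matching branch in _execute_tool.
# Hypothetical addition inside _register_tools (illustrative only):
self.tools['web_search'] = MCPTool(
    name="web_search",
    description="Search indexed documents for relevant pages",
    input_schema={
        "type": "object",
        "properties": {
            "query": {"type": "string", "description": "Search terms"}
        },
        "required": ["query"]
    }
)

# ...and the matching dispatch branch inside _execute_tool:
# elif tool_name == "web_search":
#     return await self._handle_web_search(arguments)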