Part 5: Voice Processing - Natural Interaction
Why Voice Processing?
Voice interaction makes AI assistants more natural and accessible:
- Hands-free operation: Use while driving, cooking, or working
- Natural communication: More intuitive than typing
- Accessibility: Helps users with visual or mobility impairments
- Multitasking: Continue other activities while interacting
Speech-to-Text Implementation
# voice/speech_processor.py
import whisper
import pyaudio
import wave
import tempfile
import os
import asyncio
import logging
from typing import Optional, Callable
import threading
import queue
import numpy as np
from config import config
class SpeechToTextProcessor:
def __init__(self):
# Load Whisper model
self.model = whisper.load_model(config.STT_MODEL)
# Audio configuration
self.chunk_size = 1024
self.format = pyaudio.paInt16
self.channels = 1
self.rate = 16000 # Whisper works best with 16kHz
# Initialize PyAudio
self.audio = pyaudio.PyAudio()
# Recording state
self.is_recording = False
self.audio_queue = queue.Queue()
def _record_audio(self, duration: Optional[float] = None) -> bytes:
"""Record audio from microphone"""
stream = self.audio.open(
format=self.format,
channels=self.channels,
rate=self.rate,
input=True,
frames_per_buffer=self.chunk_size
)
frames = []
        if duration:
            # Record for the specified duration
            num_chunks = int(self.rate / self.chunk_size * duration)
            for _ in range(num_chunks):
                data = stream.read(self.chunk_size, exception_on_overflow=False)
                frames.append(data)
        else:
            # Record until is_recording is cleared by stop_recording();
            # start_recording() sets the flag before this code runs
            while self.is_recording:
                try:
                    data = stream.read(self.chunk_size, exception_on_overflow=False)
                    frames.append(data)
                except Exception as e:
                    logging.warning(f"Audio recording error: {e}")
                    break
stream.stop_stream()
stream.close()
return b''.join(frames)
    def start_recording(self):
        """Start continuous recording in a background thread"""
        if self.is_recording:
            return
        # Set the flag before launching the thread so a quick
        # stop_recording() call cannot race the thread startup
        self.is_recording = True
        def record_thread():
            audio_data = self._record_audio()
            self.audio_queue.put(audio_data)
        thread = threading.Thread(target=record_thread, daemon=True)
        thread.start()
def stop_recording(self) -> Optional[bytes]:
"""Stop recording and return audio data"""
if not self.is_recording:
return None
self.is_recording = False
try:
# Get recorded audio data
return self.audio_queue.get(timeout=1.0)
except queue.Empty:
return None
    async def transcribe_audio(self, audio_data: bytes) -> str:
        """Transcribe raw audio data to text"""
        # Reserve a temporary file name, then close the handle so the file
        # can be reopened by name (wave.open would fail on Windows while
        # the NamedTemporaryFile handle is still open)
        with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as temp_file:
            temp_path = temp_file.name
        # Convert raw PCM audio to WAV format
        with wave.open(temp_path, 'wb') as wav_file:
            wav_file.setnchannels(self.channels)
            wav_file.setsampwidth(self.audio.get_sample_size(self.format))
            wav_file.setframerate(self.rate)
            wav_file.writeframes(audio_data)
        try:
            # Run the blocking Whisper call in a worker thread so the
            # event loop stays responsive
            result = await asyncio.to_thread(self.model.transcribe, temp_path)
            return result['text'].strip()
        except Exception as e:
            logging.error(f"Transcription failed: {e}")
            return ""
        finally:
            # Clean up the temporary file
            try:
                os.unlink(temp_path)
            except OSError:
                pass
    async def transcribe_file(self, file_path: str) -> str:
        """Transcribe an audio file to text"""
        try:
            # Whisper handles common audio formats (WAV, MP3, etc.)
            result = await asyncio.to_thread(self.model.transcribe, file_path)
            return result['text'].strip()
        except Exception as e:
            logging.error(f"File transcription failed: {e}")
            return ""
def cleanup(self):
"""Clean up audio resources"""
if hasattr(self, 'audio'):
self.audio.terminate()
class VoiceActivityDetector:
"""Simple voice activity detection"""
def __init__(self, threshold: float = 0.01, min_duration: float = 0.5):
self.threshold = threshold
self.min_duration = min_duration
    def detect_speech(self, audio_data: bytes, sample_rate: int = 16000) -> bool:
        """Detect whether the audio contains speech"""
        # Convert bytes to an array of 16-bit samples
        audio_array = np.frombuffer(audio_data, dtype=np.int16)
        # Too little audio to judge: treat as silence
        if len(audio_array) / sample_rate < self.min_duration:
            return False
        # Normalize to [-1.0, 1.0]
        audio_normalized = audio_array.astype(np.float32) / 32768.0
        # RMS energy above the threshold counts as speech
        rms_energy = np.sqrt(np.mean(audio_normalized ** 2))
        return rms_energy > self.threshold
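# Optional helper, an assumed extension rather than part of the original
# design: a fixed threshold such as 0.01 depends heavily on the microphone
# and room, so it can be calibrated from a short capture of ambient noise.
def calibrate_vad_threshold(ambient_audio: bytes, margin: float = 3.0) -> float:
    """Return a threshold set `margin` times above the ambient RMS energy"""
    samples = np.frombuffer(ambient_audio, dtype=np.int16).astype(np.float32) / 32768.0
    return margin * float(np.sqrt(np.mean(samples ** 2)))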
# Usage example
async def demo_speech_to_text():
stt = SpeechToTextProcessor()
vad = VoiceActivityDetector()
print("Starting voice recording... Press Enter to stop")
# Start recording
stt.start_recording()
# Wait for user input to stop
input()
# Stop recording and get audio
audio_data = stt.stop_recording()
if audio_data and vad.detect_speech(audio_data):
print("Transcribing...")
text = await stt.transcribe_audio(audio_data)
print(f"Transcription: {text}")
else:
print("No speech detected")
stt.cleanup()
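Both modules in this part import a shared `config` object that is never shown; they also assume the `openai-whisper`, `pyaudio`, `pyttsx3`, and `numpy` packages are installed. A minimal sketch of what that config might contain, based purely on the attributes used above (the names and default values here are assumptions, not the series' actual settings module):
# config.py -- minimal sketch; a real project would likely load these
# values from environment variables or a settings file
from dataclasses import dataclass

@dataclass(frozen=True)
class Config:
    STT_MODEL: str = "base"      # Whisper model size: tiny/base/small/medium/large
    TTS_VOICE_RATE: int = 175    # pyttsx3 speech rate in words per minute

config = Config()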
Text-to-Speech Implementation
# voice/text_to_speech.py
import pyttsx3
import asyncio
import logging
from typing import Any, Dict
from config import config
class TextToSpeechProcessor:
def __init__(self):
# Initialize pyttsx3 engine
self.engine = pyttsx3.init()
# Configure voice settings
self._configure_voice()
# Speech queue for async processing
self.speech_queue = asyncio.Queue()
self.is_speaking = False
def _configure_voice(self):
"""Configure TTS engine settings"""
# Set speech rate
self.engine.setProperty('rate', config.TTS_VOICE_RATE)
# Set volume (0.0 to 1.0)
self.engine.setProperty('volume', 0.9)
# Get available voices and set preferred voice
voices = self.engine.getProperty('voices')
if voices:
# Prefer female voice if available
for voice in voices:
if 'female' in voice.name.lower() or 'zira' in voice.name.lower():
self.engine.setProperty('voice', voice.id)
break
else:
# Use first available voice
self.engine.setProperty('voice', voices[0].id)
def get_available_voices(self) -> list[Dict[str, Any]]:
"""Get list of available voices"""
voices = self.engine.getProperty('voices')
return [{
'id': voice.id,
'name': voice.name,
'languages': getattr(voice, 'languages', []),
'gender': getattr(voice, 'gender', 'unknown')
} for voice in voices] if voices else []
def set_voice(self, voice_id: str) -> bool:
"""Set specific voice by ID"""
try:
self.engine.setProperty('voice', voice_id)
return True
except Exception as e:
logging.error(f"Failed to set voice {voice_id}: {e}")
return False
def speak_sync(self, text: str):
"""Speak text synchronously"""
try:
self.engine.say(text)
self.engine.runAndWait()
except Exception as e:
logging.error(f"TTS sync error: {e}")
    async def speak_async(self, text: str):
        """Queue text for speech without blocking the event loop"""
        await self.speech_queue.put(text)
        # Start a consumer task if one is not already running
        if not self.is_speaking:
            self.is_speaking = True
            asyncio.create_task(self._process_speech_queue())
    async def _process_speech_queue(self):
        """Speak queued requests one at a time"""
        try:
            while not self.speech_queue.empty():
                text = self.speech_queue.get_nowait()
                # Run the blocking pyttsx3 calls in a worker thread
                await asyncio.to_thread(self._speak_blocking, text)
                await asyncio.sleep(0.1)  # brief pause between utterances
        finally:
            self.is_speaking = False
    def _speak_blocking(self, text: str):
        """Blocking helper used by the speech queue consumer"""
        try:
            self.engine.say(text)
            self.engine.runAndWait()
        except Exception as e:
            logging.error(f"TTS async error: {e}")
def save_to_file(self, text: str, filename: str) -> bool:
"""Save speech to audio file"""
try:
self.engine.save_to_file(text, filename)
self.engine.runAndWait()
return True
except Exception as e:
logging.error(f"Failed to save TTS to file: {e}")
return False
def stop_speaking(self):
"""Stop current speech"""
try:
self.engine.stop()
self.is_speaking = False
except Exception as e:
logging.error(f"Failed to stop TTS: {e}")
# voice/voice_interface.py - combined voice interface
import asyncio
from typing import Awaitable, Callable
from voice.speech_processor import SpeechToTextProcessor, VoiceActivityDetector
from voice.text_to_speech import TextToSpeechProcessor
class VoiceInterface:
"""Combined speech-to-text and text-to-speech interface"""
def __init__(self):
self.stt = SpeechToTextProcessor()
self.tts = TextToSpeechProcessor()
self.vad = VoiceActivityDetector()
# Conversation state
self.listening = False
self.wake_words = ['alex', 'alexa', 'assistant']
async def listen_for_wake_word(self, timeout: float = 30.0) -> bool:
"""Listen for wake word activation"""
print("Listening for wake word...")
        loop = asyncio.get_running_loop()
        start_time = loop.time()
        while (loop.time() - start_time) < timeout:
            # Record a short snippet in a worker thread so the loop stays free
            audio_data = await asyncio.to_thread(self.stt._record_audio, 2.0)
if self.vad.detect_speech(audio_data):
# Transcribe and check for wake word
text = await self.stt.transcribe_audio(audio_data)
text_lower = text.lower()
for wake_word in self.wake_words:
if wake_word in text_lower:
await self.tts.speak_async("Yes, I'm listening.")
return True
await asyncio.sleep(0.1)
return False
    async def voice_conversation(self, callback: Callable[[str], Awaitable[str]]) -> str:
        """Conduct one turn of a voice conversation"""
await self.tts.speak_async("I'm ready. Please speak your message.")
# Start recording
self.stt.start_recording()
        # Record for a fixed window (a production version would stop on silence)
        await asyncio.sleep(5.0)  # max 5 seconds of recording
# Stop recording
audio_data = self.stt.stop_recording()
if audio_data and self.vad.detect_speech(audio_data):
# Transcribe user input
user_text = await self.stt.transcribe_audio(audio_data)
if user_text:
print(f"User said: {user_text}")
# Get response from callback
response = await callback(user_text)
# Speak response
await self.tts.speak_async(response)
return response
else:
await self.tts.speak_async("I didn't catch that. Could you repeat?")
return ""
else:
await self.tts.speak_async("I didn't hear anything. Please try again.")
return ""
def cleanup(self):
"""Clean up voice interface resources"""
self.stt.cleanup()
# Usage example
async def demo_voice_interface():
voice = VoiceInterface()
# Define response callback
async def respond_to_user(text: str) -> str:
# Simple echo response
return f"You said: {text}. How can I help you with that?"
    try:
        # Wait for wake word
        if await voice.listen_for_wake_word():
            # Conduct conversation
            response = await voice.voice_conversation(respond_to_user)
            print(f"Assistant responded: {response}")
    finally:
        # Release PyAudio resources even if something failed above
        voice.cleanup()
if __name__ == "__main__":
asyncio.run(demo_voice_interface())
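In a full assistant, the echo callback would be replaced by the chat pipeline built in earlier parts. A minimal integration sketch, where `assistant.generate_reply` stands in for whatever async reply method your project actually exposes (a hypothetical name, not a real API):
# Hypothetical integration -- `generate_reply` is a placeholder for
# the assistant's actual async reply method
async def run_voice_assistant(assistant) -> None:
    voice = VoiceInterface()
    try:
        # Keep answering for as long as the wake word keeps arriving
        while await voice.listen_for_wake_word(timeout=60.0):
            await voice.voice_conversation(assistant.generate_reply)
    finally:
        voice.cleanup()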
Why this voice processing architecture?
- Local processing: Whisper runs locally for privacy
- Async operations: Non-blocking audio processing
- Voice activity detection: Reduces unnecessary processing
- Wake word activation: Natural activation method
- Queue management: Handles multiple speech requests
- Error handling: Graceful degradation on audio issues