Part 5: Voice Processing - Natural Interaction

5 June 2025 · netologist · 6 min, 1126 words

Why Voice Processing?

Voice interaction makes AI assistants more natural and accessible: users can speak instead of typing, hear responses read back, and operate the assistant hands-free. This part wires up both directions, speech-to-text with Whisper and text-to-speech with pyttsx3.

Speech-to-Text Implementation
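
The speech-to-text side pairs OpenAI's Whisper for transcription with PyAudio for microphone capture, plus a simple energy-based voice activity detector so we don't waste time transcribing silence.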

# voice/speech_processor.py
import whisper
import pyaudio
import wave
import tempfile
import os
import asyncio
import logging
from typing import Optional
import threading
import queue
import numpy as np
from config import config

class SpeechToTextProcessor:
    def __init__(self):
        # Load Whisper model
        self.model = whisper.load_model(config.STT_MODEL)
        
        # Audio configuration
        self.chunk_size = 1024
        self.format = pyaudio.paInt16
        self.channels = 1
        self.rate = 16000  # Whisper works best with 16kHz
        
        # Initialize PyAudio
        self.audio = pyaudio.PyAudio()
        
        # Recording state
        self.is_recording = False
        self.audio_queue = queue.Queue()
        
    def _record_audio(self, duration: Optional[float] = None) -> bytes:
        """Record audio from microphone"""
        
        stream = self.audio.open(
            format=self.format,
            channels=self.channels,
            rate=self.rate,
            input=True,
            frames_per_buffer=self.chunk_size
        )
        
        frames = []
        
        if duration:
            # Record for specified duration
            for _ in range(0, int(self.rate / self.chunk_size * duration)):
                data = stream.read(self.chunk_size)
                frames.append(data)
        else:
            # Record until is_recording is cleared by stop_recording()
            while self.is_recording:
                try:
                    data = stream.read(self.chunk_size, exception_on_overflow=False)
                    frames.append(data)
                except Exception as e:
                    logging.warning(f"Audio recording error: {e}")
                    break
        
        stream.stop_stream()
        stream.close()
        
        return b''.join(frames)
    
    def start_recording(self):
        """Start continuous recording in background thread"""
        if self.is_recording:
            return
        
        # Set the flag before the thread starts so an immediate
        # stop_recording() call can't race the recorder thread
        self.is_recording = True
        
        def record_thread():
            audio_data = self._record_audio()
            self.audio_queue.put(audio_data)
        
        thread = threading.Thread(target=record_thread, daemon=True)
        thread.start()
    
    def stop_recording(self) -> Optional[bytes]:
        """Stop recording and return audio data"""
        if not self.is_recording:
            return None
        
        self.is_recording = False
        
        try:
            # Get recorded audio data
            return self.audio_queue.get(timeout=1.0)
        except queue.Empty:
            return None
    
    async def transcribe_audio(self, audio_data: bytes) -> str:
        """Transcribe audio data to text"""
        
        # Create a temp path, then close the handle before writing so this
        # also works on Windows (which forbids opening the file twice)
        with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as temp_file:
            temp_path = temp_file.name
        
        # Wrap the raw PCM frames in a WAV container for Whisper
        with wave.open(temp_path, 'wb') as wav_file:
            wav_file.setnchannels(self.channels)
            wav_file.setsampwidth(self.audio.get_sample_size(self.format))
            wav_file.setframerate(self.rate)
            wav_file.writeframes(audio_data)
        
        try:
            # Run the blocking Whisper call off the event loop
            result = await asyncio.to_thread(self.model.transcribe, temp_path)
            return result['text'].strip()
        
        except Exception as e:
            logging.error(f"Transcription failed: {e}")
            return ""
        
        finally:
            # Clean up temporary file
            try:
                os.unlink(temp_path)
            except OSError:
                pass
    
    async def transcribe_file(self, file_path: str) -> str:
        """Transcribe audio file to text"""
        try:
            # Keep the event loop responsive while Whisper runs
            result = await asyncio.to_thread(self.model.transcribe, file_path)
            return result['text'].strip()
        except Exception as e:
            logging.error(f"File transcription failed: {e}")
            return ""
    
    def cleanup(self):
        """Clean up audio resources"""
        if hasattr(self, 'audio'):
            self.audio.terminate()

class VoiceActivityDetector:
    """Simple voice activity detection"""
    
    def __init__(self, threshold: float = 0.01, min_duration: float = 0.5):
        self.threshold = threshold
        self.min_duration = min_duration
        
    def detect_speech(self, audio_data: bytes, sample_rate: int = 16000) -> bool:
        """Detect whether audio contains at least min_duration seconds of speech"""
        
        # Convert bytes to a numpy array and normalize to [-1.0, 1.0]
        audio_array = np.frombuffer(audio_data, dtype=np.int16)
        if audio_array.size == 0:
            return False
        audio_normalized = audio_array.astype(np.float32) / 32768.0
        
        # Measure RMS energy over 30 ms windows
        frame_len = int(sample_rate * 0.03)
        voiced_frames = 0
        for start in range(0, len(audio_normalized) - frame_len + 1, frame_len):
            frame = audio_normalized[start:start + frame_len]
            rms_energy = np.sqrt(np.mean(frame ** 2))
            if rms_energy > self.threshold:
                voiced_frames += 1
        
        # Require enough voiced audio overall, not just a single loud click
        return voiced_frames * 0.03 >= self.min_duration

# Usage example
async def demo_speech_to_text():
    stt = SpeechToTextProcessor()
    vad = VoiceActivityDetector()
    
    print("Starting voice recording... Press Enter to stop")
    
    # Start recording
    stt.start_recording()
    
    # Wait for user input to stop
    input()
    
    # Stop recording and get audio
    audio_data = stt.stop_recording()
    
    if audio_data and vad.detect_speech(audio_data):
        print("Transcribing...")
        text = await stt.transcribe_audio(audio_data)
        print(f"Transcription: {text}")
    else:
        print("No speech detected")
    
    stt.cleanup()
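
Both files read settings from a shared config module that this part doesn't show. Here's a minimal sketch with just the two values the code above uses (the attribute names come from the code; the defaults are reasonable starting points, not prescriptions). You'll also need the openai-whisper, pyaudio, pyttsx3, and numpy packages installed, plus ffmpeg on your PATH for Whisper to load audio files.

# config.py - minimal settings sketch, assumed for this part
class Config:
    # Whisper model size: tiny / base / small / medium / large
    # (larger models are more accurate but slower to load and run)
    STT_MODEL = "base"
    
    # pyttsx3 speaking rate in words per minute
    TTS_VOICE_RATE = 180

config = Config()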

Text-to-Speech Implementation
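
The speaking side uses pyttsx3, an offline TTS engine, and then ties both halves together in a combined VoiceInterface that handles wake-word detection and turn-taking.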

# voice/text_to_speech.py
import pyttsx3
import asyncio
import threading
from typing import Dict, Any, Callable, Awaitable
import logging
from config import config
from voice.speech_processor import SpeechToTextProcessor, VoiceActivityDetector

class TextToSpeechProcessor:
    def __init__(self):
        # Initialize pyttsx3 engine
        self.engine = pyttsx3.init()
        
        # Configure voice settings
        self._configure_voice()
        
        # Speech queue for async processing
        self.speech_queue = asyncio.Queue()
        self.is_speaking = False
        
    def _configure_voice(self):
        """Configure TTS engine settings"""
        
        # Set speech rate
        self.engine.setProperty('rate', config.TTS_VOICE_RATE)
        
        # Set volume (0.0 to 1.0)
        self.engine.setProperty('volume', 0.9)
        
        # Get available voices and set preferred voice
        voices = self.engine.getProperty('voices')
        if voices:
            # Prefer female voice if available
            for voice in voices:
                if 'female' in voice.name.lower() or 'zira' in voice.name.lower():
                    self.engine.setProperty('voice', voice.id)
                    break
            else:
                # Use first available voice
                self.engine.setProperty('voice', voices[0].id)
    
    def get_available_voices(self) -> list[Dict[str, Any]]:
        """Get list of available voices"""
        voices = self.engine.getProperty('voices')
        
        return [{
            'id': voice.id,
            'name': voice.name,
            'languages': getattr(voice, 'languages', []),
            'gender': getattr(voice, 'gender', 'unknown')
        } for voice in voices] if voices else []
    
    def set_voice(self, voice_id: str) -> bool:
        """Set specific voice by ID"""
        try:
            self.engine.setProperty('voice', voice_id)
            return True
        except Exception as e:
            logging.error(f"Failed to set voice {voice_id}: {e}")
            return False
    
    def speak_sync(self, text: str):
        """Speak text synchronously"""
        try:
            self.engine.say(text)
            self.engine.runAndWait()
        except Exception as e:
            logging.error(f"TTS sync error: {e}")
    
    async def speak_async(self, text: str):
        """Speak text asynchronously"""
        
        if self.is_speaking:
            # Another utterance is in progress; queue this one
            await self.speech_queue.put(text)
            return
        
        self.is_speaking = True
        
        def speak_thread():
            try:
                self.engine.say(text)
                self.engine.runAndWait()
            except Exception as e:
                logging.error(f"TTS async error: {e}")
            finally:
                self.is_speaking = False
        
        # Run the blocking pyttsx3 call in a background thread
        thread = threading.Thread(target=speak_thread, daemon=True)
        thread.start()
        
        # Drain any speech requests that queue up meanwhile
        asyncio.create_task(self._process_speech_queue())
    
    async def _process_speech_queue(self):
        """Process queued speech requests"""
        while not self.speech_queue.empty():
            text = self.speech_queue.get_nowait()
            # Wait for the current utterance to finish before starting the next
            while self.is_speaking:
                await asyncio.sleep(0.1)
            await self.speak_async(text)
    
    def save_to_file(self, text: str, filename: str) -> bool:
        """Save speech to audio file"""
        try:
            self.engine.save_to_file(text, filename)
            self.engine.runAndWait()
            return True
        except Exception as e:
            logging.error(f"Failed to save TTS to file: {e}")
            return False
    
    def stop_speaking(self):
        """Stop current speech"""
        try:
            self.engine.stop()
            self.is_speaking = False
        except Exception as e:
            logging.error(f"Failed to stop TTS: {e}")

# Combined Voice Interface
class VoiceInterface:
    """Combined speech-to-text and text-to-speech interface"""
    
    def __init__(self):
        self.stt = SpeechToTextProcessor()
        self.tts = TextToSpeechProcessor()
        self.vad = VoiceActivityDetector()
        
        # Conversation state
        self.listening = False
        self.wake_words = ['alex', 'alexa', 'assistant']
    
    async def listen_for_wake_word(self, timeout: float = 30.0) -> bool:
        """Listen for wake word activation"""
        
        print("Listening for wake word...")
        
        start_time = asyncio.get_event_loop().time()
        
        while (asyncio.get_event_loop().time() - start_time) < timeout:
            # Record a short snippet off the event loop so we stay responsive
            audio_data = await asyncio.to_thread(self.stt._record_audio, duration=2.0)
            
            if self.vad.detect_speech(audio_data):
                # Transcribe and check for wake word
                text = await self.stt.transcribe_audio(audio_data)
                text_lower = text.lower()
                
                for wake_word in self.wake_words:
                    if wake_word in text_lower:
                        await self.tts.speak_async("Yes, I'm listening.")
                        return True
            
            await asyncio.sleep(0.1)
        
        return False
    
    async def voice_conversation(self, callback: Callable[[str], Awaitable[str]]) -> str:
        """Conduct a voice conversation"""
        
        await self.tts.speak_async("I'm ready. Please speak your message.")
        
        # Start recording
        self.stt.start_recording()
        
        # Fixed-length recording window; a real app would stop on detected silence
        await asyncio.sleep(5.0)
        
        # Stop recording
        audio_data = self.stt.stop_recording()
        
        if audio_data and self.vad.detect_speech(audio_data):
            # Transcribe user input
            user_text = await self.stt.transcribe_audio(audio_data)
            
            if user_text:
                print(f"User said: {user_text}")
                
                # Get response from callback
                response = await callback(user_text)
                
                # Speak response
                await self.tts.speak_async(response)
                
                return response
            else:
                await self.tts.speak_async("I didn't catch that. Could you repeat?")
                return ""
        else:
            await self.tts.speak_async("I didn't hear anything. Please try again.")
            return ""
    
    def cleanup(self):
        """Clean up voice interface resources"""
        self.stt.cleanup()

# Usage example
async def demo_voice_interface():
    voice = VoiceInterface()
    
    # Define response callback
    async def respond_to_user(text: str) -> str:
        # Simple echo response
        return f"You said: {text}. How can I help you with that?"
    
    # Wait for wake word
    if await voice.listen_for_wake_word():
        # Conduct conversation
        response = await voice.voice_conversation(respond_to_user)
        print(f"Assistant responded: {response}")
    
    voice.cleanup()

if __name__ == "__main__":
    asyncio.run(demo_voice_interface())
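
To sanity-check both halves together without a microphone, you can synthesize a phrase to a file and transcribe it back. A quick round-trip sketch using only the classes above (it assumes pyttsx3's save_to_file produces a WAV that your Whisper/ffmpeg install can read; some platform drivers write a different container, so treat this as illustrative):

# loopback_test.py - round-trip sanity check (illustrative, not part of the series code)
import asyncio
from voice.speech_processor import SpeechToTextProcessor
from voice.text_to_speech import TextToSpeechProcessor

async def loopback():
    tts = TextToSpeechProcessor()
    stt = SpeechToTextProcessor()
    
    # Synthesize a known phrase, then transcribe it back with Whisper
    tts.save_to_file("testing one two three", "loopback.wav")
    text = await stt.transcribe_file("loopback.wav")
    print(f"Round trip: {text}")
    
    stt.cleanup()

if __name__ == "__main__":
    asyncio.run(loopback())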

Why this voice processing architecture?

Everything runs locally: Whisper does the transcription and pyttsx3 does the synthesis, so no audio leaves the machine and there are no per-request API costs. The RMS-based voice activity detector is deliberately simple; its job is to filter out obvious silence before invoking Whisper, which is the expensive step. Blocking work (microphone capture, transcription, synthesis) runs in background threads so the asyncio event loop stays responsive, and VoiceInterface wraps wake-word detection and turn-taking behind a single entry point the rest of the assistant can call.