Building a Voice RAG Agent with Real-Time Audio
Create a voice-enabled RAG agent using CartesiaAI, AssemblyAI, LlamaIndex, and Livekit. Build conversational AI with real-time speech interaction.
Text-based RAG systems are powerful, but voice interaction opens new possibilities. Users can ask questions naturally, receive spoken answers, and have conversations without touching a keyboard. This is essential for hands-free scenarios, accessibility, and more natural human-AI interaction.
This tutorial builds a complete voice RAG agent. We combine CartesiaAI for high-quality text-to-speech, AssemblyAI for speech recognition, LlamaIndex for RAG, and Livekit for real-time audio streaming. The result is a conversational agent that listens, understands, retrieves relevant information, and speaks responses.
Architecture Overview
Component Roles
Our voice RAG system has four main layers:
1. Audio Transport (Livekit) provides the real-time audio streaming infrastructure:
- WebRTC-based communication
- Low-latency audio transmission
- Cross-platform client support
2. Speech Recognition (AssemblyAI) converts spoken words to text:
- Real-time transcription
- Speaker diarization
- Punctuation and formatting
3. RAG Engine (LlamaIndex) handles information retrieval and synthesis:
- Document indexing
- Semantic search
- Response generation
4. Text-to-Speech (CartesiaAI) converts text responses to natural speech:
- High-quality voice synthesis
- Streaming audio generation
- Multiple voice options
Data Flow
User speaks → Livekit captures audio → AssemblyAI transcribes →
LlamaIndex retrieves and generates → CartesiaAI synthesizes speech →
Livekit streams audio → User hears response
The key challenge is keeping this loop fast enough for natural conversation. Latency at each stage accumulates, so we need streaming at every step.
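The streaming requirement is easier to see in code. Below is a minimal sketch of the conversation loop, assuming each stage exposes an async generator; transcribe, retrieve_and_generate, and synthesize are placeholders for the components built later in this tutorial (not real library calls), and audio_in/audio_out stand in for the Livekit streams.
# Illustrative pipeline sketch: every stage streams into the next.
async def conversation_loop(audio_in, audio_out):
    async for utterance in transcribe(audio_in):                  # AssemblyAI: speech -> text
        async for sentence in retrieve_and_generate(utterance):   # LlamaIndex: question -> answer
            async for chunk in synthesize(sentence):              # Cartesia: text -> audio
                await audio_out.play(chunk)                       # Livekit: stream back to the user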
Project Setup
Project Structure
voice-rag/
├── agent/
│ ├── __init__.py
│ ├── voice_agent.py
│ ├── speech_to_text.py
│ ├── text_to_speech.py
│ └── rag_engine.py
├── transport/
│ ├── __init__.py
│ ├── room_manager.py
│ └── audio_handler.py
├── documents/
│ └── (your knowledge base documents)
├── config/
│ └── settings.py
├── api/
│ └── token.py
├── client/
│ └── index.html
├── main.py
├── requirements.txt
└── README.md
Note that the audio transport code lives in transport/ rather than a livekit/ directory: a local package named livekit would shadow the installed livekit SDK and break imports such as from livekit import rtc.
Dependencies
# requirements.txt
livekit>=0.10.0
livekit-agents>=0.7.0
assemblyai>=0.20.0
cartesia>=0.2.0
llama-index>=0.10.0
llama-index-embeddings-openai>=0.1.0
llama-index-llms-openai>=0.1.0
openai>=1.0.0
numpy>=1.24.0
python-dotenv>=1.0.0
pydantic-settings>=2.0.0
fastapi>=0.100.0
uvicorn>=0.23.0
Configuration
# config/settings.py
from pydantic_settings import BaseSettings
class Settings(BaseSettings):
# API Keys
livekit_api_key: str
livekit_api_secret: str
livekit_url: str
assemblyai_api_key: str
cartesia_api_key: str
openai_api_key: str
# Voice Settings
cartesia_voice_id: str = "a0e99841-438c-4a64-b679-ae501e7d6091" # Default voice
sample_rate: int = 24000
# RAG Settings
chunk_size: int = 512
similarity_top_k: int = 3
# Agent Settings
system_prompt: str = """You are a helpful voice assistant.
Keep responses concise and conversational.
Speak naturally as if in a conversation."""
class Config:
env_file = ".env"
settings = Settings()
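For reference, pydantic-settings loads these values from a .env file in the project root. The variable names mirror the field names above; the values here are placeholders.
# .env
LIVEKIT_URL=wss://your-project.livekit.cloud
LIVEKIT_API_KEY=your-livekit-api-key
LIVEKIT_API_SECRET=your-livekit-api-secret
ASSEMBLYAI_API_KEY=your-assemblyai-key
CARTESIA_API_KEY=your-cartesia-key
OPENAI_API_KEY=your-openai-key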
Building Speech Components
Speech-to-Text with AssemblyAI
Real-time transcription of user speech:
# agent/speech_to_text.py
import assemblyai as aai
from typing import Callable, Optional
import asyncio
class SpeechToText:
def __init__(self, api_key: str):
aai.settings.api_key = api_key
self.transcriber: Optional[aai.RealtimeTranscriber] = None
self.on_transcript: Optional[Callable[[str], None]] = None
self.buffer = ""
async def start(self, on_transcript: Callable[[str], None]):
"""Start real-time transcription."""
self.on_transcript = on_transcript
def on_data(transcript: aai.RealtimeTranscript):
if isinstance(transcript, aai.RealtimeFinalTranscript):
# Final transcript for this utterance
text = transcript.text.strip()
if text:
self.on_transcript(text)
elif isinstance(transcript, aai.RealtimePartialTranscript):
# Partial - could show interim results
pass
def on_error(error: aai.RealtimeError):
print(f"Transcription error: {error}")
self.transcriber = aai.RealtimeTranscriber(
sample_rate=16000,
on_data=on_data,
on_error=on_error,
encoding=aai.AudioEncoding.pcm_s16le
)
await asyncio.to_thread(self.transcriber.connect)
async def send_audio(self, audio_data: bytes):
"""Send audio chunk for transcription."""
if self.transcriber:
await asyncio.to_thread(self.transcriber.stream, audio_data)
async def stop(self):
"""Stop transcription."""
if self.transcriber:
await asyncio.to_thread(self.transcriber.close)
self.transcriber = None
class BufferedSpeechToText(SpeechToText):
"""Speech-to-text with silence detection for turn-taking."""
def __init__(self, api_key: str, silence_threshold: float = 1.0):
super().__init__(api_key)
self.silence_threshold = silence_threshold
self.last_speech_time = 0
self.current_utterance = []
self.on_utterance_complete: Optional[Callable[[str], None]] = None
async def start(self, on_utterance: Callable[[str], None]):
"""Start with utterance-level callbacks."""
self.on_utterance_complete = on_utterance
def handle_transcript(text: str):
self.current_utterance.append(text)
self.last_speech_time = asyncio.get_event_loop().time()
await super().start(handle_transcript)
# Start silence detection
asyncio.create_task(self._detect_silence())
async def _detect_silence(self):
"""Detect silence to determine end of utterance."""
while self.transcriber:
await asyncio.sleep(0.1)
current_time = asyncio.get_event_loop().time()
if (self.current_utterance and
current_time - self.last_speech_time > self.silence_threshold):
# Silence detected - utterance complete
full_utterance = " ".join(self.current_utterance)
self.current_utterance = []
if self.on_utterance_complete:
self.on_utterance_complete(full_utterance)
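Before wiring the transcriber into Livekit, it is worth a quick local check. The sketch below assumes a raw 16 kHz, 16-bit mono PCM file at ./sample.raw; the file name and chunk size are illustrative.
# Local smoke test for BufferedSpeechToText (assumes ./sample.raw is 16 kHz pcm_s16le mono).
import asyncio
from agent.speech_to_text import BufferedSpeechToText
from config.settings import settings

async def demo():
    stt = BufferedSpeechToText(settings.assemblyai_api_key, silence_threshold=1.0)
    await stt.start(lambda utterance: print(f"Utterance: {utterance}"))
    with open("sample.raw", "rb") as f:
        while chunk := f.read(3200):   # 3200 bytes = 100 ms of 16 kHz 16-bit mono audio
            await stt.send_audio(chunk)
            await asyncio.sleep(0.1)   # pace the stream roughly in real time
    await asyncio.sleep(2)             # give silence detection time to close the utterance
    await stt.stop()

asyncio.run(demo())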
Text-to-Speech with CartesiaAI
Streaming speech synthesis:
# agent/text_to_speech.py
from cartesia import Cartesia
from typing import AsyncGenerator, Optional
import asyncio
class TextToSpeech:
def __init__(self, api_key: str, voice_id: str):
self.client = Cartesia(api_key=api_key)
self.voice_id = voice_id
self.voice = self.client.voices.get(id=voice_id)
async def synthesize(self, text: str) -> bytes:
"""Synthesize complete audio from text."""
output = await asyncio.to_thread(
self.client.tts.bytes,
model_id="sonic-english",
transcript=text,
voice_embedding=self.voice["embedding"],
output_format={
"container": "raw",
"encoding": "pcm_s16le",
"sample_rate": 24000
}
)
return output
async def synthesize_stream(
self,
text: str
) -> AsyncGenerator[bytes, None]:
"""Stream audio chunks as they are generated."""
# Use streaming API for lower latency
stream = self.client.tts.sse(
model_id="sonic-english",
transcript=text,
voice_embedding=self.voice["embedding"],
output_format={
"container": "raw",
"encoding": "pcm_s16le",
"sample_rate": 24000
},
stream=True
)
for event in stream:
if hasattr(event, 'audio'):
yield event.audio
class StreamingTTS(TextToSpeech):
"""TTS with sentence-level streaming for faster response."""
def __init__(self, api_key: str, voice_id: str):
super().__init__(api_key, voice_id)
self.current_stream = None
async def synthesize_sentences(
self,
text_generator: AsyncGenerator[str, None]
) -> AsyncGenerator[bytes, None]:
"""Synthesize as sentences become available."""
sentence_buffer = ""
sentence_endings = ".!?"
async for chunk in text_generator:
sentence_buffer += chunk
# Check for complete sentences
while any(end in sentence_buffer for end in sentence_endings):
# Find first sentence ending
end_idx = min(
(sentence_buffer.find(end) for end in sentence_endings
if end in sentence_buffer),
default=-1
)
if end_idx == -1:
break
# Extract and synthesize sentence
sentence = sentence_buffer[:end_idx + 1].strip()
sentence_buffer = sentence_buffer[end_idx + 1:]
if sentence:
async for audio in self.synthesize_stream(sentence):
yield audio
# Handle remaining text
if sentence_buffer.strip():
async for audio in self.synthesize_stream(sentence_buffer):
yield audio
async def interrupt(self):
"""Stop current synthesis."""
self.current_stream = None
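As with transcription, it helps to verify synthesis in isolation before adding transport. The snippet below writes one utterance to a WAV file; the output path is arbitrary, and the wave-module parameters simply mirror the pcm_s16le / 24 kHz format requested above.
# Quick check of TextToSpeech: synthesize one sentence and save it as a WAV file.
import asyncio
import wave
from agent.text_to_speech import TextToSpeech
from config.settings import settings

async def demo():
    tts = TextToSpeech(settings.cartesia_api_key, settings.cartesia_voice_id)
    audio = await tts.synthesize("Hello from the voice RAG agent.")
    with wave.open("hello.wav", "wb") as wav:
        wav.setnchannels(1)      # mono
        wav.setsampwidth(2)      # 16-bit samples (pcm_s16le)
        wav.setframerate(24000)  # matches the requested sample rate
        wav.writeframes(audio)

asyncio.run(demo())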
Building the RAG Engine
LlamaIndex Integration
# agent/rag_engine.py
from llama_index.core import (
VectorStoreIndex,
SimpleDirectoryReader,
Settings,
StorageContext,
load_index_from_storage
)
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.llms.openai import OpenAI
from typing import AsyncGenerator, Optional
import os
class RAGEngine:
def __init__(
self,
documents_path: str,
openai_api_key: str,
persist_dir: str = "./storage"
):
# Configure LlamaIndex
Settings.embed_model = OpenAIEmbedding(
api_key=openai_api_key,
model="text-embedding-3-small"
)
Settings.llm = OpenAI(
api_key=openai_api_key,
model="gpt-4-turbo-preview",
temperature=0.7
)
self.persist_dir = persist_dir
self.documents_path = documents_path
self.index: Optional[VectorStoreIndex] = None
self._load_or_create_index()
def _load_or_create_index(self):
"""Load existing index or create new one."""
if os.path.exists(self.persist_dir):
# Load existing
storage_context = StorageContext.from_defaults(
persist_dir=self.persist_dir
)
self.index = load_index_from_storage(storage_context)
else:
# Create new
documents = SimpleDirectoryReader(
self.documents_path
).load_data()
self.index = VectorStoreIndex.from_documents(documents)
self.index.storage_context.persist(persist_dir=self.persist_dir)
def query(self, question: str, top_k: int = 3) -> str:
"""Query the index and return response."""
query_engine = self.index.as_query_engine(
similarity_top_k=top_k
)
response = query_engine.query(question)
return str(response)
async def query_stream(
self,
question: str,
system_prompt: str = ""
) -> AsyncGenerator[str, None]:
"""Stream response tokens as they are generated."""
query_engine = self.index.as_query_engine(
streaming=True,
similarity_top_k=3
)
# Add system prompt for voice-appropriate responses
full_query = question
if system_prompt:
full_query = f"{system_prompt}\n\nUser question: {question}"
response = query_engine.query(full_query)
# Stream response tokens
for token in response.response_gen:
yield token
def add_document(self, filepath: str):
"""Add a new document to the index."""
from llama_index.core import Document
with open(filepath, 'r') as f:
text = f.read()
doc = Document(text=text, metadata={"source": filepath})
self.index.insert(doc)
self.index.storage_context.persist(persist_dir=self.persist_dir)
class ConversationalRAG(RAGEngine):
"""RAG with conversation history for context."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.conversation_history = []
self.max_history = 10
def add_to_history(self, role: str, content: str):
"""Add message to conversation history."""
self.conversation_history.append({
"role": role,
"content": content
})
# Trim history if too long
if len(self.conversation_history) > self.max_history:
self.conversation_history = self.conversation_history[-self.max_history:]
async def query_with_history(
self,
question: str,
system_prompt: str = ""
) -> AsyncGenerator[str, None]:
"""Query with conversation context."""
# Build context from history
history_context = "\n".join([
f"{msg['role']}: {msg['content']}"
for msg in self.conversation_history[-6:] # Last 3 exchanges
])
enhanced_query = f"""Previous conversation:
{history_context}
Current question: {question}
Provide a response that:
1. Considers the conversation context
2. Is conversational and natural for speech
3. Is concise (1-3 sentences when possible)"""
self.add_to_history("user", question)
full_response = ""
async for token in self.query_stream(enhanced_query, system_prompt):
full_response += token
yield token
self.add_to_history("assistant", full_response)
def clear_history(self):
"""Clear conversation history."""
self.conversation_history = []
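The RAG layer can be exercised on its own before any audio is involved. A minimal check, assuming documents/ contains at least one file and the OpenAI key is configured:
# Standalone check of the conversational RAG engine (no audio involved).
import asyncio
from agent.rag_engine import ConversationalRAG
from config.settings import settings

async def demo():
    rag = ConversationalRAG("./documents", settings.openai_api_key)
    async for token in rag.query_with_history("What topics does the knowledge base cover?"):
        print(token, end="", flush=True)
    print()

asyncio.run(demo())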
Livekit Integration
Audio Handler
# transport/audio_handler.py
from livekit import rtc
from typing import Callable, Optional
import numpy as np
import asyncio
class AudioHandler:
def __init__(self, sample_rate: int = 24000):
self.sample_rate = sample_rate
self.on_audio_received: Optional[Callable[[bytes], None]] = None
self.audio_source: Optional[rtc.AudioSource] = None
self.audio_track: Optional[rtc.LocalAudioTrack] = None
async def setup_input(
self,
room: rtc.Room,
on_audio: Callable[[bytes], None]
):
"""Set up audio input handling."""
self.on_audio_received = on_audio
@room.on("track_subscribed")
def on_track_subscribed(
track: rtc.Track,
publication: rtc.RemoteTrackPublication,
participant: rtc.RemoteParticipant
):
if track.kind == rtc.TrackKind.KIND_AUDIO:
asyncio.create_task(self._process_audio_track(track))
async def _process_audio_track(self, track: rtc.Track):
"""Process incoming audio track."""
audio_stream = rtc.AudioStream(track)
async for frame in audio_stream:
# Convert to bytes for STT
audio_data = frame.data.tobytes()
if self.on_audio_received:
self.on_audio_received(audio_data)
async def setup_output(self, room: rtc.Room):
"""Set up audio output."""
self.audio_source = rtc.AudioSource(
self.sample_rate,
num_channels=1
)
self.audio_track = rtc.LocalAudioTrack.create_audio_track(
"agent-voice",
self.audio_source
)
await room.local_participant.publish_track(
self.audio_track,
rtc.TrackPublishOptions(source=rtc.TrackSource.SOURCE_MICROPHONE)
)
async def play_audio(self, audio_data: bytes):
"""Play audio through the output track."""
if not self.audio_source:
return
# Convert bytes to numpy array
audio_array = np.frombuffer(audio_data, dtype=np.int16)
# Create audio frame
frame = rtc.AudioFrame(
data=audio_array.tobytes(),
sample_rate=self.sample_rate,
num_channels=1,
samples_per_channel=len(audio_array)
)
await self.audio_source.capture_frame(frame)
async def play_audio_stream(self, audio_generator):
"""Play streaming audio."""
async for chunk in audio_generator:
await self.play_audio(chunk)
# Small delay to prevent buffer overflow
await asyncio.sleep(0.01)
Room Manager
# transport/room_manager.py
from livekit import api, rtc
from typing import Optional
import asyncio
class RoomManager:
def __init__(
self,
api_key: str,
api_secret: str,
livekit_url: str
):
self.api_key = api_key
self.api_secret = api_secret
self.livekit_url = livekit_url
self.room: Optional[rtc.Room] = None
async def create_room(self, room_name: str) -> str:
"""Create a room and return access token."""
lk_api = api.LiveKitAPI(
self.livekit_url,
self.api_key,
self.api_secret
)
# Create room
await lk_api.room.create_room(
api.CreateRoomRequest(name=room_name)
)
# Generate token for agent
token = api.AccessToken(self.api_key, self.api_secret)
token.with_identity("voice-agent")
token.with_name("Voice RAG Agent")
token.with_grants(api.VideoGrants(
room_join=True,
room=room_name
))
return token.to_jwt()
async def join_room(self, room_name: str) -> rtc.Room:
"""Join a room as the agent."""
token = await self.create_room(room_name)
self.room = rtc.Room()
await self.room.connect(
self.livekit_url,
token,
options=rtc.RoomOptions(
auto_subscribe=True
)
)
return self.room
async def leave_room(self):
"""Leave the current room."""
if self.room:
await self.room.disconnect()
self.room = None
def generate_participant_token(
self,
room_name: str,
participant_name: str
) -> str:
"""Generate token for a participant to join."""
token = api.AccessToken(self.api_key, self.api_secret)
token.with_identity(participant_name)
token.with_name(participant_name)
token.with_grants(api.VideoGrants(
room_join=True,
room=room_name
))
return token.to_jwt()
Complete Voice Agent
Main Agent Class
# agent/voice_agent.py
from agent.speech_to_text import BufferedSpeechToText
from agent.text_to_speech import StreamingTTS
from agent.rag_engine import ConversationalRAG
from transport.room_manager import RoomManager
from transport.audio_handler import AudioHandler
from config.settings import settings
import asyncio
from typing import Optional
class VoiceRAGAgent:
def __init__(self):
# Initialize components
self.stt = BufferedSpeechToText(
settings.assemblyai_api_key,
silence_threshold=1.0
)
self.tts = StreamingTTS(
settings.cartesia_api_key,
settings.cartesia_voice_id
)
self.rag = ConversationalRAG(
documents_path="./documents",
openai_api_key=settings.openai_api_key
)
self.room_manager = RoomManager(
settings.livekit_api_key,
settings.livekit_api_secret,
settings.livekit_url
)
self.audio_handler = AudioHandler(settings.sample_rate)
self.is_speaking = False
self.should_interrupt = False
async def start(self, room_name: str):
"""Start the voice agent in a room."""
print(f"Starting voice agent in room: {room_name}")
# Join room
room = await self.room_manager.join_room(room_name)
# Setup audio handling
await self.audio_handler.setup_output(room)
await self.audio_handler.setup_input(
room,
self._on_audio_received
)
# Start speech-to-text
await self.stt.start(self._on_utterance_complete)
# Send greeting
await self._speak("Hello! I'm your voice assistant. How can I help you today?")
print("Voice agent is ready and listening...")
# Keep running
try:
while True:
await asyncio.sleep(1)
except asyncio.CancelledError:
await self.stop()
async def stop(self):
"""Stop the voice agent."""
await self.stt.stop()
await self.room_manager.leave_room()
print("Voice agent stopped.")
def _on_audio_received(self, audio_data: bytes):
"""Handle incoming audio from user."""
# Check for barge-in (user speaks while agent is speaking)
if self.is_speaking:
self.should_interrupt = True
# Send to STT
asyncio.create_task(self.stt.send_audio(audio_data))
def _on_utterance_complete(self, text: str):
"""Handle complete utterance from user."""
print(f"User said: {text}")
# Process the utterance
asyncio.create_task(self._process_utterance(text))
async def _process_utterance(self, text: str):
"""Process user utterance and respond."""
# Handle interruption
if self.should_interrupt:
await self.tts.interrupt()
self.is_speaking = False
self.should_interrupt = False
# Skip if still speaking (and no interrupt)
if self.is_speaking:
return
# Check for special commands
if self._is_command(text):
await self._handle_command(text)
return
# Query RAG and respond
await self._query_and_respond(text)
async def _query_and_respond(self, question: str):
"""Query RAG engine and speak response."""
self.is_speaking = True
try:
# Stream response generation
response_generator = self.rag.query_with_history(
question,
settings.system_prompt
)
# Convert text stream to speech stream
audio_generator = self.tts.synthesize_sentences(response_generator)
# Play audio
await self.audio_handler.play_audio_stream(audio_generator)
except Exception as e:
print(f"Error processing query: {e}")
await self._speak("I'm sorry, I had trouble understanding that. Could you repeat?")
finally:
self.is_speaking = False
async def _speak(self, text: str):
"""Speak a text message."""
self.is_speaking = True
try:
async for audio in self.tts.synthesize_stream(text):
if self.should_interrupt:
break
await self.audio_handler.play_audio(audio)
finally:
self.is_speaking = False
self.should_interrupt = False
def _is_command(self, text: str) -> bool:
"""Check if utterance is a command."""
commands = ["clear history", "reset", "goodbye", "stop"]
return any(cmd in text.lower() for cmd in commands)
async def _handle_command(self, text: str):
"""Handle special commands."""
text_lower = text.lower()
if "clear history" in text_lower or "reset" in text_lower:
self.rag.clear_history()
await self._speak("Conversation history cleared. How can I help you?")
elif "goodbye" in text_lower or "stop" in text_lower:
await self._speak("Goodbye! Have a great day.")
await self.stop()
Main Entry Point
# main.py
import asyncio
from agent.voice_agent import VoiceRAGAgent
async def main():
agent = VoiceRAGAgent()
# Get room name from command line or use default
import sys
room_name = sys.argv[1] if len(sys.argv) > 1 else "voice-rag-room"
await agent.start(room_name)
if __name__ == "__main__":
asyncio.run(main())
Client Integration
Web Client Example
<!-- client/index.html -->
<!DOCTYPE html>
<html>
<head>
<title>Voice RAG Assistant</title>
<script src="https://unpkg.com/livekit-client/dist/livekit-client.umd.min.js"></script>
</head>
<body>
<h1>Voice RAG Assistant</h1>
<button id="connect">Connect</button>
<button id="disconnect" disabled>Disconnect</button>
<div id="status">Not connected</div>
<script>
const connectBtn = document.getElementById('connect');
const disconnectBtn = document.getElementById('disconnect');
const statusDiv = document.getElementById('status');
let room = null;
connectBtn.addEventListener('click', async () => {
// Get token from your backend
const response = await fetch('/api/token?room=voice-rag-room&name=user');
const { token, url } = await response.json();
room = new LivekitClient.Room();
room.on('connected', () => {
statusDiv.textContent = 'Connected - speak to interact';
connectBtn.disabled = true;
disconnectBtn.disabled = false;
});
room.on('disconnected', () => {
statusDiv.textContent = 'Disconnected';
connectBtn.disabled = false;
disconnectBtn.disabled = true;
});
await room.connect(url, token);
// Enable microphone
await room.localParticipant.setMicrophoneEnabled(true);
});
disconnectBtn.addEventListener('click', async () => {
if (room) {
await room.disconnect();
}
});
</script>
</body>
</html>
Token Endpoint
# api/token.py
from fastapi import FastAPI, Query
from transport.room_manager import RoomManager
from config.settings import settings
app = FastAPI()
room_manager = RoomManager(
settings.livekit_api_key,
settings.livekit_api_secret,
settings.livekit_url
)
@app.get("/api/token")
async def get_token(
room: str = Query(...),
name: str = Query(...)
):
token = room_manager.generate_participant_token(room, name)
return {
"token": token,
"url": settings.livekit_url
}
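To serve the endpoint locally, run the FastAPI app with uvicorn. A small optional launcher is sketched below; the file name, host, and port are illustrative.
# api/serve.py (optional helper)
import uvicorn
from api.token import app

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)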
Optimizing for Low Latency
Latency Sources
| Component         | Typical Latency | Optimization             |
|-------------------|-----------------|--------------------------|
| Audio capture     | 20-50 ms        | Smaller buffer sizes     |
| Network (WebRTC)  | 50-150 ms       | Use closest server       |
| Speech-to-text    | 200-500 ms      | Streaming transcription  |
| RAG query         | 500-2000 ms     | Caching, smaller context |
| Text-to-speech    | 100-300 ms      | Streaming synthesis      |
| Audio playback    | 20-50 ms        | Smaller buffer sizes     |
| Total             | 900-3000 ms     | Target: <1500 ms         |
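Before optimizing, measure where the time actually goes. The helper below is a simple sketch; the stage names and print output are up to you.
# Minimal per-stage latency measurement helper.
import time
from contextlib import contextmanager

@contextmanager
def stage_timer(name: str, timings: dict):
    start = time.perf_counter()
    try:
        yield
    finally:
        timings[name] = (time.perf_counter() - start) * 1000  # store milliseconds
        print(f"{name}: {timings[name]:.0f} ms")

# Example usage inside the agent:
# timings = {}
# with stage_timer("rag_query", timings):
#     answer = self.rag.query(question)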
Optimization Strategies
# Optimize RAG for voice
from llama_index.core import Settings
from llama_index.llms.openai import OpenAI
from agent.rag_engine import ConversationalRAG

class VoiceOptimizedRAG(ConversationalRAG):
"""RAG optimized for voice latency."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.response_cache = {}
async def query_with_history(self, question: str, system_prompt: str = ""):
# Check cache for common questions
cache_key = question.lower().strip()
if cache_key in self.response_cache:
for char in self.response_cache[cache_key]:
yield char
return
# Limit context for faster responses (keep only the last 2 exchanges)
self.conversation_history = self.conversation_history[-4:]
# Use faster model for initial response
Settings.llm = OpenAI(model="gpt-3.5-turbo", temperature=0.7)
full_response = ""
async for token in super().query_with_history(question, system_prompt):
full_response += token
yield token
# Cache the response
if len(full_response) < 500: # Only cache short responses
self.response_cache[cache_key] = full_response
Parallel Processing
async def _query_and_respond_parallel(self, question: str):
"""Process with parallel TTS generation."""
self.is_speaking = True
try:
# Start TTS on first sentence as soon as available
first_sentence_ready = asyncio.Event()
sentence_queue = asyncio.Queue()
async def stream_sentences():
buffer = ""
async for token in self.rag.query_with_history(question, settings.system_prompt):
buffer += token
if any(end in buffer for end in ".!?"):
# Extract complete sentence
for end in ".!?":
if end in buffer:
idx = buffer.index(end)
sentence = buffer[:idx+1]
buffer = buffer[idx+1:]
await sentence_queue.put(sentence)
first_sentence_ready.set()
break
# Remaining text
if buffer.strip():
await sentence_queue.put(buffer)
await sentence_queue.put(None) # Signal end
asyncio.create_task(stream_sentences())
# Wait for first sentence then start speaking
await first_sentence_ready.wait()
while True:
sentence = await sentence_queue.get()
if sentence is None:
break
async for audio in self.tts.synthesize_stream(sentence):
if self.should_interrupt:
return
await self.audio_handler.play_audio(audio)
finally:
self.is_speaking = False
Testing
Unit Tests
# tests/test_voice_agent.py
import pytest
import asyncio
from unittest.mock import MagicMock
from agent.speech_to_text import SpeechToText
from agent.text_to_speech import TextToSpeech
from agent.rag_engine import ConversationalRAG

@pytest.mark.asyncio
async def test_tts_synthesis(monkeypatch):
    # Mock the Cartesia client so the test makes no real API call
    fake_client = MagicMock()
    fake_client.voices.get.return_value = {"embedding": [0.0] * 192}
    fake_client.tts.bytes.return_value = b"\x00\x01" * 1200
    monkeypatch.setattr("agent.text_to_speech.Cartesia", lambda api_key: fake_client)
    tts = TextToSpeech("test-key", "test-voice")
    audio = await tts.synthesize("Hello world")
    assert len(audio) > 0

@pytest.mark.asyncio
async def test_rag_query():
    rag = ConversationalRAG("./test_docs", "test-key")
    response = rag.query("What is Python?")
    assert len(response) > 0
Integration Test
# tests/test_integration.py
import pytest
from agent.voice_agent import VoiceRAGAgent

@pytest.mark.asyncio
async def test_full_conversation():
    agent = VoiceRAGAgent()
    # Capture the synthesized audio instead of streaming it through Livekit
    spoken_chunks = []
    async def capture_audio(audio_generator):
        async for chunk in audio_generator:
            spoken_chunks.append(chunk)
    agent.audio_handler.play_audio_stream = capture_audio
    await agent._process_utterance("What can you help me with?")
    assert len(spoken_chunks) > 0
Summary
Building a voice RAG agent combines multiple real-time systems into a cohesive conversational experience. The key challenges are managing latency across the pipeline and handling the complexities of turn-taking in spoken conversation.
Key implementation points:
- Stream everything: From transcription to response generation to speech synthesis
- Handle interruptions: Users will speak over the agent
- Optimize for latency: Target under 1.5 seconds end-to-end
- Maintain context: Conversation history makes interactions natural
- Use appropriate responses: Voice responses should be concise and conversational
The architecture is extensible. Add wake word detection for hands-free activation. Implement speaker identification for multi-user scenarios. Integrate with external APIs for actions beyond information retrieval.
Ready to build multi-agent research systems? Continue to Build a Deep Researcher: Multi-Agent Tutorial to learn how multiple agents collaborate on complex research tasks.