Building an Agentic RAG System: Complete Tutorial
Build a retrieval-augmented generation system with agentic capabilities. Learn to combine CrewAI, Firecrawl, and LitServe for intelligent document retrieval.
Traditional RAG systems are reactive. They receive a query, search for relevant documents, and generate a response. The retrieval strategy is fixed: embed the query, find similar documents, stuff them into context.
Agentic RAG goes further. Instead of a fixed retrieval pipeline, an agent decides how to search, what to retrieve, when to search again, and how to combine information from multiple sources. The agent can reformulate queries, follow references, validate information, and iterate until it finds what it needs.
This tutorial builds a complete agentic RAG system using CrewAI for agent orchestration, Firecrawl for web scraping, and LitServe for serving the system. By the end, you will have a working system that intelligently retrieves and synthesizes information.
Understanding Agentic RAG
Traditional RAG Limitations
Standard RAG pipelines have fixed behavior:
Query → Embed → Search Vector Store → Retrieve Top K → Generate Response
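In code, that fixed pipeline amounts to a handful of lines. The sketch below is illustrative only; it assumes a generic vector store with a search method and a LangChain-style chat model, not the components built later in this tutorial:
# illustrative sketch of a fixed, non-agentic RAG pipeline
def fixed_rag(query: str, vector_store, llm) -> str:
    # Embed the query and fetch the top-k most similar chunks
    docs = vector_store.search(query, top_k=5)
    # Stuff every retrieved chunk into the prompt, regardless of quality
    context = "\n\n".join(doc["content"] for doc in docs)
    prompt = f"Answer using only this context:\n{context}\n\nQuestion: {query}"
    # One retrieval, one generation; no reformulation, no second pass
    return llm.invoke(prompt).content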
This works for straightforward questions but fails when:
- The query needs reformulation for better retrieval
- Multiple searches with different strategies would help
- Retrieved documents need validation or fact-checking
- Information must be synthesized from multiple sources
- The answer requires following chains of references
What Makes RAG Agentic?
Agentic RAG adds decision-making to the retrieval process:
Query → Agent Analyzes Query → Agent Decides Search Strategy →
Agent Executes Search(es) → Agent Evaluates Results →
Agent Decides: More Search Needed? → Agent Synthesizes Answer
The agent can:
- Decompose complex queries into sub-queries
- Choose between different search strategies
- Evaluate retrieval quality and search again if needed
- Follow references in retrieved documents
- Combine information from multiple retrieval rounds
- Validate retrieved information against other sources
Architecture Overview
Our system has three main components:
1. Agent Layer (CrewAI) orchestrates the retrieval process with multiple specialized agents:
- Query Analyzer Agent
- Retrieval Strategist Agent
- Information Synthesizer Agent
2. Retrieval Layer (Firecrawl + Vector Store) handles the actual document retrieval:
- Web scraping with Firecrawl
- Vector similarity search
- Document chunking and embedding
3. Serving Layer (LitServe) exposes the system as an API:
- Request handling
- Response streaming
- Caching and optimization
Setting Up the Environment
Project Structure
agentic-rag/
├── agents/
│ ├── __init__.py
│ ├── query_analyzer.py
│ ├── retrieval_strategist.py
│ └── synthesizer.py
├── retrieval/
│ ├── __init__.py
│ ├── firecrawl_loader.py
│ ├── vector_store.py
│ └── chunker.py
├── serving/
│ ├── __init__.py
│ └── api.py
├── config/
│ └── settings.py
├── main.py
├── requirements.txt
└── README.md
Dependencies
# requirements.txt
crewai>=0.28.0
crewai-tools>=0.1.0
firecrawl-py>=0.0.8
litserve>=0.1.0
chromadb>=0.4.0
langchain>=0.1.0
langchain-openai>=0.1.0
openai>=1.0.0
pydantic>=2.0.0
pydantic-settings>=2.0.0
beautifulsoup4>=4.12.0
python-dotenv>=1.0.0
Configuration
# config/settings.py
from pydantic_settings import BaseSettings
class Settings(BaseSettings):
# API Keys
openai_api_key: str
firecrawl_api_key: str
# Model Settings
embedding_model: str = "text-embedding-3-small"
llm_model: str = "gpt-4-turbo-preview"
# Retrieval Settings
chunk_size: int = 1000
chunk_overlap: int = 200
top_k: int = 5
similarity_threshold: float = 0.7
# Agent Settings
max_iterations: int = 5
max_search_rounds: int = 3
class Config:
env_file = ".env"
settings = Settings()
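The two API keys are loaded from the environment (or a .env file; pydantic-settings matches the field names case-insensitively). A matching .env would look like this, with placeholder values:
# .env
OPENAI_API_KEY=your-openai-key
FIRECRAWL_API_KEY=your-firecrawl-key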
Building the Retrieval Layer
Document Loader with Firecrawl
Firecrawl scrapes web content and converts it to clean markdown:
# retrieval/firecrawl_loader.py
from firecrawl import FirecrawlApp
from typing import List, Dict
import hashlib
class FirecrawlLoader:
def __init__(self, api_key: str):
self.app = FirecrawlApp(api_key=api_key)
        self.cache: Dict[str, Dict] = {}
def scrape_url(self, url: str) -> Dict:
"""Scrape a single URL and return structured content."""
cache_key = hashlib.md5(url.encode()).hexdigest()
if cache_key in self.cache:
return self.cache[cache_key]
result = self.app.scrape_url(
url,
params={
'formats': ['markdown', 'html'],
'onlyMainContent': True
}
)
document = {
'url': url,
'content': result.get('markdown', ''),
'title': result.get('metadata', {}).get('title', ''),
'description': result.get('metadata', {}).get('description', ''),
'links': self._extract_links(result.get('html', ''))
}
self.cache[cache_key] = document
return document
def crawl_site(self, url: str, max_pages: int = 10) -> List[Dict]:
"""Crawl a website starting from URL."""
result = self.app.crawl_url(
url,
params={
'limit': max_pages,
'scrapeOptions': {
'formats': ['markdown'],
'onlyMainContent': True
}
}
)
documents = []
for page in result.get('data', []):
documents.append({
'url': page.get('url', ''),
'content': page.get('markdown', ''),
'title': page.get('metadata', {}).get('title', ''),
})
return documents
def _extract_links(self, html: str) -> List[str]:
"""Extract links from HTML content."""
from bs4 import BeautifulSoup
soup = BeautifulSoup(html, 'html.parser')
links = []
for a in soup.find_all('a', href=True):
href = a['href']
if href.startswith('http'):
links.append(href)
return links[:20] # Limit to 20 links
Document Chunking
Split documents into retrievable chunks:
# retrieval/chunker.py
from typing import List, Dict
from langchain.text_splitter import RecursiveCharacterTextSplitter
class DocumentChunker:
def __init__(self, chunk_size: int = 1000, chunk_overlap: int = 200):
self.splitter = RecursiveCharacterTextSplitter(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
separators=["\n\n", "\n", ". ", " ", ""]
)
def chunk_document(self, document: Dict) -> List[Dict]:
"""Split a document into chunks with metadata."""
chunks = self.splitter.split_text(document['content'])
return [
{
'content': chunk,
'metadata': {
'url': document.get('url', ''),
'title': document.get('title', ''),
'chunk_index': i,
'total_chunks': len(chunks)
}
}
for i, chunk in enumerate(chunks)
]
def chunk_documents(self, documents: List[Dict]) -> List[Dict]:
"""Chunk multiple documents."""
all_chunks = []
for doc in documents:
all_chunks.extend(self.chunk_document(doc))
return all_chunks
Vector Store
Store and search document embeddings:
# retrieval/vector_store.py
import chromadb
from chromadb.utils import embedding_functions
from typing import List, Dict, Optional
import uuid
class VectorStore:
def __init__(self, collection_name: str, openai_api_key: str):
self.client = chromadb.Client()
self.embedding_fn = embedding_functions.OpenAIEmbeddingFunction(
api_key=openai_api_key,
model_name="text-embedding-3-small"
)
self.collection = self.client.get_or_create_collection(
name=collection_name,
embedding_function=self.embedding_fn,
metadata={"hnsw:space": "cosine"}
)
def add_documents(self, chunks: List[Dict]) -> None:
"""Add document chunks to the vector store."""
ids = [str(uuid.uuid4()) for _ in chunks]
documents = [chunk['content'] for chunk in chunks]
metadatas = [chunk['metadata'] for chunk in chunks]
self.collection.add(
ids=ids,
documents=documents,
metadatas=metadatas
)
def search(
self,
query: str,
top_k: int = 5,
filter_metadata: Optional[Dict] = None
) -> List[Dict]:
"""Search for relevant documents."""
results = self.collection.query(
query_texts=[query],
n_results=top_k,
where=filter_metadata
)
documents = []
for i in range(len(results['ids'][0])):
documents.append({
'id': results['ids'][0][i],
'content': results['documents'][0][i],
'metadata': results['metadatas'][0][i],
'distance': results['distances'][0][i] if results['distances'] else None
})
return documents
def search_with_reranking(
self,
query: str,
top_k: int = 5,
initial_k: int = 20
) -> List[Dict]:
"""Search with initial over-retrieval and reranking."""
# Get more results initially
initial_results = self.search(query, top_k=initial_k)
# Simple relevance scoring (could use a reranker model)
for doc in initial_results:
# Score based on keyword overlap
query_terms = set(query.lower().split())
doc_terms = set(doc['content'].lower().split())
overlap = len(query_terms & doc_terms)
doc['relevance_score'] = overlap / len(query_terms) if query_terms else 0
        # Sort by keyword relevance first, then by vector similarity (smaller distance)
initial_results.sort(
key=lambda x: (x['relevance_score'], -x['distance']),
reverse=True
)
return initial_results[:top_k]
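Before wiring in agents, it helps to sanity-check the retrieval layer on its own. A minimal sketch follows; the URL and question are placeholders, and valid OpenAI and Firecrawl keys are assumed to be configured:
# illustrative usage of the retrieval layer on its own
from config.settings import settings
from retrieval.firecrawl_loader import FirecrawlLoader
from retrieval.chunker import DocumentChunker
from retrieval.vector_store import VectorStore

loader = FirecrawlLoader(settings.firecrawl_api_key)
chunker = DocumentChunker(settings.chunk_size, settings.chunk_overlap)
store = VectorStore("retrieval_demo", settings.openai_api_key)

# Scrape one page, chunk it, and index the chunks
document = loader.scrape_url("https://docs.example.com/getting-started")
store.add_documents(chunker.chunk_document(document))

# Retrieve the most relevant chunks for a question
for hit in store.search("how do I install the SDK?", top_k=3):
    print(hit["metadata"]["url"], hit["distance"])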
Building the Agent Layer
Query Analyzer Agent
Analyzes queries and plans retrieval:
# agents/query_analyzer.py
from crewai import Agent, Task
from langchain_openai import ChatOpenAI
def create_query_analyzer(llm: ChatOpenAI) -> Agent:
return Agent(
role='Query Analyzer',
goal='Analyze user queries and determine the best retrieval strategy',
backstory='''You are an expert at understanding user information needs.
You analyze queries to identify:
- The core information need
- Required specificity level
- Whether the query needs decomposition
- What types of sources would be most helpful
- Keywords and concepts for retrieval''',
llm=llm,
verbose=True
)
def create_analysis_task(agent: Agent, query: str) -> Task:
return Task(
description=f'''Analyze this query and create a retrieval plan:
Query: {query}
Provide:
1. Query Type: (factual, conceptual, procedural, comparative)
2. Complexity: (simple, moderate, complex)
3. Sub-queries: If complex, break into simpler sub-queries
4. Key Concepts: Main concepts to search for
5. Search Strategy: Recommended approach
6. Expected Source Types: What kinds of documents would help
Output as structured JSON.''',
expected_output='JSON object with query analysis and retrieval plan',
agent=agent
)
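Downstream code treats the analysis as a dictionary, so it helps to know roughly what the analyzer is asked to produce. An illustrative result is shown below; the field names follow the task prompt above, and the exact output depends on the LLM:
# illustrative analyzer output, after parsing the agent's JSON
example_analysis = {
    "query_type": "procedural",
    "complexity": "moderate",
    "sub_queries": [
        "What authentication methods does the API support?",
        "How are API keys configured?"
    ],
    "key_concepts": ["authentication", "API keys", "configuration"],
    "search_strategy": "vector search on key concepts, then follow setup links",
    "expected_source_types": ["official documentation", "setup guides"]
}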
Retrieval Strategist Agent
Executes searches and evaluates results:
# agents/retrieval_strategist.py
from crewai import Agent, Task
from crewai_tools import tool
from langchain_openai import ChatOpenAI
from typing import List, Dict
class RetrievalTools:
    def __init__(self, vector_store, firecrawl_loader):
        self.vector_store = vector_store
        self.firecrawl_loader = firecrawl_loader
        # Define the tools as closures over `self` so CrewAI can invoke them
        # without needing to supply the instance as an argument.
        @tool("Vector Search")
        def vector_search(query: str, top_k: int = 5) -> str:
            """Search the vector store for relevant documents."""
            results = self.vector_store.search(query, top_k=top_k)
            return self._format_results(results)
        @tool("Web Search")
        def web_search(url: str) -> str:
            """Scrape and search a specific URL."""
            document = self.firecrawl_loader.scrape_url(url)
            return f"Title: {document['title']}\n\nContent: {document['content'][:2000]}"
        @tool("Follow Link")
        def follow_link(url: str) -> str:
            """Follow a link found in previous results."""
            document = self.firecrawl_loader.scrape_url(url)
            # Add the new content to the vector store for future queries
            from retrieval.chunker import DocumentChunker
            chunker = DocumentChunker()
            chunks = chunker.chunk_document(document)
            self.vector_store.add_documents(chunks)
            return f"Added content from {url} to knowledge base. Key content: {document['content'][:1000]}"
        self.vector_search = vector_search
        self.web_search = web_search
        self.follow_link = follow_link
def _format_results(self, results: List[Dict]) -> str:
formatted = []
for i, doc in enumerate(results, 1):
formatted.append(f"""
Result {i}:
Source: {doc['metadata'].get('url', 'Unknown')}
Title: {doc['metadata'].get('title', 'Unknown')}
Content: {doc['content']}
Relevance: {1 - doc['distance']:.2f}
---""")
return "\n".join(formatted)
def create_retrieval_strategist(
llm: ChatOpenAI,
tools: RetrievalTools
) -> Agent:
return Agent(
role='Retrieval Strategist',
goal='Execute optimal retrieval strategies to find relevant information',
backstory='''You are an expert at finding information. You know how to:
- Craft effective search queries
- Evaluate search result quality
- Decide when more searching is needed
- Follow promising leads in documents
- Combine information from multiple sources''',
llm=llm,
tools=[tools.vector_search, tools.web_search, tools.follow_link],
verbose=True
)
def create_retrieval_task(
agent: Agent,
query: str,
analysis: Dict,
previous_results: List[Dict] = None
) -> Task:
previous_context = ""
if previous_results:
previous_context = f"\nPrevious search results:\n{previous_results}\n"
return Task(
description=f'''Execute retrieval for this query:
Original Query: {query}
Query Analysis:
{analysis}
{previous_context}
Steps:
1. Start with vector search using key concepts
2. Evaluate result quality (relevance, completeness)
3. If results are insufficient:
- Try reformulated queries
- Follow promising links
- Search for related concepts
4. Continue until you have sufficient information or hit limits
Return the best results found with quality assessment.''',
expected_output='Retrieved documents with relevance assessment',
agent=agent
)
Information Synthesizer Agent
Combines retrieved information into coherent answers:
# agents/synthesizer.py
from crewai import Agent, Task
from langchain_openai import ChatOpenAI
from typing import List, Dict
def create_synthesizer(llm: ChatOpenAI) -> Agent:
return Agent(
role='Information Synthesizer',
goal='Synthesize retrieved information into accurate, comprehensive answers',
backstory='''You are an expert at combining information from multiple
sources into clear, accurate answers. You:
- Identify the most relevant information
- Resolve conflicts between sources
- Acknowledge uncertainty when present
- Cite sources appropriately
- Structure answers for clarity''',
llm=llm,
verbose=True
)
def create_synthesis_task(
agent: Agent,
query: str,
retrieved_docs: List[Dict],
analysis: Dict
) -> Task:
docs_context = "\n\n".join([
f"Source: {doc['metadata'].get('url', 'Unknown')}\n{doc['content']}"
for doc in retrieved_docs
])
return Task(
description=f'''Synthesize an answer to this query from the retrieved documents:
Query: {query}
Query Analysis: {analysis}
Retrieved Documents:
{docs_context}
Requirements:
1. Answer the query directly and completely
2. Use only information from the provided documents
3. Cite sources for key claims
4. Note any gaps or uncertainties
5. If documents conflict, explain the different perspectives
6. Structure the answer clearly
If the documents do not contain sufficient information to answer,
explain what is missing and what was found.''',
expected_output='Synthesized answer with citations and confidence assessment',
agent=agent
)
Orchestrating the Agents
Main RAG Pipeline
# main.py
from crewai import Crew, Process
from langchain_openai import ChatOpenAI
from config.settings import settings
from retrieval.firecrawl_loader import FirecrawlLoader
from retrieval.vector_store import VectorStore
from retrieval.chunker import DocumentChunker
from agents.query_analyzer import create_query_analyzer, create_analysis_task
from agents.retrieval_strategist import (
create_retrieval_strategist,
RetrievalTools,
create_retrieval_task
)
from agents.synthesizer import create_synthesizer, create_synthesis_task
from typing import Dict, List
import json
class AgenticRAG:
def __init__(self):
# Initialize LLM
self.llm = ChatOpenAI(
model=settings.llm_model,
api_key=settings.openai_api_key
)
# Initialize retrieval components
self.firecrawl = FirecrawlLoader(settings.firecrawl_api_key)
self.vector_store = VectorStore(
collection_name="agentic_rag",
openai_api_key=settings.openai_api_key
)
self.chunker = DocumentChunker(
chunk_size=settings.chunk_size,
chunk_overlap=settings.chunk_overlap
)
# Initialize agents
self.query_analyzer = create_query_analyzer(self.llm)
retrieval_tools = RetrievalTools(self.vector_store, self.firecrawl)
self.retrieval_strategist = create_retrieval_strategist(
self.llm,
retrieval_tools
)
self.synthesizer = create_synthesizer(self.llm)
def ingest_url(self, url: str) -> int:
"""Ingest a URL into the knowledge base."""
document = self.firecrawl.scrape_url(url)
chunks = self.chunker.chunk_document(document)
self.vector_store.add_documents(chunks)
return len(chunks)
def ingest_site(self, url: str, max_pages: int = 10) -> int:
"""Crawl and ingest a website."""
documents = self.firecrawl.crawl_site(url, max_pages)
all_chunks = self.chunker.chunk_documents(documents)
self.vector_store.add_documents(all_chunks)
return len(all_chunks)
def query(self, question: str) -> Dict:
"""Run agentic RAG query."""
# Phase 1: Analyze Query
analysis_task = create_analysis_task(self.query_analyzer, question)
analysis_crew = Crew(
agents=[self.query_analyzer],
tasks=[analysis_task],
process=Process.sequential,
verbose=True
)
analysis_result = analysis_crew.kickoff()
analysis = self._parse_analysis(analysis_result)
# Phase 2: Retrieve Information
retrieval_task = create_retrieval_task(
self.retrieval_strategist,
question,
analysis
)
retrieval_crew = Crew(
agents=[self.retrieval_strategist],
tasks=[retrieval_task],
process=Process.sequential,
verbose=True
)
retrieval_result = retrieval_crew.kickoff()
        # Re-query the vector store; it now includes any pages the strategist added via the Follow Link tool
retrieved_docs = self.vector_store.search(
question,
top_k=settings.top_k
)
# Phase 3: Synthesize Answer
synthesis_task = create_synthesis_task(
self.synthesizer,
question,
retrieved_docs,
analysis
)
synthesis_crew = Crew(
agents=[self.synthesizer],
tasks=[synthesis_task],
process=Process.sequential,
verbose=True
)
final_answer = synthesis_crew.kickoff()
return {
'question': question,
'analysis': analysis,
'sources': [doc['metadata'].get('url') for doc in retrieved_docs],
'answer': str(final_answer),
'confidence': self._estimate_confidence(retrieved_docs)
}
def _parse_analysis(self, result) -> Dict:
"""Parse the analysis result into structured format."""
try:
return json.loads(str(result))
except json.JSONDecodeError:
return {'raw_analysis': str(result)}
def _estimate_confidence(self, docs: List[Dict]) -> float:
"""Estimate answer confidence based on retrieval quality."""
if not docs:
return 0.0
# Average relevance of top results
avg_relevance = sum(1 - doc['distance'] for doc in docs) / len(docs)
# Bonus for multiple relevant sources
high_relevance_count = sum(
1 for doc in docs if (1 - doc['distance']) > 0.8
)
source_bonus = min(high_relevance_count * 0.1, 0.2)
return min(avg_relevance + source_bonus, 1.0)
# Usage example
if __name__ == "__main__":
rag = AgenticRAG()
# Ingest some documentation
rag.ingest_site("https://docs.example.com", max_pages=20)
# Query the system
result = rag.query("How do I configure authentication in the API?")
print(f"Answer: {result['answer']}")
print(f"Confidence: {result['confidence']:.2f}")
print(f"Sources: {result['sources']}")
Serving with LitServe
API Server
# serving/api.py
import litserve as ls
from main import AgenticRAG
from pydantic import BaseModel
from typing import List, Optional
class QueryRequest(BaseModel):
question: str
max_sources: Optional[int] = 5
class IngestRequest(BaseModel):
url: str
crawl: Optional[bool] = False
max_pages: Optional[int] = 10
class QueryResponse(BaseModel):
question: str
answer: str
sources: List[str]
confidence: float
class AgenticRAGAPI(ls.LitAPI):
def setup(self, device):
"""Initialize the RAG system."""
self.rag = AgenticRAG()
    def decode_request(self, request):
        """Parse the incoming JSON body into the matching request model."""
        if 'question' in request:
            return QueryRequest(**request)
        return IngestRequest(**request)
def predict(self, request):
"""Handle query or ingest request."""
if hasattr(request, 'question'):
# Query request
result = self.rag.query(request.question)
return QueryResponse(
question=result['question'],
answer=result['answer'],
sources=result['sources'],
confidence=result['confidence']
)
elif hasattr(request, 'url'):
# Ingest request
if request.crawl:
chunks = self.rag.ingest_site(request.url, request.max_pages)
else:
chunks = self.rag.ingest_url(request.url)
return {"status": "success", "chunks_added": chunks}
def encode_response(self, output):
"""Format response."""
if isinstance(output, QueryResponse):
return output.model_dump()
return output
def create_server():
api = AgenticRAGAPI()
server = ls.LitServer(api, accelerator="auto")
return server
if __name__ == "__main__":
server = create_server()
server.run(port=8000)
Running the Server
# Start the server
python serving/api.py
# Query the API
curl -X POST http://localhost:8000/predict \
-H "Content-Type: application/json" \
-d '{"question": "How do I set up authentication?"}'
# Ingest a URL
curl -X POST http://localhost:8000/predict \
-H "Content-Type: application/json" \
-d '{"url": "https://docs.example.com/auth", "crawl": false}'
Advanced Patterns
Iterative Retrieval
When initial results are insufficient:
def iterative_query(self, question: str, max_rounds: int = 3) -> Dict:
"""Query with multiple retrieval rounds if needed."""
all_docs = []
queries_tried = [question]
for round_num in range(max_rounds):
# Search with current best query
docs = self.vector_store.search(queries_tried[-1])
all_docs.extend(docs)
# Evaluate if we have enough
if self._sufficient_coverage(all_docs, question):
break
# Generate reformulated query
new_query = self._reformulate_query(
question,
queries_tried,
all_docs
)
if new_query and new_query not in queries_tried:
queries_tried.append(new_query)
# Deduplicate and rank
unique_docs = self._deduplicate_docs(all_docs)
return self._synthesize(question, unique_docs)
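The helper methods referenced above (_sufficient_coverage, _reformulate_query, _deduplicate_docs) are not shown in this tutorial, and _synthesize would simply wrap the synthesis crew from the query method. One possible set of heuristic implementations follows; the keyword-coverage threshold and prompt wording are assumptions to tune for your data:
# possible helpers for iterative retrieval (heuristic sketch, added to AgenticRAG)
def _sufficient_coverage(self, docs: List[Dict], question: str) -> bool:
    """Treat coverage as sufficient when most long query terms appear in the retrieved text."""
    if not docs:
        return False
    text = " ".join(doc['content'].lower() for doc in docs)
    terms = [t for t in question.lower().split() if len(t) > 3]
    if not terms:
        return True
    covered = sum(1 for term in terms if term in text)
    return covered / len(terms) >= 0.8

def _deduplicate_docs(self, docs: List[Dict]) -> List[Dict]:
    """Keep the first chunk seen for each (url, chunk_index) pair."""
    seen, unique = set(), []
    for doc in docs:
        key = (doc['metadata'].get('url'), doc['metadata'].get('chunk_index'))
        if key not in seen:
            seen.add(key)
            unique.append(doc)
    return unique

def _reformulate_query(self, question: str, tried: List[str], docs: List[Dict]) -> str:
    """Ask the LLM for a differently worded search query."""
    prompt = (
        f"The question '{question}' has already been searched as: {tried}. "
        "Suggest one alternative search query with different wording. "
        "Return only the query text."
    )
    return self.llm.invoke(prompt).content.strip()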
Multi-Source Validation
Cross-reference information across sources:
def validated_query(self, question: str) -> Dict:
"""Query with cross-source validation."""
results = self.query(question)
# Extract key claims
claims = self._extract_claims(results['answer'])
# Validate each claim
validated_claims = []
for claim in claims:
sources = self._find_supporting_sources(claim)
validated_claims.append({
'claim': claim,
'source_count': len(sources),
'validated': len(sources) >= 2
})
results['validation'] = validated_claims
results['validation_score'] = (
sum(1 for c in validated_claims if c['validated'])
/ len(validated_claims) if validated_claims else 0
)
return results
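_extract_claims and _find_supporting_sources are likewise left undefined above. A minimal sketch follows, assuming the LLM lists claims line by line and a claim counts as supported by any source whose similarity clears a threshold; the 0.75 cutoff is an arbitrary starting point:
# possible validation helpers (heuristic sketch, added to AgenticRAG)
def _extract_claims(self, answer: str) -> List[str]:
    """Ask the LLM to list the factual claims made in the answer, one per line."""
    prompt = (
        "List the distinct factual claims in the following answer, "
        f"one per line, with no numbering:\n\n{answer}"
    )
    lines = self.llm.invoke(prompt).content.splitlines()
    return [line.strip() for line in lines if line.strip()]

def _find_supporting_sources(self, claim: str, threshold: float = 0.75) -> List[str]:
    """Return URLs of chunks whose similarity to the claim clears the threshold."""
    hits = self.vector_store.search(claim, top_k=settings.top_k)
    urls = set()
    for hit in hits:
        distance = hit.get('distance')
        if distance is not None and (1 - distance) >= threshold:
            urls.add(hit['metadata'].get('url', ''))
    return [u for u in urls if u]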
Testing the System
Unit Tests
# tests/test_retrieval.py
import pytest
from retrieval.vector_store import VectorStore
from retrieval.chunker import DocumentChunker
def test_chunker():
chunker = DocumentChunker(chunk_size=100, chunk_overlap=20)
doc = {
'content': 'A' * 250,
'url': 'http://test.com',
'title': 'Test'
}
chunks = chunker.chunk_document(doc)
assert len(chunks) >= 2
assert all('url' in c['metadata'] for c in chunks)
def test_vector_store_search(mock_openai):
store = VectorStore('test', 'fake-key')
store.add_documents([
{'content': 'Python is a programming language', 'metadata': {}},
{'content': 'JavaScript runs in browsers', 'metadata': {}}
])
results = store.search('programming language', top_k=1)
assert len(results) == 1
assert 'Python' in results[0]['content']
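The mock_openai fixture used above is not defined in this tutorial. One way to provide it is to monkeypatch Chroma's OpenAI embedding function with a cheap deterministic stand-in so the test never calls the API; this sketch relies on Chroma embedding functions being callables that map a list of texts to a list of vectors:
# tests/conftest.py (one possible mock_openai fixture)
import hashlib
import pytest
from chromadb.utils import embedding_functions

class FakeEmbeddingFunction:
    """Deterministic embeddings derived from word hashes; good enough for tests."""
    def __call__(self, input):
        vectors = []
        for text in input:
            vec = [0.0] * 64
            for word in text.lower().split():
                idx = int(hashlib.md5(word.encode()).hexdigest(), 16) % 64
                vec[idx] += 1.0
            vectors.append(vec)
        return vectors

@pytest.fixture
def mock_openai(monkeypatch):
    # Swap in the fake before VectorStore constructs its embedding function
    monkeypatch.setattr(
        embedding_functions,
        "OpenAIEmbeddingFunction",
        lambda **kwargs: FakeEmbeddingFunction()
    )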
Integration Tests
# tests/test_integration.py
def test_full_pipeline():
rag = AgenticRAG()
# Ingest test content
rag.ingest_url('https://docs.pytest.org/en/latest/')
# Query
result = rag.query('How do I write a test fixture in pytest?')
assert result['answer']
assert result['confidence'] > 0.5
assert len(result['sources']) > 0
Summary
Agentic RAG transforms retrieval from a fixed pipeline into an intelligent search process. By combining CrewAI agents with Firecrawl scraping and vector search, you create a system that can:
- Analyze queries and plan retrieval strategies
- Execute multiple search rounds when needed
- Follow references and expand the knowledge base
- Synthesize coherent answers from multiple sources
- Serve results through a production-ready API
Key implementation points:
- Separate concerns: Query analysis, retrieval, and synthesis each get dedicated agents
- Iterate intelligently: Do not settle for poor retrieval results
- Track confidence: Know when answers are well-supported
- Cache aggressively: Avoid redundant scraping and embedding
- Serve efficiently: LitServe provides production-ready API serving
This architecture extends naturally. Add more specialized agents for domain-specific retrieval. Integrate additional data sources beyond web content. Implement more sophisticated reranking. The agentic foundation makes these extensions straightforward.
Want to add voice interaction to your RAG system? Continue to Building a Voice RAG Agent to learn how to build real-time audio interfaces.