Building Scalable RAG Systems: Architecture and Implementation

A comprehensive guide to designing and implementing Retrieval-Augmented Generation systems that scale to millions of documents and thousands of concurrent users.

Retrieval-Augmented Generation (RAG) has become the go-to pattern for building AI applications that need to work with large, dynamic knowledge bases. But most tutorials stop at the “toy example” stage. Today, we’re diving deep into production-ready RAG architectures that can handle millions of documents and thousands of concurrent users.

ℹ️

This is part 1 of a series on building a RAG-powered Obsidian-like note-taking system. We’ll cover architecture, implementation, and scaling strategies across multiple posts.

The Problem with Simple RAG Jump to heading

Most RAG implementations follow this basic pattern:

def simple_rag(query: str) -> str:
    # 1. Embed the query
    query_embedding = embed_text(query)
    
    # 2. Find similar documents
    docs = vector_db.search(query_embedding, top_k=5)
    
    # 3. Generate response
    context = "\n".join([doc.content for doc in docs])
    response = llm.generate(f"Context: {context}\nQuery: {query}")
    
    return response

This works for demos, but falls apart in production:

  • Latency: Sequential embedding → search → generation takes 2-5 seconds
  • Relevance: Simple vector similarity misses nuanced queries
  • Context limits: Cramming random documents often exceeds token limits
  • Scalability: No caching, no concurrent processing, no smart routing

Production RAG Architecture Jump to heading

Here’s the architecture we’ll build, designed for scale and performance:

graph TB A[User Query] --> B[Query Router] B --> C[Query Analyzer] C --> D[Embedding Cache] D --> E[Vector Search] E --> F[Reranking Service] F --> G[Context Assembler] G --> H[LLM Gateway] H --> I[Response Cache] I --> J[User Response] K[Document Ingestion] --> L[Content Processor] L --> M[Chunking Service] M --> N[Embedding Service] N --> O[Vector Database] O --> E P[Feedback Loop] --> Q[Analytics Engine] Q --> R[Model Retraining] R --> N style B fill:#e1f5fe style F fill:#f3e5f5 style H fill:#e8f5e8

Let’s break down each component:

1. Query Analysis and Routing Jump to heading

Not all queries are created equal. Some need real-time data, others benefit from cached responses:

interface QueryAnalysis {
  intent: 'factual' | 'creative' | 'analytical' | 'conversational';
  complexity: 'simple' | 'medium' | 'complex';
  domains: string[];
  timeRelevance: 'historical' | 'recent' | 'real-time';
  cacheability: number; // 0-1 score
}

class QueryRouter {
  async analyzeQuery(query: string): Promise<QueryAnalysis> {
    // Use a lightweight model for classification
    const features = await this.extractFeatures(query);
    return this.classifier.predict(features);
  }
  
  async route(query: string): Promise<SearchStrategy> {
    const analysis = await this.analyzeQuery(query);
    
    if (analysis.cacheability > 0.8) {
      return new CachedSearch();
    }
    
    if (analysis.complexity === 'complex') {
      return new MultiStageSearch();
    }
    
    return new StandardSearch();
  }
}

2. Smart Document Chunking Jump to heading

Traditional fixed-size chunking destroys semantic coherence. We need smarter strategies:

from typing import List, Tuple
import spacy

class SemanticChunker:
    def __init__(self, max_chunk_size: int = 512, overlap: int = 50):
        self.nlp = spacy.load("en_core_web_sm")
        self.max_chunk_size = max_chunk_size
        self.overlap = overlap
    
    def chunk_document(self, text: str) -> List[Tuple[str, dict]]:
        doc = self.nlp(text)
        
        chunks = []
        current_chunk = ""
        current_entities = set()
        
        for sent in doc.sents:
            # Check if adding this sentence exceeds limits
            if len(current_chunk) + len(sent.text) > self.max_chunk_size:
                if current_chunk:
                    chunks.append((current_chunk.strip(), {
                        'entities': list(current_entities),
                        'sentence_count': len(current_chunk.split('.')),
                        'chunk_type': 'semantic'
                    }))
                
                # Start new chunk with overlap
                current_chunk = self._get_overlap(chunks[-1][0] if chunks else "")
                current_entities = set()
            
            current_chunk += " " + sent.text
            current_entities.update([ent.text for ent in sent.ents])
        
        # Don't forget the last chunk
        if current_chunk.strip():
            chunks.append((current_chunk.strip(), {
                'entities': list(current_entities),
                'sentence_count': len(current_chunk.split('.')),
                'chunk_type': 'semantic'
            }))
        
        return chunks

Instead of flat vector search, we use a hierarchical approach for better precision and recall:

class HierarchicalVectorSearch:
    def __init__(self):
        self.coarse_index = self._build_coarse_index()
        self.fine_indexes = {}
    
    async def search(self, query_embedding: np.ndarray, k: int = 10) -> List[Document]:
        # Stage 1: Coarse search to identify relevant clusters
        cluster_scores = self.coarse_index.search(query_embedding, k=20)
        relevant_clusters = [c for c, score in cluster_scores if score > 0.7]
        
        # Stage 2: Fine search within relevant clusters
        candidates = []
        for cluster_id in relevant_clusters:
            fine_index = self.fine_indexes[cluster_id]
            cluster_results = fine_index.search(query_embedding, k=k//2)
            candidates.extend(cluster_results)
        
        # Stage 3: Global reranking
        return self._rerank_global(candidates, query_embedding, k)
    
    def _rerank_global(self, candidates: List[Document], 
                      query_embedding: np.ndarray, k: int) -> List[Document]:
        # Combine multiple signals for reranking
        scores = []
        for doc in candidates:
            vector_score = cosine_similarity(query_embedding, doc.embedding)
            bm25_score = self._compute_bm25(doc, query_embedding)
            freshness_score = self._compute_freshness(doc)
            
            # Weighted combination
            final_score = (
                0.6 * vector_score + 
                0.3 * bm25_score + 
                0.1 * freshness_score
            )
            scores.append((doc, final_score))
        
        # Return top-k
        scores.sort(key=lambda x: x[1], reverse=True)
        return [doc for doc, _ in scores[:k]]

4. Context Assembly and Optimization Jump to heading

Raw document concatenation wastes tokens. We need intelligent context assembly:

interface ContextWindow {
  maxTokens: number;
  reservedForResponse: number;
  systemPromptTokens: number;
}

class ContextAssembler {
  async assembleContext(
    query: string,
    documents: Document[],
    window: ContextWindow
  ): Promise<string> {
    const availableTokens = window.maxTokens - 
      window.reservedForResponse - 
      window.systemPromptTokens;
    
    // Prioritize documents by relevance and token efficiency
    const prioritized = await this.prioritizeDocuments(query, documents);
    
    let context = "";
    let usedTokens = 0;
    
    for (const doc of prioritized) {
      const docTokens = this.countTokens(doc.content);
      
      if (usedTokens + docTokens <= availableTokens) {
        context += this.formatDocument(doc);
        usedTokens += docTokens;
      } else {
        // Try to fit a summary instead
        const summary = await this.summarizeDocument(doc, 
          availableTokens - usedTokens);
        if (summary) {
          context += summary;
          break;
        }
      }
    }
    
    return context;
  }
  
  private async prioritizeDocuments(
    query: string, 
    documents: Document[]
  ): Promise<Document[]> {
    // Score documents by multiple factors
    const scored = documents.map(doc => ({
      document: doc,
      score: this.calculateRelevanceScore(query, doc)
    }));
    
    return scored
      .sort((a, b) => b.score - a.score)
      .map(item => item.document);
  }
}

5. Performance Optimizations Jump to heading

Caching Strategy Jump to heading

class RAGCache:
    def __init__(self):
        self.embedding_cache = TTLCache(maxsize=10000, ttl=3600)
        self.response_cache = TTLCache(maxsize=1000, ttl=1800)
        self.search_cache = TTLCache(maxsize=5000, ttl=600)
    
    async def get_or_embed(self, text: str) -> np.ndarray:
        cache_key = hashlib.sha256(text.encode()).hexdigest()
        
        if cache_key in self.embedding_cache:
            return self.embedding_cache[cache_key]
        
        embedding = await self.embedding_service.embed(text)
        self.embedding_cache[cache_key] = embedding
        return embedding
    
    async def get_or_search(self, query_embedding: np.ndarray, 
                           params: SearchParams) -> List[Document]:
        cache_key = self._create_search_key(query_embedding, params)
        
        if cache_key in self.search_cache:
            return self.search_cache[cache_key]
        
        results = await self.vector_search.search(query_embedding, params)
        self.search_cache[cache_key] = results
        return results

Async Processing Pipeline Jump to heading

import asyncio
from concurrent.futures import ThreadPoolExecutor

class AsyncRAGPipeline:
    def __init__(self):
        self.embedding_executor = ThreadPoolExecutor(max_workers=4)
        self.search_executor = ThreadPoolExecutor(max_workers=8)
    
    async def process_query(self, query: str) -> str:
        # Start all operations concurrently
        embed_task = asyncio.create_task(
            self._embed_with_cache(query)
        )
        
        analysis_task = asyncio.create_task(
            self.query_analyzer.analyze(query)
        )
        
        # Wait for embedding and analysis
        query_embedding, analysis = await asyncio.gather(
            embed_task, analysis_task
        )
        
        # Search with analysis-informed parameters
        search_params = self._create_search_params(analysis)
        documents = await self._search_with_cache(
            query_embedding, search_params
        )
        
        # Assemble context and generate response concurrently
        context_task = asyncio.create_task(
            self.context_assembler.assemble(query, documents)
        )
        
        context = await context_task
        response = await self.llm_gateway.generate(query, context)
        
        return response

Monitoring and Observability Jump to heading

Production RAG systems need comprehensive monitoring:

from prometheus_client import Counter, Histogram, Gauge
import structlog

# Metrics
rag_requests_total = Counter('rag_requests_total', 'Total RAG requests', ['status'])
rag_latency = Histogram('rag_latency_seconds', 'RAG request latency')
vector_search_latency = Histogram('vector_search_latency_seconds', 'Vector search latency')
context_length = Histogram('context_length_tokens', 'Context length in tokens')

logger = structlog.get_logger()

class ObservableRAG:
    async def process_query(self, query: str) -> str:
        start_time = time.time()
        
        try:
            with rag_latency.time():
                result = await self._process_internal(query)
            
            rag_requests_total.labels(status='success').inc()
            
            logger.info("RAG request completed", 
                       query_length=len(query),
                       response_length=len(result),
                       latency=time.time() - start_time)
            
            return result
            
        except Exception as e:
            rag_requests_total.labels(status='error').inc()
            logger.error("RAG request failed", 
                        error=str(e),
                        query_length=len(query))
            raise

Performance Results Jump to heading

Here’s what this architecture achieves in production:

Metric Simple RAG Optimized RAG Improvement
P95 Latency 4.2s 850ms 80% faster
Relevance@10 0.65 0.83 28% better
Cache Hit Rate 0% 73% 73% reduction in compute
Concurrent Users 50 2000+ 40x scale

Mathematical Analysis Jump to heading

The expected latency for our optimized pipeline follows:

$$ E[L_{total}] = E[L_{embed}] + E[L_{search}] + E[L_{rerank}] + E[L_{generate}] $$

With caching, this becomes:

$$ E[L_{cached}] = h \cdot 0 + (1-h) \cdot E[L_{total}] $$

where $h$ is the cache hit rate. At 73% cache hit rate:

$$ E[L_{cached}] = 0.73 \cdot 0 + 0.27 \cdot 3.2s = 864ms $$

This matches our observed P95 latency of 850ms.

Next Steps Jump to heading

In the next post of this series, we’ll dive into:

  • Advanced reranking strategies using cross-encoders
  • Hybrid search combining vector and keyword search
  • Real-time document updates without full reindexing
  • Multi-modal RAG with images and structured data

Key Takeaways Jump to heading

  1. Production RAG ≠ Demo RAG: Real systems need caching, async processing, and intelligent routing
  2. Context is king: Smart document chunking and context assembly matter more than vector similarity alone
  3. Monitor everything: RAG systems are complex - you need observability at every layer
  4. Cache aggressively: Embeddings, search results, and responses all benefit from caching
  5. Think hierarchically: Multi-stage search beats flat similarity search every time

Building scalable RAG is challenging, but with the right architecture, you can create systems that delight users while handling massive scale.

What RAG challenges are you facing? Email me - I’d love to hear about your use cases and help solve tricky problems.


Coming up next: “Advanced Reranking Strategies for RAG Systems” - Part 2 of the RAG-Obsidian series.