Building a RAG System

This tutorial guides you through building a complete Retrieval-Augmented Generation (RAG) system using Arshai. You’ll create a document-based question-answering system that combines vector search with LLM reasoning.

What You’ll Build:

  • Document ingestion and chunking pipeline

  • Vector embeddings and storage system

  • Semantic search with ranking

  • RAG-enabled conversational agent

  • Complete Q&A application

What You’ll Learn:

  • Document processing strategies

  • Embedding generation and storage

  • Vector similarity search

  • RAG agent implementation

  • Production patterns

Prerequisites:

  • Python 3.9+

  • Arshai with extras: pip install "arshai[openai,milvus]"

  • OpenAI API key

  • Milvus running (Docker or cloud)

Time to Complete: 60-90 minutes

Project Setup

Install Dependencies

# Create project
mkdir rag-system
cd rag-system

# Create virtual environment
python -m venv venv
source venv/bin/activate

# Install dependencies
pip install "arshai[openai,milvus]" python-dotenv

# Create project structure
mkdir -p documents data
touch .env
touch rag_system.py
touch document_processor.py
touch rag_agent.py

Configure Environment

Create .env file:

# .env
OPENAI_API_KEY=your-openai-key
MILVUS_HOST=localhost
MILVUS_PORT=19530
MILVUS_DB_NAME=rag_db

Start Milvus (Docker):

# Download docker-compose
wget https://github.com/milvus-io/milvus/releases/download/v2.3.0/milvus-standalone-docker-compose.yml -O docker-compose.yml

# Start Milvus
docker-compose up -d

# Verify
docker-compose ps
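Optionally, confirm Milvus is reachable from Python before moving on. A minimal check using pymilvus (installed as part of the milvus extra; adjust the host and port if yours differ):

# check_milvus.py (optional sanity check)
from pymilvus import connections, utility

connections.connect("default", host="localhost", port="19530")
print(f"Connected to Milvus {utility.get_server_version()}")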

Step 1: Document Processing

Create document processor for chunking:

# document_processor.py
from typing import List, Dict, Any, Optional
from pathlib import Path
import re

class DocumentChunker:
    """Process and chunk documents for RAG."""

    def __init__(self, chunk_size: int = 500, chunk_overlap: int = 50):
        """
        Initialize chunker.

        Args:
            chunk_size: Target size for each chunk in characters
            chunk_overlap: Overlap between chunks in characters
        """
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap

    def load_document(self, file_path: str) -> str:
        """Load document from file."""
        path = Path(file_path)

        if not path.exists():
            raise FileNotFoundError(f"File not found: {file_path}")

        # Handle supported file types (plain text and Markdown)
        if path.suffix in ('.txt', '.md'):
            with open(path, 'r', encoding='utf-8') as f:
                return f.read()
        else:
            raise ValueError(f"Unsupported file type: {path.suffix}")

    def chunk_text(self, text: str) -> List[Dict[str, Any]]:
        """
        Chunk text into smaller pieces with metadata.

        Args:
            text: Text to chunk

        Returns:
            List of chunks with metadata
        """
        # Clean text
        text = self._clean_text(text)

        # Split into sentences
        sentences = self._split_into_sentences(text)

        # Create chunks
        chunks = []
        current_chunk = []
        current_length = 0

        for sentence in sentences:
            sentence_length = len(sentence)

            # If adding this sentence exceeds chunk size
            if current_length + sentence_length > self.chunk_size and current_chunk:
                # Save current chunk
                chunk_text = ' '.join(current_chunk)
                chunks.append({
                    'text': chunk_text,
                    'length': len(chunk_text),
                    'sentence_count': len(current_chunk)
                })

                # Start new chunk with overlap
                overlap_sentences = self._get_overlap_sentences(
                    current_chunk,
                    self.chunk_overlap
                )
                current_chunk = overlap_sentences
                current_length = sum(len(s) for s in current_chunk)

            # Add sentence to current chunk
            current_chunk.append(sentence)
            current_length += sentence_length

        # Add final chunk
        if current_chunk:
            chunk_text = ' '.join(current_chunk)
            chunks.append({
                'text': chunk_text,
                'length': len(chunk_text),
                'sentence_count': len(current_chunk)
            })

        return chunks

    def process_document(
        self,
        file_path: str,
        metadata: Optional[Dict[str, Any]] = None
    ) -> List[Dict[str, Any]]:
        """
        Process document into chunks with metadata.

        Args:
            file_path: Path to document
            metadata: Additional metadata for chunks

        Returns:
            List of processed chunks
        """
        # Load document
        text = self.load_document(file_path)

        # Chunk text
        chunks = self.chunk_text(text)

        # Add metadata
        file_name = Path(file_path).name
        base_metadata = metadata or {}

        processed_chunks = []
        for i, chunk in enumerate(chunks):
            chunk_data = {
                'id': f"{file_name}_{i}",
                'text': chunk['text'],
                'source': file_name,
                'chunk_index': i,
                'total_chunks': len(chunks),
                **base_metadata
            }
            processed_chunks.append(chunk_data)

        return processed_chunks

    def _clean_text(self, text: str) -> str:
        """Clean and normalize text."""
        # Collapse runs of whitespace (including newlines) into single spaces
        text = re.sub(r'\s+', ' ', text)
        # Trim leading/trailing whitespace
        text = text.strip()
        return text

    def _split_into_sentences(self, text: str) -> List[str]:
        """Split text into sentences."""
        # Naive split on terminal punctuation; a proper sentence tokenizer improves this
        sentences = re.split(r'(?<=[.!?])\s+', text)
        return [s.strip() for s in sentences if s.strip()]

    def _get_overlap_sentences(
        self,
        sentences: List[str],
        target_overlap: int
    ) -> List[str]:
        """Get sentences for overlap."""
        overlap_sentences = []
        overlap_length = 0

        # Take sentences from the end
        for sentence in reversed(sentences):
            if overlap_length >= target_overlap:
                break
            overlap_sentences.insert(0, sentence)
            overlap_length += len(sentence)

        return overlap_sentences
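Before wiring the chunker into the full system, you can sanity-check it on its own. A quick sketch using the class above:

# quick check (run from the project root)
from document_processor import DocumentChunker

chunker = DocumentChunker(chunk_size=200, chunk_overlap=40)
sample = (
    "Arshai is a framework for building agentic AI systems. "
    "It provides LLM clients, agents, and system patterns. "
) * 5
for chunk in chunker.chunk_text(sample):
    print(f"{chunk['length']:4d} chars, {chunk['sentence_count']} sentences")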

Step 2: Build RAG Agent

Create the RAG agent with retrieval:

# rag_agent.py
from typing import List, Dict, Any
from dotenv import load_dotenv

from arshai.agents.base import BaseAgent
from arshai.core.interfaces.iagent import IAgentInput
from arshai.core.interfaces.illm import ILLMInput
from arshai.core.interfaces.ivector_db_client import ICollectionConfig
from pydantic import BaseModel

load_dotenv()

class DocumentChunk(BaseModel):
    """Schema for document chunks in vector DB."""
    id: str
    text: str
    source: str
    chunk_index: int
    embedding: List[float]

class RAGAgent(BaseAgent):
    """Retrieval-Augmented Generation agent."""

    def __init__(
        self,
        llm_client,
        embedding_service,
        vector_client,
        collection_name: str = "knowledge_base",
        top_k: int = 3
    ):
        """
        Initialize RAG agent.

        Args:
            llm_client: LLM client for generation
            embedding_service: Service for generating embeddings
            vector_client: Vector database client
            collection_name: Name of vector collection
            top_k: Number of documents to retrieve
        """
        super().__init__(
            llm_client,
            system_prompt=(
                "You are a helpful AI assistant with access to a knowledge base. "
                "Use the provided context to answer questions accurately. "
                "If the context doesn't contain the answer, say so clearly. "
                "Always cite the source when using information from the context."
            )
        )
        self.embedding_service = embedding_service
        self.vector_client = vector_client
        self.top_k = top_k

        # Configure collection
        self.collection_config = ICollectionConfig(
            collection_name=collection_name,
            dimension=embedding_service.dimension,
            model_class=DocumentChunk
        )

        # Ensure collection exists
        self.vector_client.get_or_create_collection(self.collection_config)

    async def process(self, input: IAgentInput) -> Dict[str, Any]:
        """Process query with RAG."""

        # Step 1: Retrieve relevant documents
        retrieved_docs = await self._retrieve_documents(input.message)

        # Step 2: Build context from retrieved documents
        context = self._build_context(retrieved_docs)

        # Step 3: Generate response with context
        response = await self._generate_response(input.message, context)

        return {
            'response': response,
            'sources': [doc['source'] for doc in retrieved_docs],
            'retrieved_chunks': len(retrieved_docs),
            'context': context
        }

    async def _retrieve_documents(self, query: str) -> List[Dict[str, Any]]:
        """Retrieve relevant documents from vector store."""

        # Generate query embedding
        query_result = self.embedding_service.embed_query(query)
        query_embedding = query_result['embedding']

        # Search vector database
        search_results = self.vector_client.search(
            config=self.collection_config,
            query_vectors=[query_embedding],
            limit=self.top_k,
            output_fields=["id", "text", "source", "chunk_index"]
        )

        # Extract and format results
        retrieved_docs = []
        if search_results and len(search_results) > 0:
            for result in search_results[0]:
                retrieved_docs.append({
                    'text': result.get('text', ''),
                    'source': result.get('source', 'unknown'),
                    'chunk_index': result.get('chunk_index', 0),
                    'distance': result.get('distance', 0.0)
                })

        return retrieved_docs

    def _build_context(self, retrieved_docs: List[Dict[str, Any]]) -> str:
        """Build context string from retrieved documents."""
        if not retrieved_docs:
            return "No relevant information found in the knowledge base."

        context_parts = []
        for i, doc in enumerate(retrieved_docs, 1):
            context_parts.append(
                f"[Source {i}: {doc['source']}]\n{doc['text']}"
            )

        return "\n\n".join(context_parts)

    async def _generate_response(self, query: str, context: str) -> str:
        """Generate response using LLM with context."""

        # Build enhanced prompt
        enhanced_prompt = f"""Context from knowledge base:
{context}

Question: {query}

Please provide a detailed answer based on the context above. If the context doesn't contain enough information, say so clearly."""

        # Call LLM
        llm_input = ILLMInput(
            system_prompt=self.system_prompt,
            user_message=enhanced_prompt
        )

        result = await self.llm_client.chat(llm_input)
        return result.get('llm_response', 'Unable to generate response')

    def add_documents(self, chunks: List[Dict[str, Any]]):
        """Add document chunks to vector store."""

        # Extract texts for embedding
        texts = [chunk['text'] for chunk in chunks]

        # Generate embeddings
        embedding_result = self.embedding_service.embed_documents(texts)
        embeddings = embedding_result['embeddings']

        # Prepare data for insertion
        insert_data = []
        for i, chunk in enumerate(chunks):
            insert_data.append({
                'id': chunk.get('id', f'chunk_{i}'),
                'text': chunk['text'],
                'source': chunk.get('source', 'unknown'),
                'chunk_index': chunk.get('chunk_index', i),
                'embedding': embeddings[i]
            })

        # Insert into vector database
        self.vector_client.insert(
            config=self.collection_config,
            data=insert_data
        )

        print(f"✓ Added {len(chunks)} document chunks to knowledge base")

Step 3: Build Complete RAG System

Create the main application:

# rag_system.py
import asyncio
from pathlib import Path
from typing import List
from dotenv import load_dotenv

from arshai.llms.openai import OpenAIClient
from arshai.core.interfaces.illm import ILLMConfig
from arshai.embeddings.openai_embeddings import OpenAIEmbedding
from arshai.core.interfaces.iembedding import EmbeddingConfig
from arshai.vector_db.milvus_client import MilvusClient

from document_processor import DocumentChunker
from rag_agent import RAGAgent

load_dotenv()

class RAGSystem:
    """Complete RAG system with document ingestion and Q&A."""

    def __init__(self):
        """Initialize RAG system components."""

        # Create LLM client
        self.llm_client = OpenAIClient(
            ILLMConfig(
                model="gpt-3.5-turbo",
                temperature=0.7,
                max_tokens=500
            )
        )

        # Create embedding service
        self.embedding_service = OpenAIEmbedding(
            EmbeddingConfig(
                model_name="text-embedding-3-small",
                batch_size=100
            )
        )

        # Create vector database client
        self.vector_client = MilvusClient()

        # Create document processor
        self.document_processor = DocumentChunker(
            chunk_size=500,
            chunk_overlap=50
        )

        # Create RAG agent
        self.rag_agent = RAGAgent(
            llm_client=self.llm_client,
            embedding_service=self.embedding_service,
            vector_client=self.vector_client,
            collection_name="knowledge_base",
            top_k=3
        )

        print("✓ RAG system initialized")

    def ingest_documents(self, document_paths: List[str]):
        """
        Ingest documents into the knowledge base.

        Args:
            document_paths: List of paths to documents
        """
        all_chunks = []

        for doc_path in document_paths:
            try:
                print(f"Processing: {doc_path}")

                # Process document
                chunks = self.document_processor.process_document(
                    doc_path,
                    metadata={'type': 'documentation'}
                )

                all_chunks.extend(chunks)
                print(f"  ✓ Created {len(chunks)} chunks")

            except Exception as e:
                print(f"  ✗ Error processing {doc_path}: {e}")

        if all_chunks:
            # Add to vector store
            print(f"\nAdding {len(all_chunks)} chunks to knowledge base...")
            self.rag_agent.add_documents(all_chunks)
            print("✓ Document ingestion complete")
        else:
            print("No chunks to add")

    async def query(self, question: str) -> dict:
        """
        Query the RAG system.

        Args:
            question: User question

        Returns:
            Response with answer and sources
        """
        from arshai.core.interfaces.iagent import IAgentInput

        result = await self.rag_agent.process(
            IAgentInput(message=question)
        )

        return result

    async def interactive_session(self):
        """Run interactive Q&A session."""
        print("\n" + "=" * 60)
        print("RAG System - Interactive Q&A")
        print("=" * 60)
        print("\nCommands:")
        print("  /quit   - Exit")
        print("  /stats  - Show system statistics")
        print("\nAsk questions about your documents!\n")

        while True:
            try:
                # Get user question
                question = input("\n❓ Question: ").strip()

                if not question:
                    continue

                # Handle commands
                if question.lower() == '/quit':
                    print("\n👋 Goodbye!")
                    break

                if question.lower() == '/stats':
                    self._show_statistics()
                    continue

                # Process question
                print("🔍 Searching knowledge base...", end="", flush=True)
                result = await self.query(question)
                print("\r" + " " * 40 + "\r", end="")

                # Display answer
                print(f"\n💡 Answer:\n{result['response']}\n")

                # Display sources
                if result['sources']:
                    print(f"📚 Sources: {', '.join(set(result['sources']))}")
                    print(f"📊 Retrieved {result['retrieved_chunks']} relevant chunks")

            except KeyboardInterrupt:
                print("\n\n👋 Goodbye!")
                break
            except Exception as e:
                print(f"\n❌ Error: {e}")

    def _show_statistics(self):
        """Show system statistics."""
        print("\n📊 System Statistics:")
        print(f"  Embedding Model: {self.embedding_service.model_name}")
        print(f"  Embedding Dimension: {self.embedding_service.dimension}")
        print(f"  LLM Model: {self.llm_client.config.model}")
        print(f"  Top-K Retrieval: {self.rag_agent.top_k}")

async def main():
    """Main application entry point."""

    # Create RAG system
    rag_system = RAGSystem()

    # Ingest documents
    documents_dir = Path("documents")
    if documents_dir.exists():
        document_files = list(documents_dir.glob("*.txt")) + \
                        list(documents_dir.glob("*.md"))

        if document_files:
            print(f"\nFound {len(document_files)} documents to ingest\n")
            rag_system.ingest_documents([str(f) for f in document_files])
        else:
            print("\n⚠️ No documents found in 'documents/' directory")
            print("Add .txt or .md files to get started\n")
    else:
        print("\n⚠️ 'documents/' directory not found")
        documents_dir.mkdir()
        print("Created 'documents/' directory - add your files there\n")

    # Start interactive session
    await rag_system.interactive_session()

if __name__ == "__main__":
    asyncio.run(main())

Step 4: Test the System

Create sample documents:

# Create sample document
cat > documents/arshai_intro.txt << 'EOF'
Arshai Framework

Arshai is a framework for building agentic AI systems. It provides three main layers:

Layer 1 - LLM Clients: Standardized interfaces for different language model providers including OpenAI, Google Gemini, Azure OpenAI, and OpenRouter.

Layer 2 - Agents: Building blocks for creating conversational agents with BaseAgent as the foundation. Agents can have memory, use tools, and handle complex interactions.

Layer 3 - Systems: Patterns for composing multiple agents into complete agentic systems through orchestration and workflow management.

The framework follows these core principles:
- Direct Control: Developers explicitly create and configure all components
- Building Blocks: Framework provides foundations, developers build solutions
- Progressive Complexity: Start simple and scale to sophisticated systems as needed
EOF

Run the system:

python rag_system.py

Test queries:

RAG System - Interactive Q&A
============================================================

Commands:
  /quit   - Exit
  /stats  - Show system statistics

Ask questions about your documents!

❓ Question: What is Arshai?

💡 Answer:
Arshai is a framework for building agentic AI systems. It provides a three-layer architecture consisting of LLM Clients (Layer 1), Agents (Layer 2), and Systems (Layer 3). The framework emphasizes direct control, provides building blocks for developers, and supports progressive complexity.

📚 Sources: arshai_intro.txt
📊 Retrieved 3 relevant chunks

❓ Question: What are the three layers?

💡 Answer:
The three layers in Arshai are:
1. Layer 1 - LLM Clients: Standardized interfaces for language model providers
2. Layer 2 - Agents: Building blocks for conversational agents
3. Layer 3 - Systems: Patterns for composing agents into complete systems

📚 Sources: arshai_intro.txt
📊 Retrieved 3 relevant chunks

Step 5: Add Advanced Features

Two-Stage Retrieval with Reranking

class AdvancedRAGAgent(RAGAgent):
    """RAG agent with reranking."""

    async def _retrieve_documents(self, query: str) -> List[Dict[str, Any]]:
        """Retrieve with reranking."""

        # Step 1: Initial retrieval (more candidates)
        query_result = self.embedding_service.embed_query(query)
        query_embedding = query_result['embedding']

        search_results = self.vector_client.search(
            config=self.collection_config,
            query_vectors=[query_embedding],
            limit=self.top_k * 3,  # Get 3x candidates
            output_fields=["id", "text", "source", "chunk_index"]
        )

        # Step 2: Rerank candidates (keyword-overlap score here;
        # see the cross-encoder sketch below for a stronger option)
        candidates = []
        if search_results and len(search_results) > 0:
            for result in search_results[0]:
                candidates.append({
                    'text': result.get('text', ''),
                    'source': result.get('source', 'unknown'),
                    'chunk_index': result.get('chunk_index', 0),
                    'distance': result.get('distance', 0.0),
                    'score': self._calculate_relevance_score(
                        query,
                        result.get('text', '')
                    )
                })

        # Sort by reranked score and return top-k
        candidates.sort(key=lambda x: x['score'], reverse=True)
        return candidates[:self.top_k]

    def _calculate_relevance_score(self, query: str, text: str) -> float:
        """Simple relevance scoring (can be improved with cross-encoder)."""
        # Simple keyword matching score
        query_terms = set(query.lower().split())
        text_terms = set(text.lower().split())

        if not query_terms:
            return 0.0

        overlap = len(query_terms & text_terms)
        score = overlap / len(query_terms)

        return score
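To upgrade the scoring beyond keyword overlap, you can plug in an actual cross-encoder. A sketch assuming sentence-transformers is installed (pip install sentence-transformers); the model name is one common choice, not a requirement:

from sentence_transformers import CrossEncoder

class CrossEncoderRAGAgent(AdvancedRAGAgent):
    """Reranks retrieved candidates with a cross-encoder model."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Small, widely used reranking model
        self.reranker = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2')

    def _calculate_relevance_score(self, query: str, text: str) -> float:
        # predict() scores (query, passage) pairs; higher means more relevant
        return float(self.reranker.predict([(query, text)])[0])

For efficiency, score all candidates in a single predict() call rather than one pair at a time.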

Conversation Memory with RAG

from typing import Any, Dict

from arshai.core.interfaces.iagent import IAgentInput
from arshai.memory.working_memory.in_memory_manager import InMemoryManager

class ConversationalRAGAgent(RAGAgent):
    """RAG agent with conversation memory."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.memory_manager = InMemoryManager()
        self.conversation_id = "rag_session"

    async def process(self, input: IAgentInput) -> Dict[str, Any]:
        """Process with conversation history."""

        # Get conversation history
        history = self._get_conversation_history()

        # Enhance query with history context
        enhanced_query = self._build_query_with_history(
            input.message,
            history
        )

        # Standard RAG processing
        result = await super().process(
            IAgentInput(message=enhanced_query)
        )

        # Store conversation turn
        self._store_conversation_turn(
            input.message,
            result['response']
        )

        return result

    def _get_conversation_history(self) -> str:
        """Get recent conversation history."""
        from arshai.core.interfaces.imemorymanager import IMemoryInput
        from arshai.memory.memory_types import ConversationMemoryType

        try:
            memory_input = IMemoryInput(
                conversation_id=self.conversation_id,
                memory_type=ConversationMemoryType.WORKING_MEMORY
            )
            memories = self.memory_manager.retrieve(memory_input)
            if memories:
                return memories[0].working_memory
        except Exception:
            # No history stored yet
            pass

        return ""

    def _build_query_with_history(self, query: str, history: str) -> str:
        """Build query enhanced with conversation history."""
        if history:
            # Only use last 3 turns
            history_lines = history.split('\n')[-6:]  # 3 turns * 2 lines
            recent_history = '\n'.join(history_lines)
            return f"Previous conversation:\n{recent_history}\n\nCurrent question: {query}"
        return query

    def _store_conversation_turn(self, question: str, answer: str):
        """Store conversation turn."""
        from arshai.core.interfaces.imemorymanager import IMemoryInput, IWorkingMemory
        from arshai.memory.memory_types import ConversationMemoryType

        try:
            history = self._get_conversation_history()
            new_turn = f"\nQ: {question}\nA: {answer}"
            updated_history = history + new_turn

            memory_input = IMemoryInput(
                conversation_id=self.conversation_id,
                memory_type=ConversationMemoryType.WORKING_MEMORY,
                data=[IWorkingMemory(working_memory=updated_history)]
            )
            self.memory_manager.store(memory_input)
        except Exception as e:
            print(f"Error storing conversation: {e}")

Document Metadata Filtering

class FilteredRAGAgent(RAGAgent):
    """RAG with metadata filtering."""

    async def process(self, input: IAgentInput) -> Dict[str, Any]:
        """Process with optional metadata filters."""

        # Extract filters from input metadata
        filters = input.metadata or {}
        document_type = filters.get('document_type')
        date_range = filters.get('date_range')

        # Build filter expression
        filter_expr = self._build_filter_expression(
            document_type=document_type,
            date_range=date_range
        )

        # Retrieve with filters (uses the _retrieve_documents override
        # shown below; the base method does not accept a filter)
        retrieved_docs = await self._retrieve_documents(
            input.message,
            filter_expr=filter_expr
        )

        # Rest of processing...
        context = self._build_context(retrieved_docs)
        response = await self._generate_response(input.message, context)

        return {
            'response': response,
            'sources': [doc['source'] for doc in retrieved_docs],
            'filters_applied': filter_expr
        }

    def _build_filter_expression(self, **kwargs) -> str:
        """Build Milvus filter expression."""
        filters = []

        if kwargs.get('document_type'):
            filters.append(f"type == '{kwargs['document_type']}'")

        if kwargs.get('date_range'):
            start, end = kwargs['date_range']
            filters.append(f"date >= '{start}' and date <= '{end}'")

        return " and ".join(filters) if filters else ""

Production Deployment

Environment-Based Configuration

import os

from arshai.core.interfaces.illm import ILLMConfig
from arshai.memory.working_memory.in_memory_manager import InMemoryManager

class ProductionRAGSystem(RAGSystem):
    """Production-ready RAG system."""

    def __init__(self):
        """Initialize with environment-based config."""

        # Determine environment
        env = os.getenv('ENVIRONMENT', 'development')

        # Environment-specific configuration
        if env == 'production':
            llm_config = ILLMConfig(
                model="gpt-4",  # Better model for production
                temperature=0.3,  # Lower temperature for consistency
                max_tokens=1000
            )

            # Use Redis for production memory
            from arshai.memory.working_memory.redis_memory_manager import RedisWorkingMemoryManager
            memory_manager = RedisWorkingMemoryManager()

            chunk_size = 1000  # Larger chunks
            top_k = 5  # More context
        else:
            llm_config = ILLMConfig(
                model="gpt-3.5-turbo",
                temperature=0.7,
                max_tokens=500
            )

            memory_manager = InMemoryManager()
            chunk_size = 500
            top_k = 3

        # Initialize components with env-specific config
        # ... rest of initialization
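Select the configuration at launch time:

# Run with production settings
ENVIRONMENT=production python rag_system.py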

Error Handling and Monitoring

import logging
from datetime import datetime

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('rag_system.log'),
        logging.StreamHandler()
    ]
)

class MonitoredRAGAgent(RAGAgent):
    """RAG agent with monitoring."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.logger = logging.getLogger('RAGAgent')
        self.query_count = 0
        self.error_count = 0

    async def process(self, input: IAgentInput) -> Dict[str, Any]:
        """Process with monitoring."""
        self.query_count += 1
        start_time = datetime.now()

        try:
            self.logger.info(f"Processing query: {input.message[:100]}")

            result = await super().process(input)

            # Log success
            duration = (datetime.now() - start_time).total_seconds()
            self.logger.info(
                f"Query successful - Duration: {duration:.2f}s, "
                f"Sources: {len(result['sources'])}"
            )

            return result

        except Exception as e:
            self.error_count += 1
            self.logger.error(f"Query failed: {e}", exc_info=True)

            # Return error response
            return {
                'response': 'I encountered an error processing your question. Please try again.',
                'error': str(e),
                'sources': []
            }

Next Steps

Enhance the RAG system:

  • Add support for more document types (PDF, DOCX, HTML; see the PDF sketch after this list)

  • Implement citation tracking and verification

  • Add query expansion and rephrasing

  • Implement answer validation
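As a starting point for the PDF support mentioned above, load_document can grow one branch per format. A sketch assuming pypdf is installed (pip install pypdf):

from pypdf import PdfReader

def load_pdf(file_path: str) -> str:
    """Extract plain text from a PDF, page by page."""
    reader = PdfReader(file_path)
    # extract_text() can return None for image-only pages
    return "\n".join(page.extract_text() or "" for page in reader.pages)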

Scale for production:

  • Deploy with Redis-backed memory (RedisWorkingMemoryManager)

  • Use production vector database (Zilliz Cloud, Pinecone)

  • Add a caching layer for frequent queries (see the sketch below)

  • Implement rate limiting and authentication
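For the caching idea above, a minimal in-process sketch; production systems would more likely use Redis or similar with a TTL:

class CachedRAGAgent(RAGAgent):
    """Caches full RAG responses for repeated questions."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._answer_cache: Dict[str, Dict[str, Any]] = {}

    async def process(self, input: IAgentInput) -> Dict[str, Any]:
        # Normalize the question so trivial variations hit the cache
        key = input.message.strip().lower()
        if key in self._answer_cache:
            return self._answer_cache[key]
        result = await super().process(input)
        self._answer_cache[key] = result
        return result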

Congratulations! You’ve built a complete RAG system with Arshai! 🎉