Memory Reference Implementations

This section documents the reference memory management implementations provided with Arshai. These demonstrate different approaches to storing and managing conversation memory using various storage backends.

Note

Reference Implementation Philosophy

These memory implementations are not part of the core framework. They represent working examples of how to implement the IMemoryManager interface for different storage needs. You can:

  • Use them directly if they meet your requirements

  • Modify them for your specific storage needs

  • Learn patterns to build your own memory implementations

  • Combine multiple approaches for different use cases

Available Reference Implementations

InMemoryManager (In-Memory Manager)

Simple in-memory storage with automatic cleanup. Perfect for development, testing, and simple applications that don’t require persistence.

RedisMemoryManager (Redis Memory Manager)

Redis-backed persistent storage with TTL support. Ideal for production applications that need scalable, persistent memory storage.

Memory Management Patterns

TTL-Based Cleanup

Automatic expiration of old memory entries to prevent unlimited growth.

Key-Based Organization

Structured key patterns for efficient storage and retrieval of conversation memories.

Metadata Handling

Storing additional context and metadata alongside working memory content.

Error Recovery

Graceful handling of storage failures and missing data scenarios.

Async Operations

Non-blocking memory operations that integrate seamlessly with agent processing.

Interface Compliance

All reference memory implementations follow the IMemoryManager interface:

from arshai.core.interfaces.imemorymanager import IMemoryManager, IMemoryInput

class IMemoryManager:
    """Interface for memory management implementations."""

    def store(self, input: IMemoryInput) -> str:
        """Store memory data and return storage key."""
        pass

    def retrieve(self, input: IMemoryInput) -> List[IWorkingMemory]:
        """Retrieve memory data matching the input criteria."""
        pass

    def update(self, input: IMemoryInput) -> None:
        """Update existing memory data."""
        pass

    def delete(self, input: IMemoryInput) -> None:
        """Delete memory data."""
        pass

Memory Input Structure

from arshai.core.interfaces.imemorymanager import IMemoryInput, IWorkingMemory
from arshai.memory.memory_types import ConversationMemoryType

# Creating memory input for storage
memory_input = IMemoryInput(
    conversation_id="user_123",
    memory_type=ConversationMemoryType.WORKING,
    data=[IWorkingMemory(working_memory="User prefers technical details")],
    metadata={"user_type": "developer", "session_id": "abc123"}
)

Usage Patterns

Basic Memory Operations

from arshai.memory.working_memory.in_memory_manager import InMemoryManager
from arshai.core.interfaces.imemorymanager import IMemoryInput, IWorkingMemory
from arshai.memory.memory_types import ConversationMemoryType

# Create memory manager
memory_manager = InMemoryManager(ttl=3600)  # 1 hour TTL

# Store memory
memory_data = IMemoryInput(
    conversation_id="conversation_123",
    memory_type=ConversationMemoryType.WORKING,
    data=[IWorkingMemory(working_memory="User is interested in machine learning")],
    metadata={"user_id": "user_456"}
)

key = memory_manager.store(memory_data)
print(f"Stored memory with key: {key}")

# Retrieve memory
query = IMemoryInput(
    conversation_id="conversation_123",
    memory_type=ConversationMemoryType.WORKING
)

memories = memory_manager.retrieve(query)
if memories:
    print(f"Retrieved memory: {memories[0].working_memory}")

Agent Integration

from arshai.agents.hub.working_memory import WorkingMemoryAgent
from arshai.memory.working_memory.redis_memory_manager import RedisMemoryManager

# Create Redis-backed memory manager
memory_manager = RedisMemoryManager(
    storage_url="redis://localhost:6379/1",
    ttl=60*60*24  # 24 hours
)

# Create memory-enabled agent
memory_agent = WorkingMemoryAgent(
    llm_client=llm_client,
    memory_manager=memory_manager
)

# Agent automatically manages memory
result = await memory_agent.process(IAgentInput(
    message="User mentioned they're a Python developer",
    metadata={"conversation_id": "dev_chat_123"}
))

Custom Memory Implementation

from arshai.core.interfaces.imemorymanager import IMemoryManager, IMemoryInput, IWorkingMemory
from typing import List
import sqlite3
import json
from datetime import datetime

class SQLiteMemoryManager(IMemoryManager):
    """Custom SQLite-based memory implementation."""

    def __init__(self, db_path: str = "memory.db"):
        self.db_path = db_path
        self._init_db()

    def _init_db(self):
        """Initialize SQLite database."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS memories (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                conversation_id TEXT NOT NULL,
                memory_type TEXT NOT NULL,
                working_memory TEXT NOT NULL,
                metadata TEXT,
                created_at TEXT NOT NULL,
                last_update TEXT NOT NULL,
                UNIQUE(conversation_id, memory_type)
            )
        """)
        conn.commit()
        conn.close()

    def store(self, input: IMemoryInput) -> str:
        """Store memory in SQLite."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        for data in input.data:
            cursor.execute("""
                INSERT OR REPLACE INTO memories
                (conversation_id, memory_type, working_memory, metadata, created_at, last_update)
                VALUES (?, ?, ?, ?, ?, ?)
            """, (
                input.conversation_id,
                str(input.memory_type),
                data.working_memory,
                json.dumps(input.metadata or {}),
                datetime.now().isoformat(),
                datetime.now().isoformat()
            ))

        conn.commit()
        conn.close()

        return f"{input.conversation_id}:{input.memory_type}"

    def retrieve(self, input: IMemoryInput) -> List[IWorkingMemory]:
        """Retrieve memory from SQLite."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        cursor.execute("""
            SELECT working_memory FROM memories
            WHERE conversation_id = ? AND memory_type = ?
        """, (input.conversation_id, str(input.memory_type)))

        result = cursor.fetchone()
        conn.close()

        if result:
            return [IWorkingMemory(working_memory=result[0])]
        return []

    def update(self, input: IMemoryInput) -> None:
        """Update memory in SQLite."""
        # For SQLite, update is the same as store due to INSERT OR REPLACE
        self.store(input)

    def delete(self, input: IMemoryInput) -> None:
        """Delete memory from SQLite."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        cursor.execute("""
            DELETE FROM memories
            WHERE conversation_id = ? AND memory_type = ?
        """, (input.conversation_id, str(input.memory_type)))

        conn.commit()
        conn.close()

Memory Storage Strategies

Choosing Storage Backend

In-Memory Storage
  • Use when: Development, testing, simple applications

  • Pros: Fast, no external dependencies, automatic cleanup

  • Cons: Not persistent, limited by application memory

  • Best for: Prototypes, single-instance applications

Redis Storage
  • Use when: Production applications, multiple instances, scalability needs

  • Pros: Persistent, scalable, shared across instances, built-in TTL

  • Cons: Requires Redis server, network dependency

  • Best for: Production systems, multi-instance deployments

Database Storage
  • Use when: Complex queries, reporting, data analysis needs

  • Pros: Rich queries, transactions, data integrity, reporting

  • Cons: More complex setup, potential performance overhead

  • Best for: Enterprise applications, complex memory requirements

Hybrid Approaches
  • Use when: Different memory types have different requirements

  • Pattern: In-memory for short-term, persistent for long-term

  • Example: Cache recent memories in-memory, archive to database

Memory Optimization Patterns

TTL Management

class TTLOptimizedMemoryManager:
    """Memory manager with intelligent TTL management."""

    def __init__(self, base_manager):
        self.base_manager = base_manager
        self.ttl_strategies = {
            "active_user": 60 * 60 * 24,      # 24 hours for active users
            "inactive_user": 60 * 60 * 2,     # 2 hours for inactive users
            "premium_user": 60 * 60 * 24 * 7,  # 7 days for premium users
        }

    def _get_ttl(self, metadata: dict) -> int:
        """Determine TTL based on user characteristics."""
        user_type = metadata.get("user_type", "active_user")
        return self.ttl_strategies.get(user_type, 60 * 60 * 12)

    def store(self, input: IMemoryInput) -> str:
        # Set appropriate TTL
        ttl = self._get_ttl(input.metadata or {})
        self.base_manager.ttl = ttl
        return self.base_manager.store(input)

Memory Compression

import gzip
import base64
from typing import List

class CompressedMemoryWrapper:
    """Wrapper that compresses memory content."""

    def __init__(self, base_manager):
        self.base_manager = base_manager

    def _compress(self, text: str) -> str:
        """Compress text using gzip."""
        compressed = gzip.compress(text.encode('utf-8'))
        return base64.b64encode(compressed).decode('utf-8')

    def _decompress(self, compressed_text: str) -> str:
        """Decompress text."""
        compressed = base64.b64decode(compressed_text.encode('utf-8'))
        return gzip.decompress(compressed).decode('utf-8')

    def store(self, input: IMemoryInput) -> str:
        # Compress memory content
        compressed_data = []
        for data in input.data:
            compressed_memory = self._compress(data.working_memory)
            compressed_data.append(IWorkingMemory(working_memory=compressed_memory))

        compressed_input = IMemoryInput(
            conversation_id=input.conversation_id,
            memory_type=input.memory_type,
            data=compressed_data,
            metadata={**input.metadata, "compressed": True}
        )

        return self.base_manager.store(compressed_input)

    def retrieve(self, input: IMemoryInput) -> List[IWorkingMemory]:
        memories = self.base_manager.retrieve(input)

        # Decompress if needed
        decompressed_memories = []
        for memory in memories:
            if input.metadata and input.metadata.get("compressed"):
                decompressed_content = self._decompress(memory.working_memory)
                decompressed_memories.append(IWorkingMemory(working_memory=decompressed_content))
            else:
                decompressed_memories.append(memory)

        return decompressed_memories

Memory Versioning

from datetime import datetime
from typing import List, Dict, Any

class VersionedMemoryManager:
    """Memory manager with version tracking."""

    def __init__(self, base_manager):
        self.base_manager = base_manager
        self.versions: Dict[str, List[Dict[str, Any]]] = {}

    def store(self, input: IMemoryInput) -> str:
        # Create version entry
        version_key = f"{input.conversation_id}:{input.memory_type}"

        if version_key not in self.versions:
            self.versions[version_key] = []

        # Store version metadata
        version_info = {
            "timestamp": datetime.now().isoformat(),
            "content": input.data[0].working_memory if input.data else "",
            "metadata": input.metadata or {}
        }

        self.versions[version_key].append(version_info)

        # Limit version history (keep last 10)
        if len(self.versions[version_key]) > 10:
            self.versions[version_key] = self.versions[version_key][-10:]

        return self.base_manager.store(input)

    def get_memory_history(self, conversation_id: str, memory_type) -> List[Dict[str, Any]]:
        """Get version history for a memory."""
        version_key = f"{conversation_id}:{memory_type}"
        return self.versions.get(version_key, [])

Testing Memory Implementations

Unit Testing

import pytest
from arshai.memory.working_memory.in_memory_manager import InMemoryManager
from arshai.core.interfaces.imemorymanager import IMemoryInput, IWorkingMemory
from arshai.memory.memory_types import ConversationMemoryType

@pytest.fixture
def memory_manager():
    return InMemoryManager(ttl=60)  # 1 minute TTL for testing

@pytest.fixture
def sample_memory_input():
    return IMemoryInput(
        conversation_id="test_conversation",
        memory_type=ConversationMemoryType.WORKING,
        data=[IWorkingMemory(working_memory="Test memory content")],
        metadata={"test": True}
    )

def test_store_and_retrieve(memory_manager, sample_memory_input):
    # Store memory
    key = memory_manager.store(sample_memory_input)
    assert key is not None

    # Retrieve memory
    query = IMemoryInput(
        conversation_id="test_conversation",
        memory_type=ConversationMemoryType.WORKING
    )

    memories = memory_manager.retrieve(query)
    assert len(memories) == 1
    assert memories[0].working_memory == "Test memory content"

def test_update_memory(memory_manager, sample_memory_input):
    # Store initial memory
    memory_manager.store(sample_memory_input)

    # Update memory
    update_input = IMemoryInput(
        conversation_id="test_conversation",
        memory_type=ConversationMemoryType.WORKING,
        data=[IWorkingMemory(working_memory="Updated memory content")]
    )

    memory_manager.update(update_input)

    # Verify update
    query = IMemoryInput(
        conversation_id="test_conversation",
        memory_type=ConversationMemoryType.WORKING
    )

    memories = memory_manager.retrieve(query)
    assert memories[0].working_memory == "Updated memory content"

def test_delete_memory(memory_manager, sample_memory_input):
    # Store memory
    memory_manager.store(sample_memory_input)

    # Delete memory
    memory_manager.delete(sample_memory_input)

    # Verify deletion
    query = IMemoryInput(
        conversation_id="test_conversation",
        memory_type=ConversationMemoryType.WORKING
    )

    memories = memory_manager.retrieve(query)
    assert len(memories) == 0

Integration Testing

import pytest
import time
from arshai.memory.working_memory.redis_memory_manager import RedisMemoryManager

@pytest.mark.integration
def test_redis_memory_integration():
    # Test with real Redis (requires Redis server)
    memory_manager = RedisMemoryManager(
        storage_url="redis://localhost:6379/15"  # Use test database
    )

    # Test basic operations
    memory_input = IMemoryInput(
        conversation_id="integration_test",
        memory_type=ConversationMemoryType.WORKING,
        data=[IWorkingMemory(working_memory="Integration test content")],
        metadata={"integration": True}
    )

    # Store and retrieve
    key = memory_manager.store(memory_input)
    assert key is not None

    memories = memory_manager.retrieve(memory_input)
    assert len(memories) == 1
    assert memories[0].working_memory == "Integration test content"

    # Cleanup
    memory_manager.delete(memory_input)

@pytest.mark.integration
def test_memory_ttl():
    memory_manager = InMemoryManager(ttl=1)  # 1 second TTL

    memory_input = IMemoryInput(
        conversation_id="ttl_test",
        memory_type=ConversationMemoryType.WORKING,
        data=[IWorkingMemory(working_memory="TTL test content")]
    )

    # Store memory
    memory_manager.store(memory_input)

    # Should be available immediately
    memories = memory_manager.retrieve(memory_input)
    assert len(memories) == 1

    # Wait for TTL expiration
    time.sleep(2)

    # Should be expired
    memories = memory_manager.retrieve(memory_input)
    assert len(memories) == 0

Performance Testing

import time
import asyncio
from concurrent.futures import ThreadPoolExecutor

def test_memory_performance():
    memory_manager = InMemoryManager()

    # Test storage performance
    start_time = time.time()

    for i in range(1000):
        memory_input = IMemoryInput(
            conversation_id=f"perf_test_{i}",
            memory_type=ConversationMemoryType.WORKING,
            data=[IWorkingMemory(working_memory=f"Performance test content {i}")]
        )
        memory_manager.store(memory_input)

    storage_time = time.time() - start_time
    print(f"Stored 1000 memories in {storage_time:.2f} seconds")

    # Test retrieval performance
    start_time = time.time()

    for i in range(1000):
        query = IMemoryInput(
            conversation_id=f"perf_test_{i}",
            memory_type=ConversationMemoryType.WORKING
        )
        memories = memory_manager.retrieve(query)
        assert len(memories) == 1

    retrieval_time = time.time() - start_time
    print(f"Retrieved 1000 memories in {retrieval_time:.2f} seconds")

Best Practices

Design Principles
  • Implement the IMemoryManager interface completely

  • Handle errors gracefully without throwing exceptions

  • Use appropriate logging for debugging and monitoring

  • Implement proper cleanup for resource management

Performance Optimization
  • Use connection pooling for database/Redis connections

  • Implement appropriate caching strategies

  • Batch operations when possible

  • Monitor memory usage and implement cleanup

Error Handling
  • Handle storage backend failures gracefully

  • Implement retry logic for transient failures

  • Provide meaningful error messages and logging

  • Consider fallback strategies for critical failures

Security Considerations
  • Validate input data to prevent injection attacks

  • Implement access controls for memory storage

  • Consider encryption for sensitive memory content

  • Audit memory access for compliance requirements

The reference memory implementations provide solid foundations for different memory management needs. Choose the implementation that best fits your requirements or use them as starting points for custom solutions.