WorkingMemoryAgent Reference Implementation

The WorkingMemoryAgent is a specialized agent that manages conversation working memory. It demonstrates how to build agents that maintain context across interactions using the framework’s memory management capabilities.

Note

Reference Implementation

This is a reference implementation showing how we’ve built memory-enabled agents. It’s not part of the core framework - use it as-is, modify it, or build your own memory management approach.

Overview

Purpose

Maintains and updates working memory for conversations, ensuring context is preserved across multiple interactions.

Location

arshai.agents.hub.working_memory.WorkingMemoryAgent

Key Capabilities
  • Retrieves existing conversation memory

  • Incorporates new interaction context

  • Generates updated memory summaries

  • Stores updated memory persistently

  • Handles missing dependencies gracefully

Integration Points
  • Works with any IMemoryManager implementation (a minimal sketch follows this list)

  • Optional chat history integration

  • Uses metadata for conversation identification

  • Returns status information for system coordination
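
A minimal custom backend only needs the two calls the agent makes. The sketch below assumes the retrieve/store signatures shown later in the Processing Flow section (an async retrieve returning records that expose a working_memory attribute, and an async store taking a dict); check the IMemoryManager interface in your installed version for the exact contract.

from dataclasses import dataclass
from typing import Any, Dict, List

@dataclass
class MemoryRecord:
    # Hypothetical record type; the agent only reads .working_memory
    working_memory: str

class DictMemoryManager:
    """Toy in-process backend keyed by conversation_id (illustrative only)."""

    def __init__(self):
        self._records: Dict[str, MemoryRecord] = {}

    async def retrieve(self, query: Dict[str, Any]) -> List[MemoryRecord]:
        record = self._records.get(query["conversation_id"])
        return [record] if record else []

    async def store(self, data: Dict[str, Any]) -> None:
        self._records[data["conversation_id"]] = MemoryRecord(
            working_memory=data["working_memory"]
        )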

Basic Usage

Simple Usage

from arshai.agents.hub.working_memory import WorkingMemoryAgent
from arshai.memory.working_memory.in_memory_manager import InMemoryManager
from arshai.llms.openai import OpenAIClient
from arshai.core.interfaces.iagent import IAgentInput

# Create dependencies
llm_client = OpenAIClient(config)  # config: your LLM configuration object
memory_manager = InMemoryManager()

# Create memory agent
memory_agent = WorkingMemoryAgent(
    llm_client=llm_client,
    memory_manager=memory_manager
)

# Update memory with new interaction
result = await memory_agent.process(IAgentInput(
    message="User asked about product pricing and mentioned budget concerns",
    metadata={"conversation_id": "user_123"}
))

print(result)  # "success" on a successful update

With Redis Storage

from arshai.agents.hub.working_memory import WorkingMemoryAgent
from arshai.memory.working_memory.redis_memory_manager import RedisMemoryManager
from arshai.clients.redis_client import RedisClient

# Create Redis-backed memory manager
redis_client = RedisClient(host="localhost", port=6379)
memory_manager = RedisMemoryManager(redis_client)

# Create memory agent with persistent storage
memory_agent = WorkingMemoryAgent(
    llm_client=llm_client,
    memory_manager=memory_manager
)

# Memory will persist across application restarts
result = await memory_agent.process(IAgentInput(
    message="User completed purchase and requested support",
    metadata={"conversation_id": "user_123"}
))

Custom System Prompt

custom_prompt = """You are a specialized memory manager for customer service conversations.

Focus on:
- Customer preferences and history
- Previous issues and resolutions
- Current context and needs
- Action items and follow-ups

Keep memory concise but actionable for support agents."""

memory_agent = WorkingMemoryAgent(
    llm_client=llm_client,
    system_prompt=custom_prompt,
    memory_manager=memory_manager
)
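
With Chat History (Optional)

The constructor also accepts a chat_history_client so the agent can pull prior turns into the context. This sketch assumes a history client exposing an async get(conversation_id) method, matching step 3 of the Processing Flow below; substitute whatever history store your application uses.

# chat_history_client: any object with `async def get(conversation_id)`
memory_agent = WorkingMemoryAgent(
    llm_client=llm_client,
    memory_manager=memory_manager,
    chat_history_client=chat_history_client
)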

Configuration Options

Constructor Parameters

WorkingMemoryAgent(
    llm_client: ILLM,                    # Required: LLM for memory generation
    system_prompt: str = None,           # Optional: Custom memory prompt
    memory_manager: IMemoryManager = None, # Optional: Storage backend
    chat_history_client: Any = None,     # Optional: Conversation history source
    **kwargs                             # Additional BaseAgent parameters
)

Default System Prompt

The agent includes a comprehensive default prompt optimized for memory management:

You are a memory management assistant responsible for maintaining conversation context.

Your tasks:
1. Analyze conversation history and current interaction
2. Extract key information, facts, and context
3. Generate a concise working memory summary
4. Focus on information relevant for future interactions

Keep the memory:
- Concise but comprehensive
- Focused on actionable information
- Updated with latest context
- Free from redundancy

Processing Flow

The WorkingMemoryAgent follows this processing flow:

1. Conversation ID Extraction

conversation_id = input.metadata.get("conversation_id")
if not conversation_id:
    return "error: no conversation_id provided"

2. Current Memory Retrieval

if self.memory_manager:
    memory_data = await self.memory_manager.retrieve({"conversation_id": conversation_id})
    current_memory = memory_data[0].working_memory if memory_data else ""

3. Conversation History Retrieval (Optional)

if self.chat_history:
    history = await self.chat_history.get(conversation_id)
    conversation_history = str(history) if history else ""

4. Context Preparation

context = f"""
Current Working Memory:
{current_memory if current_memory else "No existing memory"}

Conversation History:
{conversation_history if conversation_history else "No previous history"}

New Interaction:
{input.message}

Please generate an updated working memory that incorporates the new information...
"""

5. Memory Generation

llm_input = ILLMInput(
    system_prompt=self.system_prompt,
    user_message=context
)
result = await self.llm_client.chat(llm_input)
updated_memory = result.get('llm_response', '')

6. Memory Storage

if self.memory_manager:
    await self.memory_manager.store({
        "conversation_id": conversation_id,
        "working_memory": updated_memory,
        "metadata": input.metadata
    })

7. Status Return

return "success"  # or "error: <description>"

Response Format

The agent returns status strings to indicate processing results:

Success Response

"success" - Memory was successfully updated and stored

Error Responses
  • "error: no conversation_id provided" - No conversation ID in metadata

  • "error: empty memory response" - LLM returned empty response

  • "error: storage failed - <details>" - Memory storage failed

  • "error: <exception message>" - Other processing errors

Integration Patterns

As Background Task

Use WorkingMemoryAgent as a background task for automatic memory updates:

import asyncio

def update_memory(message: str, conversation_id: str):
    """Background task for memory updates."""
    memory_input = IAgentInput(
        message=message,
        metadata={"conversation_id": conversation_id}
    )
    # Fire-and-forget on the current event loop: doesn't block the
    # conversation (requires a running loop, i.e. an async caller)
    asyncio.create_task(memory_agent.process(memory_input))

# In your main agent
background_tasks = {"update_memory": update_memory}

llm_input = ILLMInput(
    system_prompt=main_prompt,
    user_message=user_message,
    background_tasks=background_tasks
)

# LLM can trigger memory updates automatically
result = await llm_client.chat(llm_input)

In Multi-Agent Systems

Coordinate memory updates across multiple agents:

class CustomerServiceSystem:
    def __init__(self, llm_client, memory_manager):
        self.memory_agent = WorkingMemoryAgent(llm_client, memory_manager=memory_manager)
        self.support_agent = SupportAgent(llm_client)
        self.escalation_agent = EscalationAgent(llm_client)

    async def handle_request(self, message: str, conversation_id: str):
        # Update memory with new interaction
        await self.memory_agent.process(IAgentInput(
            message=f"User interaction: {message}",
            metadata={"conversation_id": conversation_id}
        ))

        # Handle the request with the appropriate agent
        # (needs_escalation is an application-specific routing helper, not shown)
        if self.needs_escalation(message):
            return await self.escalation_agent.process(IAgentInput(
                message=message,
                metadata={"conversation_id": conversation_id}
            ))
        else:
            return await self.support_agent.process(IAgentInput(
                message=message,
                metadata={"conversation_id": conversation_id}
            ))

With Workflow Systems

Integrate memory updates into workflow steps:

class MemoryUpdateNode(BaseNode):  # BaseNode: your workflow framework's node base class
    def __init__(self, memory_agent):
        self.memory_agent = memory_agent

    async def execute(self, context: dict) -> dict:
        # Update memory as part of workflow
        result = await self.memory_agent.process(IAgentInput(
            message=context.get("interaction_summary"),
            metadata={"conversation_id": context.get("conversation_id")}
        ))

        context["memory_update_status"] = result
        return context

Error Handling

The agent implements comprehensive error handling:

Graceful Degradation
  • Continues processing even if memory retrieval fails

  • Handles missing chat history gracefully

  • Works without memory manager (logs warnings)

Error Recovery
  • Catches and logs all exceptions

  • Returns descriptive error messages

  • Doesn’t crash on partial failures

Logging
  • Debug logs for successful operations

  • Warning logs for missing dependencies

  • Error logs for failures with details

# Example error handling in the implementation
try:
    memory_data = await self.memory_manager.retrieve({"conversation_id": conversation_id})
    # ... process memory data
except Exception as e:
    # Log warning but continue without current memory
    logger.warning(f"Failed to retrieve memory: {e}")
    current_memory = ""

Testing Patterns

Unit Testing with Mocks

import pytest
from unittest.mock import AsyncMock

from arshai.agents.hub.working_memory import WorkingMemoryAgent
from arshai.core.interfaces.iagent import IAgentInput

@pytest.mark.asyncio
async def test_memory_agent_success():
    # Mock dependencies
    mock_llm = AsyncMock()
    mock_memory_manager = AsyncMock()

    # Configure mock responses
    mock_llm.chat.return_value = {"llm_response": "Updated memory content"}
    mock_memory_manager.retrieve.return_value = []
    mock_memory_manager.store.return_value = None

    # Create and test agent
    agent = WorkingMemoryAgent(mock_llm, memory_manager=mock_memory_manager)

    result = await agent.process(IAgentInput(
        message="Test interaction",
        metadata={"conversation_id": "test_123"}
    ))

    assert result == "success"
    mock_memory_manager.store.assert_called_once()

Integration Testing

@pytest.mark.asyncio
async def test_memory_agent_integration():
    # Test with real components
    llm_client = OpenAIClient(test_config)
    memory_manager = InMemoryManager()

    agent = WorkingMemoryAgent(llm_client, memory_manager=memory_manager)

    # First interaction
    result1 = await agent.process(IAgentInput(
        message="User wants to buy a laptop",
        metadata={"conversation_id": "integration_test"}
    ))
    assert result1 == "success"

    # Second interaction - should include previous context
    result2 = await agent.process(IAgentInput(
        message="User asked about warranty options",
        metadata={"conversation_id": "integration_test"}
    ))
    assert result2 == "success"

    # Verify memory persistence
    memory_data = await memory_manager.retrieve({"conversation_id": "integration_test"})
    assert len(memory_data) > 0
    assert "laptop" in memory_data[0].working_memory
    assert "warranty" in memory_data[0].working_memory

Error Handling Tests

@pytest.mark.asyncio
async def test_memory_agent_error_handling():
    mock_llm = AsyncMock()
    mock_memory_manager = AsyncMock()

    # Test missing conversation ID
    agent = WorkingMemoryAgent(mock_llm, memory_manager=mock_memory_manager)
    result = await agent.process(IAgentInput(message="test"))
    assert result == "error: no conversation_id provided"

    # Test storage failure
    mock_llm.chat.return_value = {"llm_response": "Updated memory"}
    mock_memory_manager.retrieve.return_value = []
    mock_memory_manager.store.side_effect = Exception("Storage failed")

    result = await agent.process(IAgentInput(
        message="test",
        metadata={"conversation_id": "test"}
    ))
    assert result.startswith("error: storage failed")

Customization Patterns

Custom Memory Processing

class DomainSpecificMemoryAgent(WorkingMemoryAgent):
    """Memory agent specialized for e-commerce conversations"""

    def __init__(self, llm_client, memory_manager, product_catalog):
        custom_prompt = """You are an e-commerce memory manager.

        Focus on:
        - Product interests and preferences
        - Purchase history and patterns
        - Budget constraints and concerns
        - Support interactions and resolutions"""

        super().__init__(llm_client, custom_prompt, memory_manager)
        self.product_catalog = product_catalog

    async def process(self, input: IAgentInput) -> str:
        # Add product context before processing
        # (should_add_product_context / add_product_context are
        # domain-specific helpers to implement, not shown here)
        if self.should_add_product_context(input.message):
            enhanced_message = await self.add_product_context(input.message)
            enhanced_input = IAgentInput(
                message=enhanced_message,
                metadata=input.metadata
            )
            return await super().process(enhanced_input)

        return await super().process(input)

Memory Validation

class ValidatingMemoryAgent(WorkingMemoryAgent):
    """Memory agent with content validation"""

    async def process(self, input: IAgentInput) -> str:
        # Process memory normally
        result = await super().process(input)

        if result == "success":
            # Validate stored memory
            conversation_id = input.metadata.get("conversation_id")
            if await self.validate_memory_quality(conversation_id):
                return "success"
            else:
                return "error: memory validation failed"

        return result

    async def validate_memory_quality(self, conversation_id: str) -> bool:
        # Custom validation logic
        memory_data = await self.memory_manager.retrieve({"conversation_id": conversation_id})
        if not memory_data:
            return False

        memory_content = memory_data[0].working_memory
        # contains_sensitive_data is a custom check to implement (not shown)
        return len(memory_content) > 10 and not self.contains_sensitive_data(memory_content)

Best Practices

Memory Content
  • Keep memory concise but comprehensive

  • Focus on actionable information for future interactions

  • Remove redundant or outdated information

  • Include context that affects future decisions

Error Handling
  • Always check for conversation_id in metadata

  • Handle storage failures gracefully

  • Log appropriate information for debugging

  • Return meaningful status information

Performance
  • Consider memory size limits for large conversations (a trimming sketch follows this list)

  • Implement memory cleanup for old conversations

  • Use appropriate storage backends for your scale

  • Monitor memory update frequency
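
One way to bound memory growth is a thin subclass that truncates the interaction text before the update runs. A minimal sketch; the 4000-character cap is an arbitrary example, not a framework default:

class BoundedMemoryAgent(WorkingMemoryAgent):
    """Sketch: caps the interaction text passed into memory updates."""

    MAX_CHARS = 4000  # arbitrary example limit

    async def process(self, input: IAgentInput) -> str:
        message = input.message or ""
        if len(message) > self.MAX_CHARS:
            # Keep the most recent text; older context is already summarized
            input = IAgentInput(
                message=message[-self.MAX_CHARS:],
                metadata=input.metadata
            )
        return await super().process(input)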

Security
  • Avoid storing sensitive information in working memory

  • Implement access controls for memory storage

  • Consider encryption for persistent storage

  • Validate and sanitize memory content

Limitations and Considerations

Current Limitations
  • No built-in memory size limits

  • No automatic cleanup of old memories

  • Single conversation context only

  • No memory versioning or history

Performance Considerations
  • Memory updates block the caller unless dispatched as background tasks

  • Large conversation histories may slow processing

  • Storage backend performance affects agent performance

  • Memory retrieval happens on every update

Scaling Considerations
  • Consider memory storage patterns for high-volume systems

  • Implement memory archiving for long-running conversations

  • Monitor storage backend performance and capacity

  • Consider distributed memory storage for multi-instance deployments

The WorkingMemoryAgent demonstrates a practical approach to conversation memory management using Arshai’s building blocks. Use it as a starting point for your own memory management needs or as inspiration for different approaches.