Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions app/agents/voice/automatic/prompts/system/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,12 @@ def get_base_system_prompt():
CURRENT DATE & TIME REQUIREMENTS
Today's date is {datetime.datetime.now().strftime("%B %d, %Y")}. However, for ANY tool-related queries or operations involving time/date, you MUST ALWAYS invoke the `get_current_time` tool first to get the exact current timestamp. Never rely on static date information for tool operations.
MEMORY USAGE INSTRUCTIONS
- BEFORE filling with default values or before asking for missing information, check if it can be inferred from user memory context that may appear in messages
- Use past preferences and patterns to fill gaps in requests when appropriate
- Only ask for clarification when the memory context doesn't provide sufficient information
- If user preferences conflict with their current request, acknowledge the context and ask for confirmation
IDENTITY
If asked about identity, say:
"I'm your AI sidekick. Think of me as your extra brain for your D2C business. Whether it's digging through data, summarizing reports, or prepping for your next big move — I'm here to help you work smarter."
Expand Down
99 changes: 98 additions & 1 deletion app/agents/voice/automatic/services/mem0/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import datetime
import hashlib
import time
from typing import Any, Dict, List
from typing import Any, Dict, List, Optional

from pipecat.frames.frames import Frame, LLMMessagesFrame
from pipecat.processors.aggregators.openai_llm_context import (
Expand All @@ -25,6 +25,68 @@
raise Exception(f"Missing module: {e}")


def get_user_memories(user_id: str, limit: int = 5):
"""
Shared function to retrieve user memories from Mem0.
Uses ImprovedMem0MemoryService for reliable memory retrieval.

Args:
user_id: User identifier for memory retrieval
limit: Maximum number of memories to retrieve

Returns:
List of memories or None if failed
"""
if not config.MEM0_API_KEY:
logger.debug("MEM0_API_KEY not configured")
return None

try:
memory_params = ImprovedMem0MemoryService.InputParams()
memory_service = ImprovedMem0MemoryService(
api_key=config.MEM0_API_KEY,
user_id=user_id,
params=memory_params,
)

memories = memory_service.memory_client.get_all(user_id=user_id, limit=limit)
return memories
except Exception as e:
logger.error(f"Error retrieving memories: {e}")
return None


def format_memories_as_context(memories) -> Optional[str]:
if not memories:
return None

context_lines = []

for memory in memories:
memory_text = ""

# Handle the actual mem0 API response format
if isinstance(memory, dict):
# Primary field is 'memory' based on your API response
memory_text = memory.get("memory", "")

# Fallback to other possible fields
if not memory_text:
memory_text = memory.get("text", "") or memory.get("content", "")

# Additional validation and cleanup
if memory_text and isinstance(memory_text, str):
cleaned_text = memory_text.strip()
if cleaned_text and len(cleaned_text) > 5: # Reduced threshold
context_lines.append(f"- {cleaned_text}")

if not context_lines:
return None

context = f"[USER MEMORY CONTEXT] {chr(10).join(context_lines)}"
return context


class ImprovedMem0MemoryService(Mem0MemoryService):
"""
An improved version of Mem0MemoryService with enhanced reliability and performance.
Expand Down Expand Up @@ -439,6 +501,41 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):

if latest_user_message:
process_start = time.time()

user_id = getattr(self, "user_id", None)
if user_id:
try:
# Get user memories directly
memories = get_user_memories(user_id, limit=5)

if memories:
# Format memories as context
memory_context = format_memories_as_context(memories)

if memory_context:
# Enhance user message with memory context
enhanced_user_message = f"{memory_context}\nCurrent user message: {latest_user_message}"

context_messages = context.get_messages()
for i, msg in enumerate(context_messages):
if (
msg.get("role") == "user"
and msg.get("content")
== latest_user_message
):
# Update the last user message with memory context
context_messages[i][
"content"
] = enhanced_user_message
logger.debug(
f"Enhanced user message with historical memory context (memories: {len(memories)})"
)
break

except Exception as e:
logger.error(f"Dynamic memory enhancement failed: {e}")
# Continue with original message

# Enhance context with memories before passing it downstream (with fault tolerance)
logger.debug(
f"Enhancing context with memories based on user message: '{latest_user_message[:50]}...'"
Expand Down