Storage context persistence with RedisIndexStore and RedisDocumentStore #19332
Replies: 2 comments
-
Persistence with RedisIndexStore and RedisDocumentStore is automatic: every CRUD operation writes directly to Redis via the client's hset/hdel commands, so there is no explicit flush, save, or persist step as there is with SimpleDocumentStore or SimpleIndexStore. As soon as you add, update, or delete data, it is stored in Redis and remains there for as long as your Redis instance is running and configured for persistence (AOF or RDB) on the server side. No extra LlamaIndex-specific persistence calls are needed; just use the stores as usual, and your data is durable in Redis after each operation. This is a key difference from the local, file-based stores, which require explicit save/persist calls to ensure data survives process restarts.
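The write-through behavior described above can be sketched in plain Python, with a dict standing in for the Redis server (the class and method names here are illustrative, not the LlamaIndex or redis-py API):

```python
class WriteThroughStore:
    """Toy store: every mutation goes straight to the backend, like hset/hdel."""

    def __init__(self, backend: dict) -> None:
        self._backend = backend  # plays the role of the Redis server

    def put(self, key: str, value: str) -> None:
        self._backend[key] = value  # analogous to HSET: stored immediately

    def delete(self, key: str) -> None:
        self._backend.pop(key, None)  # analogous to HDEL


backend = {}  # "Redis"
store = WriteThroughStore(backend)
store.put("doc-1", "hello")

# No persist()/flush step: the write already landed in the backend, so
# durability is now governed by the server's AOF/RDB settings, not the client.
print(backend)  # {'doc-1': 'hello'}
```

With the Simple (in-memory) stores, by contrast, mutations live only in the Python process until an explicit persist call writes them to disk.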
-
With the DocumentStore implemented like the IndexStore:

```python
from uuid import UUID

from redis import Redis

from llama_index.storage.docstore.redis import RedisDocumentStore
from utils.logger import logger


def _dkey(uid: UUID, sid: UUID, doc_id: str) -> str:
    return f"docstore:{uid}:{sid}:{doc_id}"


class DocumentStore:
    """
    Manages the lifecycle of session-specific document stores in Redis.

    This class is responsible for providing a namespaced LlamaIndex
    RedisDocumentStore for a given conversation and for cleaning up all
    associated documents when the conversation is deleted.
    """

    def __init__(self, redis_client: Redis) -> None:
        """
        Initializes the document store manager.

        Args:
            redis_client: A shared, active redis.Redis client for cleanup operations.
        """
        self._redis_client = redis_client
        logger.info(
            f"DocumentStore manager initialized with a shared Redis client: {redis_client}."
        )

    def get_store(
        self, user_id: UUID, session_id: UUID, doc_id: str
    ) -> RedisDocumentStore:
        """
        Returns a LlamaIndex RedisDocumentStore instance uniquely namespaced
        for the given user and session.

        This store should be passed to the LlamaIndex IngestionPipeline.
        """
        # TODO: retrieve this doc ID
        return RedisDocumentStore.from_redis_client(
            redis_client=self._redis_client,
            namespace=_dkey(user_id, session_id, doc_id),
        )

    def delete_documents(self, user_id: UUID, session_id: UUID, doc_id: str) -> None:
        """
        Deletes all document keys associated with a specific conversation
        from Redis using pattern matching on the namespace.
        """
        # The LlamaIndex Redis stores prepend the namespace to the keys.
        pattern = f"docstore:{user_id}:{session_id}:{doc_id}:*"
        # Use the shared client to efficiently find and delete all matching keys.
        keys_to_delete = list(self._redis_client.scan_iter(pattern))
        if keys_to_delete:
            self._redis_client.delete(*keys_to_delete)
            logger.info(
                f"Deleted {len(keys_to_delete)} document keys for session {session_id}"
            )
        else:
            logger.debug(f"No document keys found to delete for session {session_id}")
```

and with the client connection implemented like this:

```python
from redis import exceptions
from redis.asyncio import Redis

from rag.schemas import REDISConfig
from utils.logger import logger


def connect_client(redis_config: REDISConfig) -> Redis:
    """
    Initializes and returns a reusable Redis client instance from the application configuration.

    Args:
        redis_config: The Redis configuration object.

    Returns:
        An active and connected redis.Redis client instance.

    Raises:
        redis.exceptions.ConnectionError: If the connection to Redis fails.
    """
    try:
        r = Redis(
            host=redis_config.host,
            port=int(redis_config.port),
            db=int(redis_config.db),
            password=redis_config.password,
            socket_timeout=int(redis_config.timeout),
        )
        r.ping()
        logger.info(
            f"Successfully connected to Redis at {redis_config.host}:{redis_config.port}"
        )
    except exceptions.ConnectionError as e:
        logger.error(
            f"Failed to connect to Redis at {redis_config.host}:{redis_config.port}. "
            f"Please check the server and config. Error: {e}"
        )
        raise
    return r
```

Question: how can I pass the async client to the DocumentStore and the IndexStore to handle upserts on the DocumentStore? This error arises:
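A side note on `connect_client`: it imports `Redis` from `redis.asyncio`, so `r.ping()` is a coroutine function, and calling it without `await` never actually contacts the server — it only creates a coroutine object. A minimal stdlib sketch of that behavior (the class name is illustrative, not the redis-py API):

```python
import asyncio
import inspect


class FakeAsyncClient:
    """Stand-in for an async client such as redis.asyncio.Redis."""

    async def ping(self) -> bool:
        # In the real client this would round-trip to the server.
        return True


client = FakeAsyncClient()

# Calling a coroutine function without await does not run its body;
# it merely creates a coroutine object.
pending = client.ping()
print(inspect.iscoroutine(pending))  # True

pending.close()  # discard the never-awaited coroutine

# The body only runs when the coroutine is awaited inside an event loop.
print(asyncio.run(client.ping()))  # True
```

So a synchronous `redis.Redis` client (or an awaited async call inside a coroutine) is needed wherever the code expects `ping()` to actually execute.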
-
Hello everyone, I have an implementation of a storage_context like so:
But I'd like to migrate to a RedisIndexStore and a RedisDocumentStore, both instantiated from a common Redis client I have, which also creates a ChatStore.
The IndexStore would be like so:
And I would have a similar implementation of the document_store.
My question is: how can I mirror the persistence done with the SimpleDocumentStore and the SimpleIndexStore for Redis? Is it handled automatically, given that the Simple stores are local and therefore in-memory?
@dosu
Thanks for the help!