Merged

21 commits
570755b
initial attempt to implement Weaviate Upsert. But without a full file…
Jul 7, 2025
a2534ab
Merge remote-tracking branch 'origin/main' into implement-weaviate-kn…
Jul 7, 2025
85c7d12
Some mypy issues. The "document.name" is an Optional, therefore check…
Jul 8, 2025
d1e88b8
Added a final condition. This is the best upsert solution with the li…
Jul 8, 2025
5a3eef1
typo?
Jul 9, 2025
93d2052
Merge branch 'main' into implement-weaviate-knowledge-upsert
Siete-F Jul 9, 2025
493093d
Merge branch 'main' into implement-weaviate-knowledge-upsert
Siete-F Jul 14, 2025
67e923d
Changed upsert error to warning and implemented it everywhere.
Jul 14, 2025
32169df
Centralized init code for AgentKnowledge methods. Ran ruff formatter …
Jul 14, 2025
a34bfdb
Weaviate upsert example added. ready for merge.
Jul 15, 2025
61a52ea
Merge branch 'main' into implement-weaviate-knowledge-upsert
kausmeows Jul 15, 2025
ae42642
Changed to info log
Jul 16, 2025
ea783ea
Merge branch 'implement-weaviate-knowledge-upsert' of https://github.…
Jul 16, 2025
5f359e5
Merge branch 'main' into implement-weaviate-knowledge-upsert
Siete-F Jul 16, 2025
c9a922b
Merge branch 'main' into implement-weaviate-knowledge-upsert
Siete-F Jul 21, 2025
8be2758
Missed the potentially un-defined vector_db
Siete-F Jul 22, 2025
1a6efef
It doesn't recognize the check for the missing vector_db in the sub f…
Siete-F Jul 22, 2025
d65e1d2
Added await's for async functions.
Siete-F Jul 22, 2025
3f4cea5
Merge branch 'main' into implement-weaviate-knowledge-upsert
Siete-F Jul 22, 2025
c25a8ea
Merge branch 'main' into implement-weaviate-knowledge-upsert
Siete-F Jul 29, 2025
110b6b4
Merge branch 'main' into implement-weaviate-knowledge-upsert
kausmeows Aug 8, 2025
@@ -0,0 +1,80 @@
"""
This example demonstrates using Weaviate as a vector database.

Installation:
pip install weaviate-client

You can use either Weaviate Cloud or a local instance.

Weaviate Cloud Setup:
1. Create account at https://console.weaviate.cloud/
2. Create a cluster and copy the "REST endpoint" and "Admin" API Key. Then set environment variables:
export WCD_URL="your-cluster-url"
export WCD_API_KEY="your-api-key"

Local Development Setup:
1. Install Docker from https://docs.docker.com/get-docker/
2. Run Weaviate locally:
docker run -d \
-p 8080:8080 \
-p 50051:50051 \
--name weaviate \
cr.weaviate.io/semitechnologies/weaviate:1.28.4
or use the script `cookbook/scripts/run_weviate.sh` to start a local instance.
3. Remember to set `local=True` on the Weaviate instantiation.
"""

from agno.knowledge.pdf_url import PDFUrlKnowledgeBase
from agno.knowledge.document import DocumentKnowledgeBase
from agno.document import Document
from agno.vectordb.search import SearchType
from agno.vectordb.weaviate import Distance, VectorIndex, Weaviate
from agno.utils.log import set_log_level_to_debug

from agno.embedder.sentence_transformer import SentenceTransformerEmbedder
embedder = SentenceTransformerEmbedder()

vector_db = Weaviate(
collection="recipes",
search_type=SearchType.hybrid,
vector_index=VectorIndex.HNSW,
distance=Distance.COSINE,
embedder=embedder,
local=True, # Set to False if using Weaviate Cloud and True if using local instance
)
# Create knowledge base
knowledge_base = PDFUrlKnowledgeBase(
urls=["https://agno-public.s3.amazonaws.com/recipes/ThaiRecipes.pdf"],
vector_db=vector_db,
)

vector_db.drop()
set_log_level_to_debug()

knowledge_base.load(recreate=False, upsert=True)

print("Knowledge base loaded with PDF content. Loading the same data again will not recreate it.")
knowledge_base.load(recreate=False, upsert=True)

print("First example finished. Now dropping the knowledge base.")
vector_db.drop()

doc1 = Document(content="my first content", name="doc1")
doc1_modified = Document(content="my first content corrected", name="doc1")
doc2 = Document(content="my second content", name="doc2")

knowledge_base = DocumentKnowledgeBase(
documents=[doc1, doc2],
vector_db=vector_db,
)
knowledge_base_changed = DocumentKnowledgeBase(
documents=[doc1_modified, doc2],
vector_db=vector_db,
)

print("\n\nStart second example. Load initial data...")
knowledge_base.load(recreate=False, upsert=True)
print("\nNow uploading the changed data...")
knowledge_base_changed.load(recreate=False, upsert=True)
print("Example finished. Now dropping the knowledge base.")
vector_db.drop()
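Reviewer note: the short sketch below is not part of the committed example. It is a hypothetical illustration of the batched upsert/insert call shape this PR introduces, reusing the `vector_db` instance from the example above; the document names and contents come from the example, and the fallback branch mirrors the `upsert_available()` check added to AgentKnowledge.

# Hypothetical sketch (not in this PR): direct batched upsert against the vector db.
if not vector_db.exists():
    vector_db.create()

docs = [
    Document(content="my first content", name="doc1"),
    Document(content="my first content corrected", name="doc1"),  # same name, new content
]
if vector_db.upsert_available():
    # With Weaviate, the second "doc1" is expected to update the first rather than duplicate it.
    vector_db.upsert(documents=docs, filters=None)
else:
    # Vector dbs without upsert support now log an info message and fall back to insert.
    vector_db.insert(documents=docs, filters=None)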
2 changes: 1 addition & 1 deletion libs/agno/agno/agent/agent.py
@@ -6872,7 +6872,7 @@ def add_to_knowledge(self, query: str, result: str) -> str:
document_name = query.replace(" ", "_").replace("?", "").replace("!", "").replace(".", "")
document_content = json.dumps({"query": query, "result": result})
log_info(f"Adding document to knowledge base: {document_name}: {document_content}")
self.knowledge.add_document_to_knowledge_base(
self.knowledge.load_document(
document=Document(
name=document_name,
content=document_content,
140 changes: 68 additions & 72 deletions libs/agno/agno/knowledge/agent.py
@@ -50,6 +50,53 @@ async def async_document_lists(self) -> AsyncIterator[List[Document]]:
"""
raise NotImplementedError

def _upsert_warning(self, upsert) -> None:
    """Log a warning if upsert is not available"""
    if upsert and self.vector_db is not None and not self.vector_db.upsert_available():
        log_info(
            f"Vector db '{self.vector_db.__class__.__module__}' does not support upsert. Falling back to insert."
        )

def _load_init(self, recreate: bool, upsert: bool) -> None:
    """Initial setup for loading knowledge base"""
    if self.vector_db is None:
        logger.warning("No vector db provided")
        return

    if recreate:
        log_info("Dropping collection")
        self.vector_db.drop()

    if not self.vector_db.exists():
        log_info("Creating collection")
        self.vector_db.create()

    self._upsert_warning(upsert)

async def _aload_init(self, recreate: bool, upsert: bool) -> None:
    """Initial async setup for loading knowledge base"""
    if self.vector_db is None:
        logger.warning("No vector db provided")
        return

    if recreate:
        log_info("Dropping collection")
        try:
            await self.vector_db.async_drop()
        except NotImplementedError:
            logger.warning("Vector db does not support async drop, falling back to sync drop")
            self.vector_db.drop()

    if not self.vector_db.exists():
        log_info("Creating collection")
        try:
            await self.vector_db.async_create()
        except NotImplementedError:
            logger.warning("Vector db does not support async create, falling back to sync create")
            self.vector_db.create()

    self._upsert_warning(upsert)

def search(
self, query: str, num_documents: Optional[int] = None, filters: Optional[Dict[str, Any]] = None
) -> List[Document]:
@@ -80,7 +127,7 @@ async def async_search(
try:
return await self.vector_db.async_search(query=query, limit=_num_documents, filters=filters)
except NotImplementedError:
logger.info("Vector db does not support async search")
log_info("Vector db does not support async search")
return self.search(query=query, num_documents=_num_documents, filters=filters)
except Exception as e:
logger.error(f"Error searching for documents: {e}")
@@ -99,18 +146,10 @@ def load(
upsert (bool): If True, upserts documents to the vector db. Defaults to False.
skip_existing (bool): If True, skips documents which already exist in the vector db when inserting. Defaults to True.
"""
self._load_init(recreate, upsert)
if self.vector_db is None:
logger.warning("No vector db provided")
return

if recreate:
log_info("Dropping collection")
self.vector_db.drop()

if not self.vector_db.exists():
log_info("Creating collection")
self.vector_db.create()

log_info("Loading knowledge base")
num_documents = 0
for document_list in self.document_lists:
@@ -123,8 +162,7 @@ def load(

# Upsert documents if upsert is True and vector db supports upsert
if upsert and self.vector_db.upsert_available():
for doc in document_list:
self.vector_db.upsert(documents=[doc], filters=doc.meta_data)
self.vector_db.upsert(documents=documents_to_load, filters=doc.meta_data)
# Insert documents
else:
# Filter out documents which already exist in the vector db
@@ -133,11 +171,10 @@ def load(
documents_to_load = self.filter_existing_documents(document_list)

if documents_to_load:
for doc in documents_to_load:
self.vector_db.insert(documents=[doc], filters=doc.meta_data)
self.vector_db.insert(documents=documents_to_load, filters=doc.meta_data)

num_documents += len(documents_to_load)
log_info(f"Added {len(documents_to_load)} documents to knowledge base")
log_info(f"Added {num_documents} documents to knowledge base")

async def aload(
self,
@@ -152,19 +189,10 @@ async def aload(
upsert (bool): If True, upserts documents to the vector db. Defaults to False.
skip_existing (bool): If True, skips documents which already exist in the vector db when inserting. Defaults to True.
"""

await self._aload_init(recreate, upsert)
if self.vector_db is None:
logger.warning("No vector db provided")
return

if recreate:
log_info("Dropping collection")
await self.vector_db.async_drop()

if not await self.vector_db.async_exists():
log_info("Creating collection")
await self.vector_db.async_create()

log_info("Loading knowledge base")
num_documents = 0
document_iterator = self.async_document_lists
@@ -177,8 +205,7 @@ async def aload(

# Upsert documents if upsert is True and vector db supports upsert
if upsert and self.vector_db.upsert_available():
for doc in document_list:
await self.vector_db.async_upsert(documents=[doc], filters=doc.meta_data)
await self.vector_db.async_upsert(documents=documents_to_load, filters=doc.meta_data)
# Insert documents
else:
# Filter out documents which already exist in the vector db
@@ -187,11 +214,10 @@ async def aload(
documents_to_load = await self.async_filter_existing_documents(document_list)

if documents_to_load:
for doc in documents_to_load:
await self.vector_db.async_insert(documents=[doc], filters=doc.meta_data)
await self.vector_db.async_insert(documents=documents_to_load, filters=doc.meta_data)

num_documents += len(documents_to_load)
log_info(f"Added {len(documents_to_load)} documents to knowledge base")
log_info(f"Added {num_documents} documents to knowledge base")

def load_documents(
self,
@@ -208,15 +234,11 @@ def load_documents(
skip_existing (bool): If True, skips documents which already exist in the vector db when inserting. Defaults to True.
filters (Optional[Dict[str, Any]]): Filters to add to each row that can be used to limit results during querying. Defaults to None.
"""

log_info("Loading knowledge base")
self._load_init(recreate=False, upsert=upsert)
if self.vector_db is None:
logger.warning("No vector db provided")
return

log_debug("Creating collection")
self.vector_db.create()


log_info("Loading knowledge base")
# Upsert documents if upsert is True
if upsert and self.vector_db.upsert_available():
self.vector_db.upsert(documents=documents, filters=filters)
@@ -251,17 +273,11 @@ async def async_load_documents(
skip_existing (bool): If True, skips documents which already exist in the vector db when inserting. Defaults to True.
filters (Optional[Dict[str, Any]]): Filters to add to each row that can be used to limit results during querying. Defaults to None.
"""
log_info("Loading knowledge base")
await self._aload_init(recreate=False, upsert=upsert)
if self.vector_db is None:
logger.warning("No vector db provided")
return

log_debug("Creating collection")
try:
await self.vector_db.async_create()
except NotImplementedError:
logger.warning("Vector db does not support async create")
self.vector_db.create()
log_info("Loading knowledge base")

# Upsert documents if upsert is True
if upsert and self.vector_db.upsert_available():
@@ -302,7 +318,7 @@ async def async_load_documents(
else:
log_info("No new documents to load")

def add_document_to_knowledge_base(
def load_document(
self,
document: Document,
upsert: bool = False,
@@ -414,8 +430,6 @@ def filter_existing_documents(self, documents: List[Document]) -> List[Document]
Returns:
List[Document]: Filtered list of documents that don't exist in the database
"""
from agno.utils.log import log_debug, log_info

if not self.vector_db:
log_debug("No vector database configured, skipping document filtering")
return documents
@@ -556,20 +570,9 @@ def prepare_load(
self._track_metadata_structure(metadata)

# 3. Prepare vector DB
self._load_init(recreate, upsert=False)
if self.vector_db is None:
logger.warning("Cannot load file: No vector db provided.")
return False

# Recreate collection if requested
if recreate:
# log_info(f"Recreating collection.")
self.vector_db.drop()

# Create collection if it doesn't exist
if not self.vector_db.exists():
# log_info(f"Collection does not exist. Creating.")
self.vector_db.create()

return True

async def aprepare_load(
Expand Down Expand Up @@ -604,20 +607,9 @@ async def aprepare_load(
self._track_metadata_structure(metadata)

# 3. Prepare vector DB
await self._aload_init(recreate, upsert=False)
if self.vector_db is None:
logger.warning("Cannot load file: No vector db provided.")
return False

# Recreate collection if requested
if recreate:
log_info("Recreating collection.")
await self.vector_db.async_drop()

# Create collection if it doesn't exist
if not await self.vector_db.async_exists():
log_info("Collection does not exist. Creating.")
await self.vector_db.async_create()

return True

def process_documents(
Expand All @@ -642,6 +634,8 @@ def process_documents(

log_info(f"Loading {len(documents)} documents from {source_info} with metadata: {metadata}")

self._upsert_warning(upsert)

# Decide loading strategy: upsert or insert (with optional skip)
if upsert and self.vector_db.upsert_available(): # type: ignore
log_debug(f"Upserting {len(documents)} documents.") # type: ignore
@@ -681,6 +675,8 @@ async def aprocess_documents(
logger.warning(f"No documents were read from {source_info}")
return

self._upsert_warning(upsert)

log_info(f"Loading {len(documents)} documents from {source_info} with metadata: {metadata}")

# Decide loading strategy: upsert or insert (with optional skip)
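For completeness, a minimal sketch of reading content back after a load with upsert=True, based on the AgentKnowledge.search signature shown in this diff. The query string and num_documents value are illustrative assumptions, not part of this PR, and `knowledge_base` refers to the DocumentKnowledgeBase from the cookbook example above.

# Illustrative sketch: querying a knowledge base after knowledge_base.load(recreate=False, upsert=True).
results = knowledge_base.search(query="my first content corrected", num_documents=2)
for doc in results:
    print(doc.name, doc.content)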