Commit 125861b

Authored by Siete-F (Siete Frouws) and kausmeows
[fix] Implement weaviate knowledge upsert. Improve AgentKnowledge class. (#3784)
### Sidenotes first:

- `add_document_to_knowledge_base` is only used once and named inconsistently, so I renamed it to `load_document` (there is also an `async_load_document`, which calls `async_load_documents` with a single-document list).
- `upsert` should receive a list of Documents. Two of the calls to upsert are in a loop, passing one doc at a time, while the other calls pass the whole list; there is no reason for that distinction. Leaving the looping to the upsert function allows for additional flexibility inside the upsert, such as obtaining the complete document content or checking the total number of chunks.
- `num_documents` was not used; the final logging statement should likely have been unindented, printing its value.

## Summary

Upsert was practically not implemented, so this is kind of a feature. As I tried to state in issue #3700, it is technically NOT possible to implement a proper "per-chunk" upsert with the following assumptions:

- Changed vector db documents must be deleted (for the given document name being upserted) to keep the vector db clean.
- There is no (consistent) way to determine if a full document changed somehow after chunking.
- It is not common (or backwards compatible) to add a **full document hash** as metadata.
- Weaviate `update` operations cannot be used, since the document ID is actually the content hash, so processing a "change" can only be done by a delete and insert.

Because of this I resorted to the following approach:

- For the first chunk (or when not chunking), I can check whether content changed only by checking if a certain (chunk) content hash is present in the vector db.
- Then, since it changed, the only way to find the related documents is by name. Since I don't know if the chunking strategy changed, we can only delete all documents with that name.
- For a second chunk, the name exists, but, after the prior deletion, the content hash (document ID) no longer occurs, so it is considered a "change". However, we should NOT delete the chunks by name again; we should only add the chunk.
- Because of this, I resorted to deleting all documents by name only when processing the first chunk (or the only document, when not chunking). This means that changes in chunks 2 and higher will be recognized and added to the knowledge base, but the old chunk 2 will remain.

I am open to feedback and am willing to have a discussion on a better implementation. As stated in the issue, for a better implementation I would need:

> Long story short, I think we need to **include the hash of the content of the full document + metadata** in the document as metadata, so that we can determine if that hash is the same or changed. If changed, delete all documents with that name and start looping for the upsert (INSIDE the Weaviate module, not outside the module in the Agent class, which is the current state). The benefit is that on the first chunk you can reliably say, using the full-document hash (present on any of the chunks), whether the document changed. Then delete all 'documents' from the vector db with that name and insert. Besides that, I would recommend just using a random UUID4 as document ID.

## Type of change

- [x] Bug fix
- [ ] New feature
- [ ] Breaking change
- [x] Improvement
- [ ] Model update
- [ ] Other:

---

## Checklist

- [x] Code complies with style guidelines
- [ ] Ran format/validation scripts (`./scripts/format.sh` and `./scripts/validate.sh`)
- [x] Self-review completed
- [x] Documentation updated (comments, docstrings)
- [ ] Examples and guides: Relevant cookbook examples have been included or updated (if applicable)
- [ ] Tested in a clean environment
- [x] Tests added/updated (if applicable)

---

Co-authored-by: Siete Frouws <siete.frouws@bincy.nl>
Co-authored-by: Kaustubh <shuklakaustubh84@gmail.com>
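The approach described above can be sketched against a minimal in-memory stand-in for the vector db. All names here (`Chunk`, `FakeVectorDb`, `upsert`) are illustrative, not the agno or Weaviate API; the only assumption carried over from the description is that the document ID is the chunk's content hash:

```python
import hashlib
from dataclasses import dataclass


@dataclass
class Chunk:
    name: str  # source document name (shared by all chunks of a document)
    content: str

    @property
    def content_hash(self) -> str:
        # Stand-in for the content hash that serves as the document ID.
        return hashlib.sha256(self.content.encode("utf-8")).hexdigest()


class FakeVectorDb:
    """In-memory stand-in for the vector db: rows keyed by content hash."""

    def __init__(self) -> None:
        self.rows: dict[str, Chunk] = {}

    def id_exists(self, content_hash: str) -> bool:
        return content_hash in self.rows

    def delete_by_name(self, name: str) -> None:
        self.rows = {h: c for h, c in self.rows.items() if c.name != name}

    def insert(self, chunk: Chunk) -> None:
        self.rows[chunk.content_hash] = chunk


def upsert(db: FakeVectorDb, chunks: list[Chunk]) -> None:
    for i, chunk in enumerate(chunks):
        if db.id_exists(chunk.content_hash):
            continue  # unchanged chunk: nothing to do
        if i == 0:
            # First chunk changed (or the document is unchunked). The chunking
            # strategy may also have changed, so drop everything stored under
            # this document name before re-inserting.
            db.delete_by_name(chunk.name)
        db.insert(chunk)
```

Running this sketch also reproduces the caveat from the last bullet: when only a later chunk changes, the first chunk is found unchanged, no delete-by-name happens, and the stale old version of the later chunk remains in the store alongside the new one.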
1 parent d9b4f8c commit 125861b

File tree: 5 files changed, +249 additions, −91 deletions

Lines changed: 80 additions & 0 deletions
```python
"""
This example demonstrates using Weaviate as a vector database.

Installation:
    pip install weaviate-client

You can use either Weaviate Cloud or a local instance.

Weaviate Cloud Setup:
1. Create an account at https://console.weaviate.cloud/
2. Create a cluster and copy the "REST endpoint" and "Admin" API key. Then set environment variables:
    export WCD_URL="your-cluster-url"
    export WCD_API_KEY="your-api-key"

Local Development Setup:
1. Install Docker from https://docs.docker.com/get-docker/
2. Run Weaviate locally:
    docker run -d \
        -p 8080:8080 \
        -p 50051:50051 \
        --name weaviate \
        cr.weaviate.io/semitechnologies/weaviate:1.28.4
   or use the script `cookbook/scripts/run_weviate.sh` to start a local instance.
3. Remember to set `local=True` on the Weaviate instantiation.
"""

from agno.knowledge.pdf_url import PDFUrlKnowledgeBase
from agno.knowledge.document import DocumentKnowledgeBase
from agno.document import Document
from agno.vectordb.search import SearchType
from agno.vectordb.weaviate import Distance, VectorIndex, Weaviate
from agno.utils.log import set_log_level_to_debug

from agno.embedder.sentence_transformer import SentenceTransformerEmbedder

embedder = SentenceTransformerEmbedder()

vector_db = Weaviate(
    collection="recipes",
    search_type=SearchType.hybrid,
    vector_index=VectorIndex.HNSW,
    distance=Distance.COSINE,
    embedder=embedder,
    local=True,  # Set to False if using Weaviate Cloud, True if using a local instance
)
# Create knowledge base
knowledge_base = PDFUrlKnowledgeBase(
    urls=["https://agno-public.s3.amazonaws.com/recipes/ThaiRecipes.pdf"],
    vector_db=vector_db,
)

vector_db.drop()
set_log_level_to_debug()

knowledge_base.load(recreate=False, upsert=True)

print("Knowledge base loaded with PDF content. Loading the same data again will not recreate it.")
knowledge_base.load(recreate=False, upsert=True)

print("First example finished. Now dropping the knowledge base.")
vector_db.drop()

doc1 = Document(content="my first content", name="doc1")
doc1_modified = Document(content="my first content corrected", name="doc1")
doc2 = Document(content="my second content", name="doc2")

knowledge_base = DocumentKnowledgeBase(
    documents=[doc1, doc2],
    vector_db=vector_db,
)
knowledge_base_changed = DocumentKnowledgeBase(
    documents=[doc1_modified, doc2],
    vector_db=vector_db,
)

print("\n\nStart second example. Load initial data...")
knowledge_base.load(recreate=False, upsert=True)
print("\nNow uploading the changed data...")
knowledge_base_changed.load(recreate=False, upsert=True)
print("Example finished. Now dropping the knowledge base.")
vector_db.drop()
```
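The change detection exercised by the example above keys off per-chunk content hashes. The PR description instead recommends a hash of the full document plus metadata, stored on every chunk; a minimal sketch of such a hash (the function name is hypothetical, not part of agno):

```python
import hashlib
import json


def full_document_hash(content: str, metadata: dict) -> str:
    # Hash the complete (pre-chunking) document content together with its
    # metadata. Stored as metadata on every chunk, this lets the first chunk
    # of an upsert reliably detect that *any* part of the document changed,
    # at which point all chunks for that name can be deleted and re-inserted.
    payload = json.dumps({"content": content, "metadata": metadata}, sort_keys=True)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()
```

Because the hash is stable under key ordering (via `sort_keys=True`), two loads of the same document produce the same value, while any edit to content or metadata produces a different one.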

libs/agno/agno/agent/agent.py

Lines changed: 1 addition & 1 deletion
```diff
@@ -6872,7 +6872,7 @@ def add_to_knowledge(self, query: str, result: str) -> str:
         document_name = query.replace(" ", "_").replace("?", "").replace("!", "").replace(".", "")
         document_content = json.dumps({"query": query, "result": result})
         log_info(f"Adding document to knowledge base: {document_name}: {document_content}")
-        self.knowledge.add_document_to_knowledge_base(
+        self.knowledge.load_document(
             document=Document(
                 name=document_name,
                 content=document_content,
```

libs/agno/agno/knowledge/agent.py

Lines changed: 68 additions & 72 deletions
```diff
@@ -50,6 +50,53 @@ async def async_document_lists(self) -> AsyncIterator[List[Document]]:
         """
         raise NotImplementedError
 
+    def _upsert_warning(self, upsert) -> None:
+        """Log a warning if upsert is not available"""
+        if upsert and self.vector_db is not None and not self.vector_db.upsert_available():
+            log_info(
+                f"Vector db '{self.vector_db.__class__.__module__}' does not support upsert. Falling back to insert."
+            )
+
+    def _load_init(self, recreate: bool, upsert: bool) -> None:
+        """Initial setup for loading knowledge base"""
+        if self.vector_db is None:
+            logger.warning("No vector db provided")
+            return
+
+        if recreate:
+            log_info("Dropping collection")
+            self.vector_db.drop()
+
+        if not self.vector_db.exists():
+            log_info("Creating collection")
+            self.vector_db.create()
+
+        self._upsert_warning(upsert)
+
+    async def _aload_init(self, recreate: bool, upsert: bool) -> None:
+        """Initial async setup for loading knowledge base"""
+        if self.vector_db is None:
+            logger.warning("No vector db provided")
+            return
+
+        if recreate:
+            log_info("Dropping collection")
+            try:
+                await self.vector_db.async_drop()
+            except NotImplementedError:
+                logger.warning("Vector db does not support async drop, falling back to sync drop")
+                self.vector_db.drop()
+
+        if not self.vector_db.exists():
+            log_info("Creating collection")
+            try:
+                await self.vector_db.async_create()
+            except NotImplementedError:
+                logger.warning("Vector db does not support async create, falling back to sync create")
+                self.vector_db.create()
+
+        self._upsert_warning(upsert)
+
     def search(
         self, query: str, num_documents: Optional[int] = None, filters: Optional[Dict[str, Any]] = None
     ) -> List[Document]:
@@ -80,7 +127,7 @@ async def async_search(
         try:
             return await self.vector_db.async_search(query=query, limit=_num_documents, filters=filters)
         except NotImplementedError:
-            logger.info("Vector db does not support async search")
+            log_info("Vector db does not support async search")
             return self.search(query=query, num_documents=_num_documents, filters=filters)
         except Exception as e:
             logger.error(f"Error searching for documents: {e}")
@@ -99,18 +146,10 @@ def load(
             upsert (bool): If True, upserts documents to the vector db. Defaults to False.
             skip_existing (bool): If True, skips documents which already exist in the vector db when inserting. Defaults to True.
         """
+        self._load_init(recreate, upsert)
         if self.vector_db is None:
-            logger.warning("No vector db provided")
             return
 
-        if recreate:
-            log_info("Dropping collection")
-            self.vector_db.drop()
-
-        if not self.vector_db.exists():
-            log_info("Creating collection")
-            self.vector_db.create()
-
         log_info("Loading knowledge base")
         num_documents = 0
         for document_list in self.document_lists:
@@ -123,8 +162,7 @@ def load(
 
             # Upsert documents if upsert is True and vector db supports upsert
             if upsert and self.vector_db.upsert_available():
-                for doc in document_list:
-                    self.vector_db.upsert(documents=[doc], filters=doc.meta_data)
+                self.vector_db.upsert(documents=documents_to_load, filters=doc.meta_data)
             # Insert documents
             else:
                 # Filter out documents which already exist in the vector db
@@ -133,11 +171,10 @@ def load(
                 documents_to_load = self.filter_existing_documents(document_list)
 
                 if documents_to_load:
-                    for doc in documents_to_load:
-                        self.vector_db.insert(documents=[doc], filters=doc.meta_data)
+                    self.vector_db.insert(documents=documents_to_load, filters=doc.meta_data)
 
             num_documents += len(documents_to_load)
-            log_info(f"Added {len(documents_to_load)} documents to knowledge base")
+        log_info(f"Added {num_documents} documents to knowledge base")
 
     async def aload(
         self,
@@ -152,19 +189,10 @@ async def aload(
             upsert (bool): If True, upserts documents to the vector db. Defaults to False.
             skip_existing (bool): If True, skips documents which already exist in the vector db when inserting. Defaults to True.
         """
-
+        await self._aload_init(recreate, upsert)
         if self.vector_db is None:
-            logger.warning("No vector db provided")
             return
 
-        if recreate:
-            log_info("Dropping collection")
-            await self.vector_db.async_drop()
-
-        if not await self.vector_db.async_exists():
-            log_info("Creating collection")
-            await self.vector_db.async_create()
-
         log_info("Loading knowledge base")
         num_documents = 0
         document_iterator = self.async_document_lists
@@ -177,8 +205,7 @@ async def aload(
 
             # Upsert documents if upsert is True and vector db supports upsert
             if upsert and self.vector_db.upsert_available():
-                for doc in document_list:
-                    await self.vector_db.async_upsert(documents=[doc], filters=doc.meta_data)
+                await self.vector_db.async_upsert(documents=documents_to_load, filters=doc.meta_data)
             # Insert documents
             else:
                 # Filter out documents which already exist in the vector db
@@ -187,11 +214,10 @@ async def aload(
                 documents_to_load = await self.async_filter_existing_documents(document_list)
 
                 if documents_to_load:
-                    for doc in documents_to_load:
-                        await self.vector_db.async_insert(documents=[doc], filters=doc.meta_data)
+                    await self.vector_db.async_insert(documents=documents_to_load, filters=doc.meta_data)
 
             num_documents += len(documents_to_load)
-            log_info(f"Added {len(documents_to_load)} documents to knowledge base")
+        log_info(f"Added {num_documents} documents to knowledge base")
 
     def load_documents(
         self,
@@ -208,15 +234,11 @@ def load_documents(
             skip_existing (bool): If True, skips documents which already exist in the vector db when inserting. Defaults to True.
             filters (Optional[Dict[str, Any]]): Filters to add to each row that can be used to limit results during querying. Defaults to None.
         """
-
-        log_info("Loading knowledge base")
+        self._load_init(recreate=False, upsert=upsert)
         if self.vector_db is None:
-            logger.warning("No vector db provided")
             return
-
-        log_debug("Creating collection")
-        self.vector_db.create()
-
+
+        log_info("Loading knowledge base")
         # Upsert documents if upsert is True
         if upsert and self.vector_db.upsert_available():
             self.vector_db.upsert(documents=documents, filters=filters)
@@ -251,17 +273,11 @@ async def async_load_documents(
             skip_existing (bool): If True, skips documents which already exist in the vector db when inserting. Defaults to True.
             filters (Optional[Dict[str, Any]]): Filters to add to each row that can be used to limit results during querying. Defaults to None.
         """
-        log_info("Loading knowledge base")
+        await self._aload_init(recreate=False, upsert=upsert)
         if self.vector_db is None:
-            logger.warning("No vector db provided")
             return
 
-        log_debug("Creating collection")
-        try:
-            await self.vector_db.async_create()
-        except NotImplementedError:
-            logger.warning("Vector db does not support async create")
-            self.vector_db.create()
+        log_info("Loading knowledge base")
 
         # Upsert documents if upsert is True
         if upsert and self.vector_db.upsert_available():
@@ -302,7 +318,7 @@ async def async_load_documents(
         else:
             log_info("No new documents to load")
 
-    def add_document_to_knowledge_base(
+    def load_document(
         self,
         document: Document,
         upsert: bool = False,
@@ -414,8 +430,6 @@ def filter_existing_documents(self, documents: List[Document]) -> List[Document]
         Returns:
             List[Document]: Filtered list of documents that don't exist in the database
         """
-        from agno.utils.log import log_debug, log_info
-
         if not self.vector_db:
             log_debug("No vector database configured, skipping document filtering")
             return documents
@@ -556,20 +570,9 @@ def prepare_load(
         self._track_metadata_structure(metadata)
 
         # 3. Prepare vector DB
+        self._load_init(recreate, upsert=False)
         if self.vector_db is None:
-            logger.warning("Cannot load file: No vector db provided.")
             return False
-
-        # Recreate collection if requested
-        if recreate:
-            # log_info(f"Recreating collection.")
-            self.vector_db.drop()
-
-        # Create collection if it doesn't exist
-        if not self.vector_db.exists():
-            # log_info(f"Collection does not exist. Creating.")
-            self.vector_db.create()
-
         return True
 
     async def aprepare_load(
@@ -604,20 +607,9 @@ async def aprepare_load(
         self._track_metadata_structure(metadata)
 
         # 3. Prepare vector DB
+        await self._aload_init(recreate, upsert=False)
         if self.vector_db is None:
-            logger.warning("Cannot load file: No vector db provided.")
             return False
-
-        # Recreate collection if requested
-        if recreate:
-            log_info("Recreating collection.")
-            await self.vector_db.async_drop()
-
-        # Create collection if it doesn't exist
-        if not await self.vector_db.async_exists():
-            log_info("Collection does not exist. Creating.")
-            await self.vector_db.async_create()
-
         return True
 
     def process_documents(
@@ -642,6 +634,8 @@ def process_documents(
 
         log_info(f"Loading {len(documents)} documents from {source_info} with metadata: {metadata}")
 
+        self._upsert_warning(upsert)
+
         # Decide loading strategy: upsert or insert (with optional skip)
         if upsert and self.vector_db.upsert_available():  # type: ignore
             log_debug(f"Upserting {len(documents)} documents.")  # type: ignore
@@ -681,6 +675,8 @@ async def aprocess_documents(
             logger.warning(f"No documents were read from {source_info}")
             return
 
+        self._upsert_warning(upsert)
+
         log_info(f"Loading {len(documents)} documents from {source_info} with metadata: {metadata}")
 
         # Decide loading strategy: upsert or insert (with optional skip)
```
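The try/except `NotImplementedError` fallback used in `_aload_init` above (async drop/create falling back to their sync twins) generalizes to a small helper. This is only a sketch of the pattern; `with_sync_fallback` is a hypothetical name, not part of agno:

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def with_sync_fallback(
    async_op: Callable[[], Awaitable[T]],
    sync_op: Callable[[], T],
    operation_name: str,
) -> T:
    """Run the async operation, falling back to its sync counterpart when the
    vector db has not implemented the async variant."""
    try:
        return await async_op()
    except NotImplementedError:
        print(f"Vector db does not support async {operation_name}, falling back to sync {operation_name}")
        return sync_op()


async def demo() -> str:
    async def unsupported_drop() -> str:
        # Simulates a vector db whose async_drop is not implemented.
        raise NotImplementedError

    return await with_sync_fallback(unsupported_drop, lambda: "dropped (sync)", "drop")
```

Calling `asyncio.run(demo())` exercises the fallback path: the async operation raises, the warning is printed, and the sync result is returned.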
