
Commit f618867

yakubova92 and nlarew authored
(EAI-152) check for change to chunkAlgoHash when updating embeddings (#580)
* check for change in chunkAlgoHash
* add pages with changed chunking to arg for next step
* pulling pages to be updated based on embedded content
* update embeddings if chunk algo changes, regardless of page changes
* chunkAlgoHash creation moved up in function chain, value passed down
* change implementation - query for data sources that use an old chunk algo hash

Co-authored-by: Nick Larew <nick.larew@mongodb.com>
1 parent 86a31e6 · commit f618867

File tree: 5 files changed (+370, -23 lines changed)

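Note: the change described in the commit message means embedded content is now refreshed in two cases: when a page's copy has changed since the last run, and when the chunking algorithm itself has changed. The sketch below illustrates the second case only, assuming EmbeddedContentStore is exported from the mongodb-rag-core package root and using a hypothetical currentChunkAlgoHash value; it is not the literal implementation inside updateEmbeddedContent.

import { EmbeddedContentStore } from "mongodb-rag-core";

// Find data sources whose embedded content was chunked with a different
// (i.e. outdated) chunk algorithm hash. These sources must be re-embedded
// even if none of their pages changed.
async function findSourcesWithStaleChunks(
  store: EmbeddedContentStore,
  currentChunkAlgoHash: string,
  sourceNames?: string[]
): Promise<string[]> {
  return store.getDataSources({
    sourceNames,
    chunkAlgoHash: { hashValue: currentChunkAlgoHash, operation: "notEquals" },
  });
}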

packages/mongodb-rag-core/src/contentStore/EmbeddedContent.ts

Lines changed: 13 additions & 0 deletions
@@ -84,6 +84,14 @@ export type DeleteEmbeddedContentArgs = {
   inverseDataSources?: boolean;
 };
 
+export interface GetSourcesMatchParams {
+  sourceNames?: string[];
+  chunkAlgoHash: {
+    hashValue: string;
+    operation: "equals" | "notEquals";
+  };
+}
+
 /**
   Filters for querying the embedded content vector store.
  */
@@ -135,4 +143,9 @@ export type EmbeddedContentStore = VectorStore<EmbeddedContent> & {
   Initialize the store.
  */
   init?: () => Promise<void>;
+
+  /**
+    Get the names of ingested data sources that match the given query.
+   */
+  getDataSources(matchQuery: GetSourcesMatchParams): Promise<string[]>;
 };
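
Note: a short sketch of the two query shapes the new GetSourcesMatchParams type allows, assuming the type is re-exported from the mongodb-rag-core package root; hash values and source names are illustrative.

import { GetSourcesMatchParams } from "mongodb-rag-core";

// Match sources whose embedded content already uses the current chunk algorithm.
const matchesCurrentAlgo: GetSourcesMatchParams = {
  chunkAlgoHash: { hashValue: "abc123", operation: "equals" },
};

// Match only the named sources whose embedded content was produced by a
// different (older) chunk algorithm and therefore needs re-embedding.
const matchesStaleAlgo: GetSourcesMatchParams = {
  sourceNames: ["source1", "source2"],
  chunkAlgoHash: { hashValue: "abc123", operation: "notEquals" },
};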

packages/mongodb-rag-core/src/contentStore/MongoDbEmbeddedContentStore.ts

Lines changed: 34 additions & 1 deletion
@@ -1,8 +1,9 @@
-import { pageIdentity } from ".";
+import { pageIdentity, PersistedPage } from ".";
 import { DatabaseConnection } from "../DatabaseConnection";
 import {
   EmbeddedContent,
   EmbeddedContentStore,
+  GetSourcesMatchParams,
   QueryFilters,
 } from "./EmbeddedContent";
 import { FindNearestNeighborsOptions, WithScore } from "../VectorStore";
@@ -62,6 +63,21 @@ export type MongoDbEmbeddedContentStore = EmbeddedContentStore &
     init(): Promise<void>;
   };
 
+function makeMatchQuery({ sourceNames, chunkAlgoHash }: GetSourcesMatchParams) {
+  const operator = chunkAlgoHash.operation === "equals" ? "$eq" : "$ne";
+  return {
+    chunkAlgoHash: { [operator]: chunkAlgoHash.hashValue },
+    // run on specific source names if specified, run on all if not
+    ...(sourceNames
+      ? {
+          sourceName: {
+            $in: sourceNames,
+          },
+        }
+      : undefined),
+  };
+}
+
 export function makeMongoDbEmbeddedContentStore({
   connectionUri,
   databaseName,
@@ -257,6 +273,23 @@ export function makeMongoDbEmbeddedContentStore({
         }
       }
     },
+
+    async getDataSources(matchQuery: GetSourcesMatchParams): Promise<string[]> {
+      const result = await embeddedContentCollection
+        .aggregate([
+          { $match: makeMatchQuery(matchQuery) },
+          {
+            $group: {
+              _id: null,
+              uniqueSources: { $addToSet: "$sourceName" },
+            },
+          },
+          { $project: { _id: 0, uniqueSources: 1 } },
+        ])
+        .toArray();
+      const uniqueSources = result.length > 0 ? result[0].uniqueSources : [];
+      return uniqueSources;
+    },
   };
 }
 
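Note: to make the helper concrete, this is the filter makeMatchQuery builds for an illustrative input; getDataSources passes it to $match before collecting distinct sourceName values with $group and $addToSet.

// Input (illustrative values):
//   { sourceNames: ["source1"], chunkAlgoHash: { hashValue: "abc123", operation: "notEquals" } }
const filter = {
  chunkAlgoHash: { $ne: "abc123" },
  sourceName: { $in: ["source1"] },
};
// Without sourceNames, the spread contributes nothing and the filter matches
// embedded content from every data source:
const filterAllSources = { chunkAlgoHash: { $ne: "abc123" } };

A similar result could likely also be obtained with the driver's collection.distinct("sourceName", filter); the aggregation form keeps the match logic in one reusable helper.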

packages/mongodb-rag-core/src/contentStore/updateEmbeddedContent.test.ts

Lines changed: 250 additions & 2 deletions
@@ -2,12 +2,27 @@ import {
   updateEmbeddedContent,
   updateEmbeddedContentForPage,
 } from "./updateEmbeddedContent";
-import { persistPages } from ".";
+import {
+  makeMongoDbEmbeddedContentStore,
+  makeMongoDbPageStore,
+  MongoDbEmbeddedContentStore,
+  MongoDbPageStore,
+  persistPages,
+  updatePages,
+} from ".";
 import { makeMockPageStore } from "../test/MockPageStore";
 import * as chunkPageModule from "../chunk/chunkPage";
-import { EmbeddedContentStore, EmbeddedContent } from "./EmbeddedContent";
+import {
+  EmbeddedContentStore,
+  EmbeddedContent,
+  GetSourcesMatchParams,
+} from "./EmbeddedContent";
 import { Embedder } from "../embed";
 import { Page, PersistedPage } from ".";
+import { strict as assert } from "assert";
+import { MongoMemoryReplSet } from "mongodb-memory-server";
+import { DataSource } from "../dataSources";
+import { MongoClient } from "mongodb";
 
 export const makeMockEmbeddedContentStore = (): EmbeddedContentStore => {
   const content: Map<string /* page url */, EmbeddedContent[]> = new Map();
@@ -29,6 +44,9 @@ export const makeMockEmbeddedContentStore = (): EmbeddedContentStore => {
     metadata: {
       embeddingName: "test",
     },
+    async getDataSources(matchQuery: GetSourcesMatchParams): Promise<string[]> {
+      return [];
+    },
   };
 };
 
@@ -49,6 +67,7 @@ const embedder = {
   },
 };
 
+// TODO: deprecate mock store and use mongodb-memory-server instead. https://jira.mongodb.org/browse/EAI-935
 describe("updateEmbeddedContent", () => {
   it("deletes embedded content for deleted page", async () => {
     const pageStore = makeMockPageStore();
@@ -207,6 +226,7 @@ describe("updateEmbeddedContent", () => {
       store: embeddedContentStore,
       page,
      concurrencyOptions: { createChunks: 2 },
+      chunkAlgoHash: "testchunkalgohash",
     });
 
     const embeddedContent = await embeddedContentStore.loadEmbeddedContent({
@@ -276,3 +296,231 @@ describe("updateEmbeddedContent", () => {
     });
   });
 });
+
+// These tests use "mongodb-memory-server", not mockEmbeddedContentStore
+describe("updateEmbeddedContent updates chunks based on changes to copy or changes to the chunk algo", () => {
+  let mongod: MongoMemoryReplSet | undefined;
+  let pageStore: MongoDbPageStore;
+  let embedStore: MongoDbEmbeddedContentStore;
+  let uri: string;
+  let databaseName: string;
+  let mongoClient: MongoClient;
+  let page1Embedding: EmbeddedContent[], page2Embedding: EmbeddedContent[];
+  let pages: PersistedPage[] = [];
+
+  const embedder = {
+    async embed() {
+      return { embedding: [1, 2, 3] };
+    },
+  };
+  const mockDataSources: DataSource[] = [
+    {
+      name: "source1",
+      fetchPages: async () => [
+        {
+          url: "test1.com",
+          format: "html",
+          sourceName: "source1",
+          body: "hello source 1",
+        },
+      ],
+    },
+    {
+      name: "source2",
+      fetchPages: async () => [
+        {
+          url: "test2.com",
+          format: "html",
+          sourceName: "source2",
+          body: "hello source 2",
+        },
+      ],
+    },
+  ];
+  const mockDataSourceNames = mockDataSources.map(
+    (dataSource) => dataSource.name
+  );
+  beforeAll(async () => {
+    mongod = await MongoMemoryReplSet.create();
+    uri = mongod.getUri();
+    mongoClient = new MongoClient(uri);
+    await mongoClient.connect();
+  });
+  beforeEach(async () => {
+    // setup mongo client, page store, and embedded content store
+    databaseName = "test-all-command";
+    embedStore = makeMongoDbEmbeddedContentStore({
+      connectionUri: uri,
+      databaseName,
+      searchIndex: { embeddingName: "test-embedding" },
+    });
+    pageStore = makeMongoDbPageStore({
+      connectionUri: uri,
+      databaseName,
+    });
+    // create pages and verify that they have been created
+    await updatePages({ sources: mockDataSources, pageStore });
+    pages = await pageStore.loadPages();
+    assert(pages.length == 2);
+    // create embeddings for the pages and verify that they have been created
+    await updateEmbeddedContent({
+      since: new Date(0),
+      embeddedContentStore: embedStore,
+      pageStore,
+      sourceNames: mockDataSourceNames,
+      embedder,
+    });
+    page1Embedding = await embedStore.loadEmbeddedContent({
+      page: pages[0],
+    });
+    page2Embedding = await embedStore.loadEmbeddedContent({
+      page: pages[1],
+    });
+    assert(page1Embedding.length);
+    assert(page2Embedding.length);
+  });
+
+  afterEach(async () => {
+    await pageStore?.drop();
+    await embedStore?.drop();
+  });
+  afterAll(async () => {
+    await pageStore?.close();
+    await embedStore?.close();
+    await mongoClient?.close();
+    await mongod?.stop();
+  });
+
+  it("should update embedded content only for pages that have been updated (copy change) after the 'since' date provided", async () => {
+    // Modify dates of pages and embedded content for testing
+    const sinceDate = new Date("2024-01-01");
+    const beforeSinceDate = new Date("2023-01-01");
+    const afterSinceDate = new Date("2025-01-01");
+    // set pages[0] to be last updated before sinceDate (should not be modified)
+    await mongoClient
+      .db(databaseName)
+      .collection("pages")
+      .updateOne({ ...pages[0] }, { $set: { updated: beforeSinceDate } });
+    await mongoClient
+      .db(databaseName)
+      .collection("embedded_content")
+      .updateOne(
+        { sourceName: mockDataSourceNames[0] },
+        { $set: { updated: beforeSinceDate } }
+      );
+    // set pages[1] to be last updated after sinceDate (should be re-chunked)
+    await mongoClient
+      .db(databaseName)
+      .collection("pages")
+      .updateOne({ ...pages[1] }, { $set: { updated: afterSinceDate } });
+    await mongoClient
+      .db(databaseName)
+      .collection("embedded_content")
+      .updateOne(
+        { sourceName: mockDataSourceNames[1] },
+        { $set: { updated: afterSinceDate } }
+      );
+    const originalPage1Embedding = await embedStore.loadEmbeddedContent({
+      page: pages[0],
+    });
+    const originalPage2Embedding = await embedStore.loadEmbeddedContent({
+      page: pages[1],
+    });
+    await updateEmbeddedContent({
+      since: sinceDate,
+      embeddedContentStore: embedStore,
+      pageStore,
+      sourceNames: mockDataSourceNames,
+      embedder,
+    });
+    const updatedPage1Embedding = await embedStore.loadEmbeddedContent({
+      page: pages[0],
+    });
+    const updatedPage2Embedding = await embedStore.loadEmbeddedContent({
+      page: pages[1],
+    });
+    assert(updatedPage1Embedding.length);
+    assert(updatedPage2Embedding.length);
+    expect(updatedPage1Embedding[0].updated.getTime()).toBe(
+      originalPage1Embedding[0].updated.getTime()
+    );
+    expect(updatedPage2Embedding[0].updated.getTime()).not.toBe(
+      originalPage2Embedding[0].updated.getTime()
+    );
+  });
+  it("should update embedded content when only chunk algo has changed", async () => {
+    // change the chunking algo for the second page, but not the first
+    await updateEmbeddedContent({
+      since: new Date(),
+      embeddedContentStore: embedStore,
+      pageStore,
+      sourceNames: [mockDataSourceNames[0]],
+      embedder,
+    });
+    await updateEmbeddedContent({
+      since: new Date(),
+      embeddedContentStore: embedStore,
+      pageStore,
+      sourceNames: [mockDataSourceNames[1]],
+      embedder,
+      chunkOptions: { chunkOverlap: 2 },
+    });
+    const updatedPage1Embedding = await embedStore.loadEmbeddedContent({
+      page: pages[0],
+    });
+    const updatedPage2Embedding = await embedStore.loadEmbeddedContent({
+      page: pages[1],
+    });
+    assert(updatedPage1Embedding.length);
+    assert(updatedPage2Embedding.length);
+    expect(updatedPage1Embedding[0].chunkAlgoHash).toBe(
+      page1Embedding[0].chunkAlgoHash
+    );
+    expect(updatedPage2Embedding[0].chunkAlgoHash).not.toBe(
+      page2Embedding[0].chunkAlgoHash
+    );
+  });
+  it("should update embedded content when either chunk algo has changed or copy has changed", async () => {
+    // SETUP: Modify dates of pages and embedded content for this test case
+    const sinceDate = new Date("2024-01-01");
+    const afterSinceDate = new Date("2025-01-01");
+    await mongoClient
+      .db(databaseName)
+      .collection("pages")
+      .updateOne({ ...pages[0] }, { $set: { updated: afterSinceDate } });
+    await mongoClient
+      .db(databaseName)
+      .collection("embedded_content")
+      .updateOne(
+        { sourceName: mockDataSourceNames[0] },
+        { $set: { updated: afterSinceDate } }
+      );
+    const originalPage1Embedding = await embedStore.loadEmbeddedContent({
+      page: pages[0],
+    });
+    // END SETUP
+    await updateEmbeddedContent({
+      since: sinceDate,
+      embeddedContentStore: embedStore,
+      pageStore,
+      sourceNames: mockDataSourceNames,
+      embedder,
+      chunkOptions: { chunkOverlap: 2 },
+    });
+    const updatedPage1Embedding = await embedStore.loadEmbeddedContent({
+      page: pages[0],
+    });
+    const updatedPage2Embedding = await embedStore.loadEmbeddedContent({
+      page: pages[1],
+    });
+    assert(updatedPage1Embedding.length);
+    assert(updatedPage2Embedding.length);
+    // both pages should be updated
+    expect(updatedPage1Embedding[0].chunkAlgoHash).not.toBe(
+      originalPage1Embedding[0].chunkAlgoHash
+    );
+    expect(updatedPage2Embedding[0].chunkAlgoHash).not.toBe(
+      page2Embedding[0].chunkAlgoHash
+    );
+  });
+});
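
Note: the tests above pass a literal "testchunkalgohash" to updateEmbeddedContentForPage and rely on a change to chunkOptions producing a different chunkAlgoHash. The diff does not show how that hash is actually derived; the sketch below is a hypothetical derivation using Node's crypto module, hashing the chunking function's source together with its options so that changing either yields a new hash.

import { createHash } from "crypto";

// Hypothetical helper: not the repository's actual implementation.
function computeChunkAlgoHash(
  chunkFunc: (...args: unknown[]) => unknown,
  chunkOptions: Record<string, unknown> = {}
): string {
  return createHash("sha256")
    .update(chunkFunc.toString())
    .update(JSON.stringify(chunkOptions))
    .digest("hex");
}

A scheme like this would be consistent with the second test above, where passing chunkOptions: { chunkOverlap: 2 } alone is expected to change the stored chunkAlgoHash and trigger re-embedding.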
