From 057f3fe0abd66af77c8c54d0a144838ada2523c6 Mon Sep 17 00:00:00 2001
From: Alonso Guevara
Date: Wed, 11 Sep 2024 14:19:05 -0600
Subject: [PATCH 1/6] Calculate new inputs and deleted inputs on update

---
 graphrag/index/run/run.py         | 63 +++++++++++++++++++++----------
 graphrag/index/update/__init__.py |  4 ++
 graphrag/index/update/files.py    | 37 ++++++++++++++++++
 3 files changed, 85 insertions(+), 19 deletions(-)
 create mode 100644 graphrag/index/update/__init__.py
 create mode 100644 graphrag/index/update/files.py

diff --git a/graphrag/index/run/run.py b/graphrag/index/run/run.py
index 55ce71eac6..0775724fda 100644
--- a/graphrag/index/run/run.py
+++ b/graphrag/index/run/run.py
@@ -8,6 +8,7 @@
 import time
 import traceback
 from collections.abc import AsyncIterable
+from pathlib import Path
 from typing import cast
 
 import pandas as pd
@@ -46,13 +47,14 @@
 from graphrag.index.typing import PipelineRunResult
 
 # Register all verbs
+from graphrag.index.update.files import get_delta_docs
 from graphrag.index.verbs import *  # noqa
 from graphrag.index.workflows import (
     VerbDefinitions,
     WorkflowDefinitions,
     load_workflows,
 )
-from graphrag.utils.storage import _create_storage
+from graphrag.utils.storage import _create_storage, _load_table_from_storage
 
 log = logging.getLogger(__name__)
 
@@ -111,9 +113,6 @@ async def run_pipeline_with_config(
         else await _create_input(config.input, progress_reporter, root_dir)
     )
 
-    if is_update_run:
-        # TODO: Filter dataset to only include new data (this should be done in the input module)
-        pass
     post_process_steps = input_post_process_steps or _create_postprocess_steps(
         config.input
     )
@@ -123,21 +122,47 @@
         msg = "No dataset provided!"
         raise ValueError(msg)
 
-    async for table in run_pipeline(
-        workflows=workflows,
-        dataset=dataset,
-        storage=storage,
-        cache=cache,
-        callbacks=callbacks,
-        input_post_process_steps=post_process_steps,
-        memory_profile=memory_profile,
-        additional_verbs=additional_verbs,
-        additional_workflows=additional_workflows,
-        progress_reporter=progress_reporter,
-        emit=emit,
-        is_resume_run=is_resume_run,
-    ):
-        yield table
+    if is_update_run:
+
+        delta_docs = await get_delta_docs(dataset, storage)
+        new_docs_dataset = delta_docs.new_inputs
+
+        delta_storage = storage.child("delta")
+
+        # Run the pipeline on the new documents
+        async for table in run_pipeline(
+            workflows=workflows,
+            dataset=new_docs_dataset,
+            storage=delta_storage,
+            cache=cache,
+            callbacks=callbacks,
+            input_post_process_steps=post_process_steps,
+            memory_profile=memory_profile,
+            additional_verbs=additional_verbs,
+            additional_workflows=additional_workflows,
+            progress_reporter=progress_reporter,
+            emit=emit,
+            is_resume_run=False,
+        ):
+            yield table
+
+    else:
+
+        async for table in run_pipeline(
+            workflows=workflows,
+            dataset=dataset,
+            storage=storage,
+            cache=cache,
+            callbacks=callbacks,
+            input_post_process_steps=post_process_steps,
+            memory_profile=memory_profile,
+            additional_verbs=additional_verbs,
+            additional_workflows=additional_workflows,
+            progress_reporter=progress_reporter,
+            emit=emit,
+            is_resume_run=is_resume_run,
+        ):
+            yield table
 
 
 async def run_pipeline(
diff --git a/graphrag/index/update/__init__.py b/graphrag/index/update/__init__.py
new file mode 100644
index 0000000000..e6966408fb
--- /dev/null
+++ b/graphrag/index/update/__init__.py
@@ -0,0 +1,4 @@
+# Copyright (c) 2024 Microsoft Corporation.
+# Licensed under the MIT License
+
+"""Incremental Indexing main module definition."""
diff --git a/graphrag/index/update/files.py b/graphrag/index/update/files.py
new file mode 100644
index 0000000000..620ad2ef55
--- /dev/null
+++ b/graphrag/index/update/files.py
@@ -0,0 +1,37 @@
+# Copyright (c) 2024 Microsoft Corporation.
+# Licensed under the MIT License
+
+"""File management and utils for Incremental Indexing."""
+
+from typing import NamedTuple
+
+import pandas as pd
+
+from graphrag.index.config.pipeline import PipelineConfig
+from graphrag.index.storage.typing import PipelineStorage
+from graphrag.utils.storage import _load_table_from_storage
+
+
+# Output delta named tuple
+class InputDelta(NamedTuple):
+    """Named tuple for output delta."""
+
+    new_inputs: pd.DataFrame
+    deleted_inputs: pd.DataFrame
+
+
+async def get_delta_docs(
+    input_dataset: pd.DataFrame, storage: PipelineStorage
+) -> InputDelta:
+    final_docs = await _load_table_from_storage(
+        "create_final_documents.parquet", storage
+    )
+
+    # Select distinct title from final docs and from dataset
+    previous_docs = final_docs["title"].unique()
+    dataset_docs = input_dataset["title"].unique()
+
+    # Get the new documents
+    new_docs = input_dataset[~input_dataset["title"].isin(previous_docs)]
+    deleted_docs = final_docs[~final_docs["title"].isin(dataset_docs)]
+    return InputDelta(new_docs, deleted_docs)
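A quick sketch of the title-based delta logic PATCH 1/6 introduces in files.py,
using two throwaway DataFrames; the titles below are made-up sample data, not
anything from the repo:

    import pandas as pd

    # Stand-ins: "previous" plays the role of create_final_documents.parquet,
    # "incoming" plays the role of the freshly loaded input dataset.
    previous = pd.DataFrame({"title": ["a.txt", "b.txt"]})
    incoming = pd.DataFrame({"title": ["b.txt", "c.txt"]})

    # Same boolean-mask comparison as get_delta_docs above.
    new_docs = incoming[~incoming["title"].isin(previous["title"].unique())]
    deleted_docs = previous[~previous["title"].isin(incoming["title"].unique())]

    print(new_docs["title"].tolist())      # ['c.txt'] -> re-indexed as the delta
    print(deleted_docs["title"].tolist())  # ['a.txt'] -> reported as deleted

Note how only the new documents are fed back through run_pipeline, into an
isolated storage.child("delta") area, so the existing index is left untouched.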
From 91418d5b9cca96252c0df144c87261cf5c0cbb51 Mon Sep 17 00:00:00 2001
From: Alonso Guevara
Date: Wed, 11 Sep 2024 14:19:50 -0600
Subject: [PATCH 2/6] Semver

---
 .semversioner/next-release/patch-20240911201935470388.json | 4 ++++
 1 file changed, 4 insertions(+)
 create mode 100644 .semversioner/next-release/patch-20240911201935470388.json

diff --git a/.semversioner/next-release/patch-20240911201935470388.json b/.semversioner/next-release/patch-20240911201935470388.json
new file mode 100644
index 0000000000..36b24ff432
--- /dev/null
+++ b/.semversioner/next-release/patch-20240911201935470388.json
@@ -0,0 +1,4 @@
+{
+    "type": "patch",
+    "description": "Calculate new inputs and deleted inputs on update"
+}
From 039d9a57ef645af8a7ca653b897db400bb1321c5 Mon Sep 17 00:00:00 2001
From: Alonso Guevara
Date: Wed, 11 Sep 2024 14:28:18 -0600
Subject: [PATCH 3/6] Clear ruff checks

---
 graphrag/index/cli.py          |  2 +-
 graphrag/index/run/run.py      |  5 +----
 graphrag/index/update/files.py | 15 ++++++++++++++-
 3 files changed, 16 insertions(+), 6 deletions(-)

diff --git a/graphrag/index/cli.py b/graphrag/index/cli.py
index 869c642ad8..4e99f8e7bf 100644
--- a/graphrag/index/cli.py
+++ b/graphrag/index/cli.py
@@ -13,7 +13,7 @@
 
 from graphrag.config import CacheType, enable_logging_with_config, load_config
 
-from .api import build_index, update_index
+from .api import build_index
 from .graph.extractors.claims.prompts import CLAIM_EXTRACTION_PROMPT
 from .graph.extractors.community_reports.prompts import COMMUNITY_REPORT_PROMPT
 from .graph.extractors.graph.prompts import GRAPH_EXTRACTION_PROMPT
diff --git a/graphrag/index/run/run.py b/graphrag/index/run/run.py
index 0775724fda..8cd8e3477b 100644
--- a/graphrag/index/run/run.py
+++ b/graphrag/index/run/run.py
@@ -8,7 +8,6 @@
 import time
 import traceback
 from collections.abc import AsyncIterable
-from pathlib import Path
 from typing import cast
 
 import pandas as pd
@@ -54,7 +53,7 @@
     WorkflowDefinitions,
     load_workflows,
 )
-from graphrag.utils.storage import _create_storage, _load_table_from_storage
+from graphrag.utils.storage import _create_storage
 
 log = logging.getLogger(__name__)
 
@@ -122,7 +122,6 @@ async def run_pipeline_with_config(
         raise ValueError(msg)
 
     if is_update_run:
-
         delta_docs = await get_delta_docs(dataset, storage)
         new_docs_dataset = delta_docs.new_inputs
 
         delta_storage = storage.child("delta")
 
         # Run the pipeline on the new documents
@@ -147,7 +145,6 @@ async def run_pipeline_with_config(
         yield table
 
     else:
-
         async for table in run_pipeline(
             workflows=workflows,
             dataset=dataset,
diff --git a/graphrag/index/update/files.py b/graphrag/index/update/files.py
index 620ad2ef55..54b644f465 100644
--- a/graphrag/index/update/files.py
+++ b/graphrag/index/update/files.py
@@ -7,7 +7,6 @@
 
 import pandas as pd
 
-from graphrag.index.config.pipeline import PipelineConfig
 from graphrag.index.storage.typing import PipelineStorage
 from graphrag.utils.storage import _load_table_from_storage
 
@@ -23,6 +22,20 @@ class InputDelta(NamedTuple):
 async def get_delta_docs(
     input_dataset: pd.DataFrame, storage: PipelineStorage
 ) -> InputDelta:
+    """Get the delta between the input dataset and the final documents.
+
+    Parameters
+    ----------
+    input_dataset : pd.DataFrame
+        The input dataset.
+    storage : PipelineStorage
+        The Pipeline storage.
+
+    Returns
+    -------
+    InputDelta
+        The input delta. With new inputs and deleted inputs.
+    """
     final_docs = await _load_table_from_storage(
         "create_final_documents.parquet", storage
     )

From 3e7f7328f90f416231d7356c0ffa6db2236225cc Mon Sep 17 00:00:00 2001
From: Alonso Guevara
Date: Wed, 11 Sep 2024 14:45:10 -0600
Subject: [PATCH 4/6] Fix pyright

---
 graphrag/index/update/files.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/graphrag/index/update/files.py b/graphrag/index/update/files.py
index 54b644f465..b506e065dd 100644
--- a/graphrag/index/update/files.py
+++ b/graphrag/index/update/files.py
@@ -41,8 +41,8 @@ async def get_delta_docs(
     )
 
     # Select distinct title from final docs and from dataset
-    previous_docs = final_docs["title"].unique()
-    dataset_docs = input_dataset["title"].unique()
+    previous_docs: list[str] = final_docs["title"].unique().tolist()
+    dataset_docs: list[str] = input_dataset["title"].unique().tolist()
 
     # Get the new documents
     new_docs = input_dataset[~input_dataset["title"].isin(previous_docs)]
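The PATCH 4/6 change works because pandas' Series.unique() returns a NumPy
ndarray, which pyright will not accept against a list[str] annotation;
.tolist() materializes a real Python list. A standalone illustration, with a
made-up frame:

    import pandas as pd

    df = pd.DataFrame({"title": ["a", "b", "b"]})

    raw = df["title"].unique()                         # numpy.ndarray, not list[str]
    titles: list[str] = df["title"].unique().tolist()  # satisfies pyright

    print(type(raw).__name__, titles)                  # ndarray ['a', 'b']

Since isin() accepts either form, the conversion is purely for the type
checker and does not change behavior.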
From d7474f97676008b508bc97664a81933afba337e7 Mon Sep 17 00:00:00 2001
From: Alonso Guevara
Date: Wed, 11 Sep 2024 15:07:02 -0600
Subject: [PATCH 5/6] Fix PyRight

---
 graphrag/index/run/run.py      |  5 ++---
 graphrag/index/update/files.py | 26 +++++++++++++++++++-------
 2 files changed, 21 insertions(+), 10 deletions(-)

diff --git a/graphrag/index/run/run.py b/graphrag/index/run/run.py
index 8cd8e3477b..d54e27c3a9 100644
--- a/graphrag/index/run/run.py
+++ b/graphrag/index/run/run.py
@@ -122,15 +122,14 @@ async def run_pipeline_with_config(
         raise ValueError(msg)
 
     if is_update_run:
-        delta_docs = await get_delta_docs(dataset, storage)
-        new_docs_dataset = delta_docs.new_inputs
+        delta_dataset = await get_delta_docs(dataset, storage)
 
         delta_storage = storage.child("delta")
 
         # Run the pipeline on the new documents
         async for table in run_pipeline(
             workflows=workflows,
-            dataset=new_docs_dataset,
+            dataset=delta_dataset.new_inputs,
             storage=delta_storage,
             cache=cache,
             callbacks=callbacks,
diff --git a/graphrag/index/update/files.py b/graphrag/index/update/files.py
index b506e065dd..1787721b8e 100644
--- a/graphrag/index/update/files.py
+++ b/graphrag/index/update/files.py
@@ -3,7 +3,8 @@
 
 """File management and utils for Incremental Indexing."""
 
-from typing import NamedTuple
+from collections import namedtuple
+from dataclasses import dataclass
 
 import pandas as pd
 
@@ -11,9 +12,17 @@
 from graphrag.index.storage.typing import PipelineStorage
 from graphrag.utils.storage import _load_table_from_storage
 
 
-# Output delta named tuple
-class InputDelta(NamedTuple):
-    """Named tuple for output delta."""
+@dataclass
+class InputDelta:
+    """Dataclass to hold the input delta.
+
+    Attributes
+    ----------
+    new_inputs : pd.DataFrame
+        The new inputs.
+    deleted_inputs : pd.DataFrame
+        The deleted inputs.
+    """
 
     new_inputs: pd.DataFrame
     deleted_inputs: pd.DataFrame
@@ -44,7 +53,10 @@ async def get_delta_docs(
     previous_docs: list[str] = final_docs["title"].unique().tolist()
     dataset_docs: list[str] = input_dataset["title"].unique().tolist()
 
-    # Get the new documents
-    new_docs = input_dataset[~input_dataset["title"].isin(previous_docs)]
-    deleted_docs = final_docs[~final_docs["title"].isin(dataset_docs)]
+    # Get the new documents (using loc to ensure DataFrame)
+    new_docs = input_dataset.loc[~input_dataset["title"].isin(previous_docs)]
+
+    # Get the deleted documents (again using loc to ensure DataFrame)
+    deleted_docs = final_docs.loc[~final_docs["title"].isin(dataset_docs)]
+
     return InputDelta(new_docs, deleted_docs)

From 35a50079af454f407db1251e98162ab042f758ed Mon Sep 17 00:00:00 2001
From: Alonso Guevara
Date: Wed, 11 Sep 2024 15:10:11 -0600
Subject: [PATCH 6/6] Ruff again

---
 graphrag/index/update/files.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/graphrag/index/update/files.py b/graphrag/index/update/files.py
index 1787721b8e..2807fe8104 100644
--- a/graphrag/index/update/files.py
+++ b/graphrag/index/update/files.py
@@ -3,7 +3,6 @@
 
 """File management and utils for Incremental Indexing."""
 
-from collections import namedtuple
 from dataclasses import dataclass
 
 import pandas as pd
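After PATCH 6/6 the series settles on a plain dataclass for the delta. A rough
sketch of the resulting shape, re-declared here in plain pandas so it runs
without graphrag installed; apply_delta is a hypothetical helper showing one
way the delta could later be folded back into a previous index, not part of
the patch:

    from dataclasses import dataclass

    import pandas as pd


    @dataclass
    class InputDelta:
        new_inputs: pd.DataFrame
        deleted_inputs: pd.DataFrame


    def apply_delta(previous: pd.DataFrame, delta: InputDelta) -> pd.DataFrame:
        # Hypothetical: drop rows whose titles were deleted, append the new
        # inputs -- roughly what merging the "delta" child storage implies.
        kept = previous.loc[~previous["title"].isin(delta.deleted_inputs["title"])]
        return pd.concat([kept, delta.new_inputs], ignore_index=True)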