diff --git a/.semversioner/next-release/patch-20240911201935470388.json b/.semversioner/next-release/patch-20240911201935470388.json
new file mode 100644
index 0000000000..36b24ff432
--- /dev/null
+++ b/.semversioner/next-release/patch-20240911201935470388.json
@@ -0,0 +1,4 @@
+{
+    "type": "patch",
+    "description": "Calculate new inputs and deleted inputs on update"
+}
diff --git a/graphrag/index/cli.py b/graphrag/index/cli.py
index 869c642ad8..4e99f8e7bf 100644
--- a/graphrag/index/cli.py
+++ b/graphrag/index/cli.py
@@ -13,7 +13,7 @@
 
 from graphrag.config import CacheType, enable_logging_with_config, load_config
 
-from .api import build_index, update_index
+from .api import build_index
 from .graph.extractors.claims.prompts import CLAIM_EXTRACTION_PROMPT
 from .graph.extractors.community_reports.prompts import COMMUNITY_REPORT_PROMPT
 from .graph.extractors.graph.prompts import GRAPH_EXTRACTION_PROMPT
diff --git a/graphrag/index/run/run.py b/graphrag/index/run/run.py
index 55ce71eac6..d54e27c3a9 100644
--- a/graphrag/index/run/run.py
+++ b/graphrag/index/run/run.py
@@ -46,6 +46,7 @@
 from graphrag.index.typing import PipelineRunResult
 
 # Register all verbs
+from graphrag.index.update.files import get_delta_docs
 from graphrag.index.verbs import *  # noqa
 from graphrag.index.workflows import (
     VerbDefinitions,
@@ -111,9 +112,6 @@ async def run_pipeline_with_config(
         else await _create_input(config.input, progress_reporter, root_dir)
     )
 
-    if is_update_run:
-        # TODO: Filter dataset to only include new data (this should be done in the input module)
-        pass
     post_process_steps = input_post_process_steps or _create_postprocess_steps(
         config.input
     )
@@ -123,21 +121,44 @@ async def run_pipeline_with_config(
         msg = "No dataset provided!"
         raise ValueError(msg)
 
-    async for table in run_pipeline(
-        workflows=workflows,
-        dataset=dataset,
-        storage=storage,
-        cache=cache,
-        callbacks=callbacks,
-        input_post_process_steps=post_process_steps,
-        memory_profile=memory_profile,
-        additional_verbs=additional_verbs,
-        additional_workflows=additional_workflows,
-        progress_reporter=progress_reporter,
-        emit=emit,
-        is_resume_run=is_resume_run,
-    ):
-        yield table
+    if is_update_run:
+        delta_dataset = await get_delta_docs(dataset, storage)
+
+        delta_storage = storage.child("delta")
+
+        # Run the pipeline on the new documents
+        async for table in run_pipeline(
+            workflows=workflows,
+            dataset=delta_dataset.new_inputs,
+            storage=delta_storage,
+            cache=cache,
+            callbacks=callbacks,
+            input_post_process_steps=post_process_steps,
+            memory_profile=memory_profile,
+            additional_verbs=additional_verbs,
+            additional_workflows=additional_workflows,
+            progress_reporter=progress_reporter,
+            emit=emit,
+            is_resume_run=False,
+        ):
+            yield table
+
+    else:
+        async for table in run_pipeline(
+            workflows=workflows,
+            dataset=dataset,
+            storage=storage,
+            cache=cache,
+            callbacks=callbacks,
+            input_post_process_steps=post_process_steps,
+            memory_profile=memory_profile,
+            additional_verbs=additional_verbs,
+            additional_workflows=additional_workflows,
+            progress_reporter=progress_reporter,
+            emit=emit,
+            is_resume_run=is_resume_run,
+        ):
+            yield table
 
 
 async def run_pipeline(
diff --git a/graphrag/index/update/__init__.py b/graphrag/index/update/__init__.py
new file mode 100644
index 0000000000..e6966408fb
--- /dev/null
+++ b/graphrag/index/update/__init__.py
@@ -0,0 +1,4 @@
+# Copyright (c) 2024 Microsoft Corporation.
+# Licensed under the MIT License
+
+"""Incremental Indexing main module definition."""
diff --git a/graphrag/index/update/files.py b/graphrag/index/update/files.py
new file mode 100644
index 0000000000..2807fe8104
--- /dev/null
+++ b/graphrag/index/update/files.py
@@ -0,0 +1,61 @@
+# Copyright (c) 2024 Microsoft Corporation.
+# Licensed under the MIT License
+
+"""File management and utils for Incremental Indexing."""
+
+from dataclasses import dataclass
+
+import pandas as pd
+
+from graphrag.index.storage.typing import PipelineStorage
+from graphrag.utils.storage import _load_table_from_storage
+
+
+@dataclass
+class InputDelta:
+    """Dataclass to hold the input delta.
+
+    Attributes
+    ----------
+    new_inputs : pd.DataFrame
+        The new inputs.
+    deleted_inputs : pd.DataFrame
+        The deleted inputs.
+    """
+
+    new_inputs: pd.DataFrame
+    deleted_inputs: pd.DataFrame
+
+
+async def get_delta_docs(
+    input_dataset: pd.DataFrame, storage: PipelineStorage
+) -> InputDelta:
+    """Get the delta between the input dataset and the final documents.
+
+    Parameters
+    ----------
+    input_dataset : pd.DataFrame
+        The input dataset.
+    storage : PipelineStorage
+        The Pipeline storage.
+
+    Returns
+    -------
+    InputDelta
+        The input delta. With new inputs and deleted inputs.
+    """
+    final_docs = await _load_table_from_storage(
+        "create_final_documents.parquet", storage
+    )
+
+    # Select distinct title from final docs and from dataset
+    previous_docs: list[str] = final_docs["title"].unique().tolist()
+    dataset_docs: list[str] = input_dataset["title"].unique().tolist()
+
+    # Get the new documents (using loc to ensure DataFrame)
+    new_docs = input_dataset.loc[~input_dataset["title"].isin(previous_docs)]
+
+    # Get the deleted documents (again using loc to ensure DataFrame)
+    deleted_docs = final_docs.loc[~final_docs["title"].isin(dataset_docs)]
+
+    return InputDelta(new_docs, deleted_docs)
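For reference, the title-based delta computation that get_delta_docs introduces can be exercised with plain pandas, independent of the pipeline storage. The sketch below is illustrative only: the sample frames and document titles are invented, and the load of create_final_documents.parquet is replaced by an in-memory DataFrame.

import pandas as pd

# Stand-in for create_final_documents from a previous index run (hypothetical titles).
final_docs = pd.DataFrame({"title": ["doc_a.txt", "doc_b.txt", "doc_c.txt"]})

# Stand-in for the freshly loaded input dataset: doc_c was removed, doc_d was added.
input_dataset = pd.DataFrame({"title": ["doc_a.txt", "doc_b.txt", "doc_d.txt"]})

previous_titles = final_docs["title"].unique().tolist()
current_titles = input_dataset["title"].unique().tolist()

# Same title-based set difference used by get_delta_docs, with loc keeping DataFrames.
new_inputs = input_dataset.loc[~input_dataset["title"].isin(previous_titles)]
deleted_inputs = final_docs.loc[~final_docs["title"].isin(current_titles)]

print(new_inputs["title"].tolist())      # ['doc_d.txt']
print(deleted_inputs["title"].tolist())  # ['doc_c.txt']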