4 changes: 4 additions & 0 deletions .semversioner/next-release/patch-20240911201935470388.json
@@ -0,0 +1,4 @@
{
"type": "patch",
"description": "Calculate new inputs and deleted inputs on update"
}
2 changes: 1 addition & 1 deletion graphrag/index/cli.py
@@ -13,7 +13,7 @@

from graphrag.config import CacheType, enable_logging_with_config, load_config

from .api import build_index, update_index
from .api import build_index
from .graph.extractors.claims.prompts import CLAIM_EXTRACTION_PROMPT
from .graph.extractors.community_reports.prompts import COMMUNITY_REPORT_PROMPT
from .graph.extractors.graph.prompts import GRAPH_EXTRACTION_PROMPT
57 changes: 39 additions & 18 deletions graphrag/index/run/run.py
@@ -46,6 +46,7 @@
from graphrag.index.typing import PipelineRunResult

# Register all verbs
from graphrag.index.update.files import get_delta_docs
from graphrag.index.verbs import * # noqa
from graphrag.index.workflows import (
VerbDefinitions,
@@ -111,9 +112,6 @@ async def run_pipeline_with_config(
else await _create_input(config.input, progress_reporter, root_dir)
)

if is_update_run:
# TODO: Filter dataset to only include new data (this should be done in the input module)
pass
post_process_steps = input_post_process_steps or _create_postprocess_steps(
config.input
)
@@ -123,21 +121,44 @@
msg = "No dataset provided!"
raise ValueError(msg)

async for table in run_pipeline(
workflows=workflows,
dataset=dataset,
storage=storage,
cache=cache,
callbacks=callbacks,
input_post_process_steps=post_process_steps,
memory_profile=memory_profile,
additional_verbs=additional_verbs,
additional_workflows=additional_workflows,
progress_reporter=progress_reporter,
emit=emit,
is_resume_run=is_resume_run,
):
yield table
if is_update_run:
delta_dataset = await get_delta_docs(dataset, storage)

delta_storage = storage.child("delta")

# Run the pipeline on the new documents
async for table in run_pipeline(
workflows=workflows,
dataset=delta_dataset.new_inputs,
storage=delta_storage,
cache=cache,
callbacks=callbacks,
input_post_process_steps=post_process_steps,
memory_profile=memory_profile,
additional_verbs=additional_verbs,
additional_workflows=additional_workflows,
progress_reporter=progress_reporter,
emit=emit,
is_resume_run=False,
):
yield table

else:
async for table in run_pipeline(
workflows=workflows,
dataset=dataset,
storage=storage,
cache=cache,
callbacks=callbacks,
input_post_process_steps=post_process_steps,
memory_profile=memory_profile,
additional_verbs=additional_verbs,
additional_workflows=additional_workflows,
progress_reporter=progress_reporter,
emit=emit,
is_resume_run=is_resume_run,
):
yield table


async def run_pipeline(
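For context, a hedged sketch of how the update branch above might be driven by a caller. It assumes is_update_run is a keyword parameter of run_pipeline_with_config (as its use in the function body suggests), that the config object is built by the CLI, and that the remaining parameters have usable defaults; none of this call site is part of the PR itself.

# Hypothetical caller sketch (not part of this PR): route execution through the
# update branch so only the delta computed by get_delta_docs is indexed into
# the child "delta" storage.
from graphrag.index.run.run import run_pipeline_with_config


async def run_incremental_index(config):
    # config is assumed to be a loaded pipeline config; all other
    # run_pipeline_with_config parameters are left at their defaults.
    async for table in run_pipeline_with_config(
        config,
        is_update_run=True,  # takes the delta path added in this PR
    ):
        yield table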
4 changes: 4 additions & 0 deletions graphrag/index/update/__init__.py
@@ -0,0 +1,4 @@
# Copyright (c) 2024 Microsoft Corporation.
# Licensed under the MIT License

"""Incremental Indexing main module definition."""
61 changes: 61 additions & 0 deletions graphrag/index/update/files.py
@@ -0,0 +1,61 @@
# Copyright (c) 2024 Microsoft Corporation.
# Licensed under the MIT License

"""File management and utils for Incremental Indexing."""

from dataclasses import dataclass

import pandas as pd

from graphrag.index.storage.typing import PipelineStorage
from graphrag.utils.storage import _load_table_from_storage


@dataclass
class InputDelta:
"""Dataclass to hold the input delta.

Attributes
----------
new_inputs : pd.DataFrame
The new inputs.
deleted_inputs : pd.DataFrame
The deleted inputs.
"""

new_inputs: pd.DataFrame
deleted_inputs: pd.DataFrame


async def get_delta_docs(
input_dataset: pd.DataFrame, storage: PipelineStorage
) -> InputDelta:
"""Get the delta between the input dataset and the final documents.

Parameters
----------
input_dataset : pd.DataFrame
The input dataset.
storage : PipelineStorage
The Pipeline storage.

Returns
-------
InputDelta
The input delta, containing the new and deleted inputs.
"""
final_docs = await _load_table_from_storage(
"create_final_documents.parquet", storage
)

# Select distinct title from final docs and from dataset
previous_docs: list[str] = final_docs["title"].unique().tolist()
dataset_docs: list[str] = input_dataset["title"].unique().tolist()

# Get the new documents (using loc to ensure DataFrame)
new_docs = input_dataset.loc[~input_dataset["title"].isin(previous_docs)]

# Get the deleted documents (again using loc to ensure DataFrame)
deleted_docs = final_docs.loc[~final_docs["title"].isin(dataset_docs)]

return InputDelta(new_docs, deleted_docs)
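
A minimal, self-contained sketch of the set difference get_delta_docs performs, using hypothetical in-memory DataFrames in place of PipelineStorage and the stored create_final_documents.parquet; the titles are made up for illustration.

# Illustration of the delta computation above with hypothetical data.
import pandas as pd

final_docs = pd.DataFrame({"title": ["a.txt", "b.txt"]})     # previously indexed
input_dataset = pd.DataFrame({"title": ["b.txt", "c.txt"]})  # current input

previous_docs = final_docs["title"].unique().tolist()
dataset_docs = input_dataset["title"].unique().tolist()

# New documents: present in the input but not yet indexed -> ["c.txt"]
new_docs = input_dataset.loc[~input_dataset["title"].isin(previous_docs)]

# Deleted documents: indexed previously but missing from the input -> ["a.txt"]
deleted_docs = final_docs.loc[~final_docs["title"].isin(dataset_docs)]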