4 changes: 4 additions & 0 deletions .semversioner/next-release/patch-20240911201935470388.json
@@ -0,0 +1,4 @@
{
"type": "patch",
"description": "Calculate new inputs and deleted inputs on update"
}
2 changes: 1 addition & 1 deletion graphrag/index/cli.py
@@ -13,7 +13,7 @@

from graphrag.config import CacheType, enable_logging_with_config, load_config

from .api import build_index, update_index
from .api import build_index
from .graph.extractors.claims.prompts import CLAIM_EXTRACTION_PROMPT
from .graph.extractors.community_reports.prompts import COMMUNITY_REPORT_PROMPT
from .graph.extractors.graph.prompts import GRAPH_EXTRACTION_PROMPT
58 changes: 40 additions & 18 deletions graphrag/index/run/run.py
@@ -46,6 +46,7 @@
from graphrag.index.typing import PipelineRunResult

# Register all verbs
from graphrag.index.update.files import get_delta_docs
from graphrag.index.verbs import * # noqa
from graphrag.index.workflows import (
VerbDefinitions,
@@ -111,9 +112,6 @@ async def run_pipeline_with_config(
else await _create_input(config.input, progress_reporter, root_dir)
)

if is_update_run:
# TODO: Filter dataset to only include new data (this should be done in the input module)
pass
post_process_steps = input_post_process_steps or _create_postprocess_steps(
config.input
)
@@ -123,21 +121,45 @@
msg = "No dataset provided!"
raise ValueError(msg)

async for table in run_pipeline(
workflows=workflows,
dataset=dataset,
storage=storage,
cache=cache,
callbacks=callbacks,
input_post_process_steps=post_process_steps,
memory_profile=memory_profile,
additional_verbs=additional_verbs,
additional_workflows=additional_workflows,
progress_reporter=progress_reporter,
emit=emit,
is_resume_run=is_resume_run,
):
yield table
if is_update_run:
delta_docs = await get_delta_docs(dataset, storage)
new_docs_dataset = delta_docs.new_inputs

delta_storage = storage.child("delta")

# Run the pipeline on the new documents
async for table in run_pipeline(
workflows=workflows,
dataset=new_docs_dataset,
storage=delta_storage,
cache=cache,
callbacks=callbacks,
input_post_process_steps=post_process_steps,
memory_profile=memory_profile,
additional_verbs=additional_verbs,
additional_workflows=additional_workflows,
progress_reporter=progress_reporter,
emit=emit,
is_resume_run=False,
):
yield table

else:
async for table in run_pipeline(
workflows=workflows,
dataset=dataset,
storage=storage,
cache=cache,
callbacks=callbacks,
input_post_process_steps=post_process_steps,
memory_profile=memory_profile,
additional_verbs=additional_verbs,
additional_workflows=additional_workflows,
progress_reporter=progress_reporter,
emit=emit,
is_resume_run=is_resume_run,
):
yield table


async def run_pipeline(
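The new branch above routes an update run through a separate "delta" child storage and feeds only the new documents to the pipeline. A minimal caller sketch follows; it assumes `run_pipeline_with_config` accepts a config path as its first argument and an `is_update_run` keyword (as the function body suggests), and the settings path is illustrative only.

```python
# Hypothetical caller sketch (not part of this PR). Assumes run_pipeline_with_config
# takes a config path/object first and exposes an is_update_run keyword, as the
# function body above suggests; "./settings.yaml" is a made-up path.
import asyncio

from graphrag.index.run.run import run_pipeline_with_config


async def update_existing_index(config_path: str) -> None:
    # With is_update_run=True the pipeline computes the input delta via
    # get_delta_docs and runs the workflows only on the new documents,
    # writing their outputs under the "delta" child storage.
    async for result in run_pipeline_with_config(config_path, is_update_run=True):
        print(result)


if __name__ == "__main__":
    asyncio.run(update_existing_index("./settings.yaml"))
```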
4 changes: 4 additions & 0 deletions graphrag/index/update/__init__.py
@@ -0,0 +1,4 @@
# Copyright (c) 2024 Microsoft Corporation.
# Licensed under the MIT License

"""Incremental Indexing main module definition."""
50 changes: 50 additions & 0 deletions graphrag/index/update/files.py
@@ -0,0 +1,50 @@
# Copyright (c) 2024 Microsoft Corporation.
# Licensed under the MIT License

"""File management and utils for Incremental Indexing."""

from typing import NamedTuple

import pandas as pd

from graphrag.index.storage.typing import PipelineStorage
from graphrag.utils.storage import _load_table_from_storage


# Input delta named tuple
class InputDelta(NamedTuple):
"""Named tuple for output delta."""

new_inputs: pd.DataFrame
deleted_inputs: pd.DataFrame


async def get_delta_docs(
input_dataset: pd.DataFrame, storage: PipelineStorage
) -> InputDelta:
"""Get the delta between the input dataset and the final documents.

Parameters
----------
input_dataset : pd.DataFrame
The input dataset.
storage : PipelineStorage
The Pipeline storage.

Returns
-------
InputDelta
The input delta, containing the new inputs and the deleted inputs.
"""
final_docs = await _load_table_from_storage(
"create_final_documents.parquet", storage
)

# Select distinct title from final docs and from dataset
previous_docs = final_docs["title"].unique()
dataset_docs = input_dataset["title"].unique()

# Get the new documents
new_docs = input_dataset[~input_dataset["title"].isin(previous_docs)]
deleted_docs = final_docs[~final_docs["title"].isin(dataset_docs)]
return InputDelta(new_docs, deleted_docs)
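For reference, the anti-join on `title` that `get_delta_docs` performs can be demonstrated with plain pandas; the toy DataFrames below stand in for the stored `create_final_documents.parquet` table and the freshly loaded input dataset.

```python
# Toy illustration of the delta logic above; the titles are invented examples.
import pandas as pd

final_docs = pd.DataFrame({"title": ["a.txt", "b.txt", "c.txt"]})     # previously indexed docs
input_dataset = pd.DataFrame({"title": ["b.txt", "c.txt", "d.txt"]})  # current input folder

previous_docs = final_docs["title"].unique()
dataset_docs = input_dataset["title"].unique()

# New docs appear in the input but not in the previous index; deleted docs are the reverse.
new_docs = input_dataset[~input_dataset["title"].isin(previous_docs)]
deleted_docs = final_docs[~final_docs["title"].isin(dataset_docs)]

print(new_docs["title"].tolist())      # ['d.txt']
print(deleted_docs["title"].tolist())  # ['a.txt']
```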