
Expose context from the orchestrator to the components #301


Merged: 12 commits, Mar 20, 2025
6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -2,13 +2,19 @@

## Next

### Added

- Added the `run_with_context` method to `Component`. This method includes a `context_` parameter, which provides information about the pipeline from which the component is executed (e.g., the `run_id`). It also enables the component to send events to the pipeline's callback function.


## 1.6.0

### Added

- Added optional schema enforcement as a validation layer after entity and relation extraction.
- Introduced a linear hybrid search ranker for HybridRetriever and HybridCypherRetriever, allowing customizable ranking with an `alpha` parameter.
- Introduced SearchQueryParseError for handling invalid Lucene query strings in HybridRetriever and HybridCypherRetriever.
- Components can now be called with the `run_with_context` method, which receives an extra `context_` argument containing information about the pipeline they are run from: the `run_id`, the `task_name`, and a `notify` function that sends `TASK_PROGRESS` events to the same callback as the pipeline events.

### Fixed

8 changes: 6 additions & 2 deletions docs/source/api.rst
@@ -9,13 +9,18 @@ API Documentation
Components
**********

Component
=========

.. autoclass:: neo4j_graphrag.experimental.pipeline.component.Component
:members: run, run_with_context

DataLoader
==========

.. autoclass:: neo4j_graphrag.experimental.components.pdf_loader.DataLoader
:members: run, get_document_metadata


PdfLoader
=========

@@ -59,7 +64,6 @@ LexicalGraphBuilder
:members:
:exclude-members: component_inputs, component_outputs


Neo4jChunkReader
================

8 changes: 4 additions & 4 deletions docs/source/types.rst
@@ -158,22 +158,22 @@ ParamFromEnvConfig
EventType
=========

.. autoenum:: neo4j_graphrag.experimental.pipeline.types.EventType
.. autoenum:: neo4j_graphrag.experimental.pipeline.notification.EventType


PipelineEvent
==============

.. autoclass:: neo4j_graphrag.experimental.pipeline.types.PipelineEvent
.. autoclass:: neo4j_graphrag.experimental.pipeline.notification.PipelineEvent

TaskEvent
==============

.. autoclass:: neo4j_graphrag.experimental.pipeline.types.TaskEvent
.. autoclass:: neo4j_graphrag.experimental.pipeline.notification.TaskEvent


EventCallbackProtocol
=====================

.. autoclass:: neo4j_graphrag.experimental.pipeline.types.EventCallbackProtocol
.. autoclass:: neo4j_graphrag.experimental.pipeline.notification.EventCallbackProtocol
:members: __call__
36 changes: 34 additions & 2 deletions docs/source/user_guide_pipeline.rst
@@ -22,7 +22,7 @@ their own by following these steps:

1. Create a subclass of the Pydantic `neo4j_graphrag.experimental.pipeline.DataModel` to represent the data being returned by the component
2. Create a subclass of `neo4j_graphrag.experimental.pipeline.Component`
3. Create a run method in this new class and specify the required inputs and output model using the just created `DataModel`
3. Create a `run_with_context` method in this new class and specify the required inputs and output model using the just created `DataModel`
4. Implement this method: it is an `async` method, allowing tasks to be parallelized and awaited within it.

An example is given below, where a `ComponentAdd` is created to add two numbers together and return
@@ -31,12 +31,13 @@ the resulting sum:
.. code:: python

from neo4j_graphrag.experimental.pipeline import Component, DataModel
from neo4j_graphrag.experimental.pipeline.types.context import RunContext

class IntResultModel(DataModel):
result: int

class ComponentAdd(Component):
async def run(self, number1: int, number2: int = 1) -> IntResultModel:
async def run_with_context(self, context_: RunContext, number1: int, number2: int = 1) -> IntResultModel:
return IntResultModel(result=number1 + number2)

Read more about :ref:`components-section` in the API Documentation.
@@ -141,6 +142,7 @@ It is possible to add a callback to receive notification about pipeline progress:
- `PIPELINE_STARTED`, when the pipeline starts
- `PIPELINE_FINISHED`, when the pipeline ends
- `TASK_STARTED`, when a task starts
- `TASK_PROGRESS`, sent by a component during execution (depends on the component's implementation, see below)
- `TASK_FINISHED`, when a task ends
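A callback receives each of these events as it is emitted. As a standalone sketch of the filtering logic (using a stand-in enum rather than the library's `EventType`), a handler that only surfaces progress events might look like:

```python
import asyncio
import enum

# stand-in for the library's EventType enum (illustrative only)
class EventType(enum.Enum):
    PIPELINE_STARTED = "PIPELINE_STARTED"
    TASK_STARTED = "TASK_STARTED"
    TASK_PROGRESS = "TASK_PROGRESS"
    TASK_FINISHED = "TASK_FINISHED"
    PIPELINE_FINISHED = "PIPELINE_FINISHED"

progress_messages: list[str] = []

async def event_handler(event_type: EventType, message: str) -> None:
    # only surface per-task progress; ignore lifecycle events
    if event_type == EventType.TASK_PROGRESS:
        progress_messages.append(message)

async def demo() -> None:
    await event_handler(EventType.PIPELINE_STARTED, "pipeline started")
    await event_handler(EventType.TASK_PROGRESS, "step 1/10")
    await event_handler(EventType.PIPELINE_FINISHED, "pipeline finished")

asyncio.run(demo())
print(progress_messages)  # ['step 1/10']
```

The same branching appears in the full example further down, where the handler logs `TASK_PROGRESS` events at a different level than the other events.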


@@ -172,3 +174,33 @@ See :ref:`pipelineevent` and :ref:`taskevent` to see what is sent in each event type.
# ... add components, connect them as usual

await pipeline.run(...)


Send Events from Components
===========================

Components can send progress notifications using the `notify` function from
`context_` by implementing the `run_with_context` method:

.. code:: python

from neo4j_graphrag.experimental.pipeline import Component, DataModel
from neo4j_graphrag.experimental.pipeline.types.context import RunContext

class IntResultModel(DataModel):
result: int

class ComponentAdd(Component):
async def run_with_context(self, context_: RunContext, number1: int, number2: int = 1) -> IntResultModel:
for fake_iteration in range(10):
await context_.notify(
message=f"Starting iteration {fake_iteration} out of 10",
data={"iteration": fake_iteration, "total": 10}
)
return IntResultModel(result=number1 + number2)

This will send a `TASK_PROGRESS` event to the pipeline callback.

.. note::

In a future release, the `context_` parameter will be added to the `run` method.
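To make the `notify` call shape above concrete outside a pipeline, here is a standalone sketch using a hypothetical stand-in for `RunContext` (not the library's class):

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any

@dataclass
class FakeRunContext:
    # hypothetical stand-in mirroring the notify(message=..., data=...) call shape
    run_id: str = "run-1"
    task_name: str = "add"
    events: list[dict[str, Any]] = field(default_factory=list)

    async def notify(self, message: str, data: dict[str, Any]) -> None:
        # a real context would forward this to the pipeline callback
        self.events.append({"message": message, "data": data})

async def run_with_context(context_: FakeRunContext, number1: int, number2: int = 1) -> int:
    await context_.notify(message="adding", data={"n1": number1, "n2": number2})
    return number1 + number2

ctx = FakeRunContext()
result = asyncio.run(run_with_context(ctx, 2, 3))
print(result, len(ctx.events))  # 5 1
```

Inside a pipeline, the real `RunContext` routes each `notify` call to the pipeline's callback as a `TASK_PROGRESS` event, tagged with the `run_id` and `task_name`.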
1 change: 1 addition & 0 deletions examples/README.md
@@ -103,6 +103,7 @@ are listed in [the last section of this file](#customize).
- [Export lexical graph creation into another pipeline](./customize/build_graph/pipeline/text_to_lexical_graph_to_entity_graph_two_pipelines.py)
- [Build pipeline from config file](customize/build_graph/pipeline/from_config_files/pipeline_from_config_file.py)
- [Add event listener to get notification about Pipeline progress](./customize/build_graph/pipeline/pipeline_with_notifications.py)
- [Use component context to send notifications about Component progress](./customize/build_graph/pipeline/pipeline_with_component_notifications.py)


#### Components
2 changes: 1 addition & 1 deletion examples/build_graph/simple_kg_builder_from_text.py
@@ -14,7 +14,7 @@
from neo4j_graphrag.embeddings import OpenAIEmbeddings
from neo4j_graphrag.experimental.pipeline.kg_builder import SimpleKGPipeline
from neo4j_graphrag.experimental.pipeline.pipeline import PipelineResult
from neo4j_graphrag.experimental.pipeline.types import (
from neo4j_graphrag.experimental.pipeline.types.schema import (
EntityInputType,
RelationInputType,
)
@@ -0,0 +1,109 @@
"""This example demonstrates how to use an event callback to receive
notifications about component progress.
"""

from __future__ import annotations

import asyncio
import logging
from typing import Any

from neo4j_graphrag.experimental.pipeline import Pipeline, Component, DataModel
from neo4j_graphrag.experimental.pipeline.notification import Event, EventType
from neo4j_graphrag.experimental.pipeline.types.context import RunContext

logger = logging.getLogger(__name__)
logging.basicConfig()
logger.setLevel(logging.INFO)


class MultiplyComponentResult(DataModel):
result: list[int]


class MultiplicationComponent(Component):
def __init__(self, f: int) -> None:
self.f = f

async def multiply_number(
self,
context_: RunContext,
number: int,
) -> int:
await context_.notify(
message=f"Processing number {number}",
data={"number_processed": number},
)
return self.f * number

# implementing `run_with_context` to get access to
# the pipeline's RunContext:
async def run_with_context(
self,
context_: RunContext,
numbers: list[int],
**kwargs: Any,
) -> MultiplyComponentResult:
result = await asyncio.gather(
*[
self.multiply_number(
context_,
number,
)
for number in numbers
]
)
return MultiplyComponentResult(result=result)


async def event_handler(event: Event) -> None:
"""The callback can do anything with the event; here we log
component progress events at a higher level than the other pipeline events.
"""
if event.event_type == EventType.TASK_PROGRESS:
logger.warning(event)
else:
logger.info(event)


async def main() -> None:
"""Run two pipeline executions concurrently, logging the emitted events."""
pipe = Pipeline(
callback=event_handler,
)
# define the components
pipe.add_component(
MultiplicationComponent(f=2),
"multiply_by_2",
)
pipe.add_component(
MultiplicationComponent(f=10),
"multiply_by_10",
)
# define the execution order of the components
# and how the output of previous components is used
pipe.connect(
"multiply_by_2",
"multiply_by_10",
input_config={"numbers": "multiply_by_2.result"},
)
# user input:
pipe_inputs_1 = {
"multiply_by_2": {
"numbers": [1, 2, 5, 4],
},
}
pipe_inputs_2 = {
"multiply_by_2": {
"numbers": [3, 10, 1],
}
}
# run the pipeline
await asyncio.gather(
pipe.run(pipe_inputs_1),
pipe.run(pipe_inputs_2),
)


if __name__ == "__main__":
asyncio.run(main())
@@ -15,7 +15,7 @@
)
from neo4j_graphrag.experimental.pipeline import Pipeline
from neo4j_graphrag.experimental.pipeline.pipeline import PipelineResult
from neo4j_graphrag.experimental.pipeline.types import Event
from neo4j_graphrag.experimental.pipeline.notification import Event

logger = logging.getLogger(__name__)
logging.basicConfig()
@@ -14,7 +14,6 @@
# limitations under the License.
from __future__ import annotations

import abc
import asyncio
import enum
import json
@@ -115,7 +114,7 @@ def fix_invalid_json(raw_json: str) -> str:
return repaired_json


class EntityRelationExtractor(Component, abc.ABC):
class EntityRelationExtractor(Component):
"""Abstract class for entity relation extraction components.

Args:
@@ -133,15 +132,14 @@ def __init__(
self.on_error = on_error
self.create_lexical_graph = create_lexical_graph

@abc.abstractmethod
async def run(
self,
chunks: TextChunks,
document_info: Optional[DocumentInfo] = None,
lexical_graph_config: Optional[LexicalGraphConfig] = None,
**kwargs: Any,
) -> Neo4jGraph:
pass
raise NotImplementedError()

def update_ids(
self,
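The change above swaps `abc.abstractmethod` for a plain `NotImplementedError` body, so the base class stays instantiable (and free to offer alternative entry points such as `run_with_context`) while still failing loudly if `run` is called without being overridden. A minimal standalone sketch of that pattern (hypothetical class names, not the library's):

```python
import asyncio
from typing import Any

class BaseComponent:
    # not abstract: the base fails at call time instead of instantiation time
    async def run(self, *args: Any, **kwargs: Any) -> Any:
        raise NotImplementedError()

class Doubler(BaseComponent):
    async def run(self, x: int) -> int:
        return 2 * x

result = asyncio.run(Doubler().run(21))
print(result)  # 42

# calling the un-overridden base method raises instead of silently succeeding
caught = False
try:
    asyncio.run(BaseComponent().run())
except NotImplementedError:
    caught = True
print(caught)  # True
```

This is a common trade-off against `abc.ABC`: the interface is no longer enforced at instantiation, but subclasses gain the flexibility to implement only the entry point they need.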
6 changes: 2 additions & 4 deletions src/neo4j_graphrag/experimental/components/resolver.py
@@ -12,7 +12,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
from typing import Any, Optional

import neo4j
@@ -22,7 +21,7 @@
from neo4j_graphrag.utils import driver_config


class EntityResolver(Component, abc.ABC):
class EntityResolver(Component):
"""Entity resolution base class

Args:
@@ -38,9 +37,8 @@ def __init__(
self.driver = driver_config.override_user_agent(driver)
self.filter_query = filter_query

@abc.abstractmethod
async def run(self, *args: Any, **kwargs: Any) -> ResolutionStats:
pass
raise NotImplementedError()


class SinglePropertyExactMatchResolver(EntityResolver):
2 changes: 1 addition & 1 deletion src/neo4j_graphrag/experimental/components/schema.py
@@ -21,7 +21,7 @@

from neo4j_graphrag.exceptions import SchemaValidationError
from neo4j_graphrag.experimental.pipeline.component import Component, DataModel
from neo4j_graphrag.experimental.pipeline.types import (
from neo4j_graphrag.experimental.pipeline.types.schema import (
EntityInputType,
RelationInputType,
)