Skip to content

Commit 25008ab

Browse files
authored
Pipeline status notifier (#254)
* WIP: notifier * Add callback protocol * Call notifier + add example * ruff * ruff * import annotation to make Protocol | None work * mypy * Add unit test * Move types to types.py * Doc and changelog * ruff & co * ruff
1 parent e0b5e86 commit 25008ab

File tree

14 files changed

+1151
-257
lines changed

14 files changed

+1151
-257
lines changed

CHANGELOG.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,13 @@
22

33
## Next
44

5+
### Added
6+
7+
- Ability to add event listener to get notifications about Pipeline progress.
8+
59
### Changed
610
- Changed the default behaviour of `FixedSizeSplitter` to avoid words cut-off in the chunks whenever it is possible.
711

8-
912
## 1.4.2
1013

1114
### Fixed

docs/source/conf.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
"sphinx.ext.napoleon",
4646
"sphinx.ext.viewcode",
4747
"sphinx.ext.autosectionlabel",
48+
"enum_tools.autoenum",
4849
]
4950

5051
# The suffix(es) of source filenames.

docs/source/types.rst

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,3 +153,27 @@ ParamFromEnvConfig
153153
==================
154154

155155
.. autoclass:: neo4j_graphrag.experimental.pipeline.config.param_resolver.ParamFromEnvConfig
156+
157+
158+
EventType
159+
=========
160+
161+
.. autoenum:: neo4j_graphrag.experimental.pipeline.types.EventType
162+
163+
164+
PipelineEvent
165+
==============
166+
167+
.. autoclass:: neo4j_graphrag.experimental.pipeline.types.PipelineEvent
168+
169+
TaskEvent
170+
==============
171+
172+
.. autoclass:: neo4j_graphrag.experimental.pipeline.types.TaskEvent
173+
174+
175+
EventCallbackProtocol
176+
=====================
177+
178+
.. autoclass:: neo4j_graphrag.experimental.pipeline.types.EventCallbackProtocol
179+
:members: __call__

docs/source/user_guide_pipeline.rst

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,6 @@ Pipelines can be visualized using the `draw` method:
106106

107107
.. code:: python
108108
109-
import asyncio
110109
from neo4j_graphrag.experimental.pipeline import Pipeline
111110
112111
pipe = Pipeline()
@@ -131,3 +130,45 @@ Here is an example of final result:
131130

132131
.. image:: images/pipeline_full.png
133132
:alt: Pipeline visualisation
133+
134+
135+
************************
136+
Adding an Event Callback
137+
************************
138+
139+
It is possible to add a callback to receive notification about pipeline progress:
140+
141+
- `PIPELINE_STARTED`, when pipeline starts
142+
- `PIPELINE_FINISHED`, when pipeline ends
143+
- `TASK_STARTED`, when a task starts
144+
- `TASK_FINISHED`, when a task ends
145+
146+
147+
See :ref:`pipelineevent` and :ref:`taskevent` to see what is sent in each event type.
148+
149+
.. code:: python
150+
151+
import asyncio
152+
import logging
153+
154+
from neo4j_graphrag.experimental.pipeline import Pipeline
155+
from neo4j_graphrag.experimental.pipeline.types import Event
156+
157+
logger = logging.getLogger(__name__)
158+
logging.basicConfig()
159+
logger.setLevel(logging.WARNING)
160+
161+
162+
async def event_handler(event: Event) -> None:
163+
"""Function can do anything about the event,
164+
here we're just logging it if it's a pipeline-level event.
165+
"""
166+
if event.event_type.is_pipeline_event:
167+
logger.warning(event)
168+
169+
pipeline = Pipeline(
170+
callback=event_handler,
171+
)
172+
# ... add components, connect them as usual
173+
174+
await pipeline.run(...)

examples/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ are listed in [the last section of this file](#customize).
101101
- [Process multiple documents](./customize/build_graph/pipeline/kg_builder_two_documents_entity_resolution.py)
102102
- [Export lexical graph creation into another pipeline](./customize/build_graph/pipeline/text_to_lexical_graph_to_entity_graph_two_pipelines.py)
103103
- [Build pipeline from config file](customize/build_graph/pipeline/from_config_files/pipeline_from_config_file.py)
104+
- [Add event listener to get notification about Pipeline progress](./customize/build_graph/pipeline/pipeline_with_notifications.py)
104105

105106

106107
#### Components
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
"""This example demonstrates how to use event callback to receive notifications
2+
about the pipeline progress.
3+
"""
4+
5+
from __future__ import annotations
6+
7+
import asyncio
8+
import logging
9+
10+
import neo4j
11+
from neo4j_graphrag.experimental.components.kg_writer import Neo4jWriter
12+
from neo4j_graphrag.experimental.components.lexical_graph import LexicalGraphBuilder
13+
from neo4j_graphrag.experimental.components.text_splitters.fixed_size_splitter import (
14+
FixedSizeSplitter,
15+
)
16+
from neo4j_graphrag.experimental.pipeline import Pipeline
17+
from neo4j_graphrag.experimental.pipeline.pipeline import PipelineResult
18+
from neo4j_graphrag.experimental.pipeline.types import Event
19+
20+
logger = logging.getLogger(__name__)
21+
logging.basicConfig()
22+
logger.setLevel(logging.WARNING)
23+
24+
25+
async def event_handler(event: Event) -> None:
26+
"""Function can do anything about the event,
27+
here we're just logging it if it's a pipeline-level event.
28+
"""
29+
if event.event_type.is_pipeline_event:
30+
logger.warning(event)
31+
32+
33+
async def main(neo4j_driver: neo4j.Driver) -> PipelineResult:
34+
"""This is where we define and run the Lexical Graph builder pipeline, instantiating
35+
a few components:
36+
37+
- Text Splitter: to split the text into manageable chunks of fixed size
38+
- Chunk Embedder: to embed the chunks' text
39+
- Lexical Graph Builder: to build the lexical graph, ie creating the chunk nodes and relationships between them
40+
- KG writer: save the lexical graph to Neo4j
41+
"""
42+
pipe = Pipeline(
43+
callback=event_handler,
44+
)
45+
# define the components
46+
pipe.add_component(
47+
FixedSizeSplitter(chunk_size=300, chunk_overlap=10, approximate=False),
48+
"splitter",
49+
)
50+
pipe.add_component(
51+
LexicalGraphBuilder(),
52+
"lexical_graph_builder",
53+
)
54+
pipe.add_component(Neo4jWriter(neo4j_driver), "writer")
55+
# define the execution order of component
56+
# and how the output of previous components must be used
57+
pipe.connect(
58+
"splitter", "lexical_graph_builder", input_config={"text_chunks": "splitter"}
59+
)
60+
pipe.connect(
61+
"lexical_graph_builder",
62+
"writer",
63+
input_config={
64+
"graph": "lexical_graph_builder.graph",
65+
"lexical_graph_config": "lexical_graph_builder.config",
66+
},
67+
)
68+
# user input:
69+
# the initial text
70+
# and the list of entities and relations we are looking for
71+
pipe_inputs = {
72+
"splitter": {
73+
"text": """Albert Einstein was a German physicist born in 1879 who
74+
wrote many groundbreaking papers especially about general relativity
75+
and quantum mechanics. He worked for many different institutions, including
76+
the University of Bern in Switzerland and the University of Oxford."""
77+
},
78+
"lexical_graph_builder": {
79+
"document_info": {
80+
# 'path' can be anything
81+
"path": "example/pipeline_with_notifications"
82+
},
83+
},
84+
}
85+
# run the pipeline
86+
return await pipe.run(pipe_inputs)
87+
88+
89+
if __name__ == "__main__":
90+
with neo4j.GraphDatabase.driver(
91+
"bolt://localhost:7687", auth=("neo4j", "password")
92+
) as driver:
93+
print(asyncio.run(main(driver)))

0 commit comments

Comments
 (0)