@@ -22,7 +22,7 @@ their own by following these steps:
22
22
23
23
1. Create a subclass of the Pydantic `neo4j_graphrag.experimental.pipeline.DataModel ` to represent the data being returned by the component
24
24
2. Create a subclass of `neo4j_graphrag.experimental.pipeline.Component `
25
- 3. Create a run method in this new class and specify the required inputs and output model using the just created `DataModel `
25
+ 3. Create a ` run_with_context ` method in this new class and specify the required inputs and output model using the just created `DataModel `
26
26
4. Implement the run method: it's an `async ` method, allowing tasks to be parallelized and awaited within this method.
27
27
28
28
An example is given below, where a `ComponentAdd ` is created to add two numbers together and return
@@ -31,12 +31,13 @@ the resulting sum:
31
31
.. code :: python
32
32
33
33
from neo4j_graphrag.experimental.pipeline import Component, DataModel
34
+ from neo4j_graphrag.experimental.pipeline.types.context import RunContext
34
35
35
36
class IntResultModel (DataModel ):
36
37
result: int
37
38
38
39
class ComponentAdd (Component ):
39
- async def run (self , number1 : int , number2 : int = 1 ) -> IntResultModel:
40
+ async def run_with_context (self , context_ : RunContext , number1 : int , number2 : int = 1 ) -> IntResultModel:
40
41
return IntResultModel(result = number1 + number2)
41
42
42
43
Read more about :ref: `components-section ` in the API Documentation.
@@ -141,6 +142,7 @@ It is possible to add a callback to receive notification about pipeline progress
141
142
- `PIPELINE_STARTED `, when pipeline starts
142
143
- `PIPELINE_FINISHED `, when pipeline ends
143
144
- `TASK_STARTED `, when a task starts
145
+ - `TASK_PROGRESS `, sent by each component (depends on component's implementation, see below)
144
146
- `TASK_FINISHED `, when a task ends
145
147
146
148
@@ -172,3 +174,29 @@ See :ref:`pipelineevent` and :ref:`taskevent` to see what is sent in each event
172
174
# ... add components, connect them as usual
173
175
174
176
await pipeline.run(... )
177
+
178
+
179
+ Send Events from Components
180
+ ===========================
181
+
182
+ Components can send notifications about their progress using the `notify ` function from
183
+ the `context_ `:
184
+
185
+ .. code :: python
186
+
187
+ from neo4j_graphrag.experimental.pipeline import Component, DataModel
188
+ from neo4j_graphrag.experimental.pipeline.types.context import RunContext
189
+
190
+ class IntResultModel (DataModel ):
191
+ result: int
192
+
193
+ class ComponentAdd (Component ):
194
+ async def run_with_context (self , context_ : RunContext, number1 : int , number2 : int = 1 ) -> IntResultModel:
195
+ for fake_iteration in range (10 ):
196
+ await context_.notify(
197
+ message = f " Starting iteration { fake_iteration} out of 10 " ,
198
+ data = {" iteration" : fake_iteration, " total" : 10 }
199
+ )
200
+ return IntResultModel(result = number1 + number2)
201
+
202
+ This will send an `TASK_PROGRESS ` event to the pipeline callback.
0 commit comments