File tree Expand file tree Collapse file tree 2 files changed +16
-2
lines changed
src/neo4j_graphrag/experimental/pipeline Expand file tree Collapse file tree 2 files changed +16
-2
lines changed Original file line number Diff line number Diff line change 23
23
import time
24
24
from pathlib import Path
25
25
from typing import List
26
+ from dotenv import load_dotenv
27
+
28
+ load_dotenv ()
26
29
27
30
from neo4j_graphrag .experimental .pipeline .config .runner import PipelineRunner
28
31
from neo4j_graphrag .experimental .pipeline .executors import LocalExecutor , RayExecutor
Original file line number Diff line number Diff line change @@ -112,9 +112,20 @@ async def submit(
112
112
113
113
import cloudpickle
114
114
115
- task_bytes = cloudpickle .dumps (task )
115
+ try :
116
+ task_bytes = cloudpickle .dumps (task )
117
+ except Exception as exc : # noqa: BLE001
118
+ # Component (e.g., holding Neo4j driver) is not picklable – run locally.
119
+ # This keeps the pipeline working even if some tasks cannot be off-loaded.
120
+ import logging
121
+
122
+ logging .getLogger (__name__ ).warning (
123
+ "RayExecutor fallback to LocalExecutor for %s: %s" , task .name , exc
124
+ )
125
+ return await super ().submit (task , context , inputs )
126
+
116
127
obj_ref = self ._remote_runner .remote (task_bytes , inputs )
117
- res = await self ._ray .get (obj_ref )
128
+ res = self ._ray .get (obj_ref )
118
129
if res is None :
119
130
return None
120
131
return RunResult (result = res )
You can’t perform that action at this time.
0 commit comments