-
Notifications
You must be signed in to change notification settings - Fork 108
Make pipeline work with parallel runs #119
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
…to fix/async-pipeline
|
||
|
||
class PromptTemplateComponent(Component): | ||
def __init__(self, prompt: PromptTemplate) -> None: | ||
self.prompt = prompt | ||
|
||
async def run(self, query: str, context: list[str]) -> StringDataModel: | ||
async def run(self, query: str, context: List[str]) -> ComponentResultDataModel: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it possible to use list[str]
here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It doesn't work with python 3.8
when we do the introspection to find the expected inputs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM 🏔️
* Add failing test * Define a "run_id" in Orchestrator - save results per run_id * Make unit test work * Make intermediate results accessible from outside pipeline for investigation * Remove unused imports * Update examples and CHANGELOG * Cleaning: remove deprecated code * Fix ruff * Fix examples * Fix examples again * PR reviews * Removing useless status assignment
* Add failing test * Define a "run_id" in Orchestrator - save results per run_id * Make unit test work * Make intermediate results accessible from outside pipeline for investigation * Remove unused imports * Update examples and CHANGELOG * Cleaning: remove deprecated code * Fix ruff * Fix examples * Fix examples again * PR reviews * Removing useless status assignment
Description
Initial issue
The component's results were saved using the component name as key, and the component status were saved in the component directly, which makes the following not behaving as expected (and returning the same result for both 'runs'):
(note: this code has been turned into a unit test)
What's in this PR?
This PR introduces a
run_id
(uuid) in theOrchestrator
and this run ID is used to build the key to access the components' status and results.By doing so, it appeared that the
callback
was not needed anymore and was removed, all the processing now happen in theOrchestrator.run_task
method that can call its ownon_task_complete
method without need for partial and protocol and such.In order to make the intermediate results still available after a run is done, this PR introduces the
ResultStore
, that's able to find the results for a given run ID and a given component. Only InMemoryStore implemented so far.Pipeline.run
method also return therun_id
in a newPipelineResult
object (breaking change).Not mandatory but possible follow-up PR, that moves the status to the
store
instead of having them hanging in theTask
.Type of Change
Complexity
Complexity: Medium
How Has This Been Tested?
Checklist
The following requirements should have been met (depending on the changes in the branch):