Skip to content

Make pipeline work with parallel runs #119

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 26 commits into from
Sep 8, 2024

Conversation

stellasia
Copy link
Contributor

@stellasia stellasia commented Sep 4, 2024

Description

Initial issue

The component's results were saved using the component name as key, and the component status were saved in the component directly, which makes the following not behaving as expected (and returning the same result for both 'runs'):

    pipe = Pipeline()
    pipe.add_component(ComponentAdd(), "add")
    run_params = [[1, 20], [10, 2]]
    runs = []
    for a, b in run_params:
        runs.append(pipe.run({"add": {"number1": a, "number2": b}}))
    results = await asyncio.gather(*runs)
    print(result)

(note: this code has been turned into a unit test)

What's in this PR?

  • This PR introduces a run_id (uuid) in the Orchestrator and this run ID is used to build the key to access the components' status and results.

  • By doing so, it appeared that the callback was not needed anymore and was removed, all the processing now happen in the Orchestrator.run_task method that can call its own on_task_complete method without need for partial and protocol and such.

  • In order to make the intermediate results still available after a run is done, this PR introduces the ResultStore, that's able to find the results for a given run ID and a given component. Only InMemoryStore implemented so far. Pipeline.run method also return the run_id in a new PipelineResult object (breaking change).

  • Not mandatory but possible follow-up PR, that moves the status to the store instead of having them hanging in the Task.

Type of Change

  • New feature
  • Bug fix
  • Breaking change
  • Documentation update
  • Project configuration change

Complexity

Note

Please provide an estimated complexity of this PR of either Low, Medium or High

Complexity: Medium

How Has This Been Tested?

  • Unit tests
  • E2E tests
  • Manual tests

Checklist

The following requirements should have been met (depending on the changes in the branch):

  • Documentation has been updated
  • Unit tests have been updated
  • E2E tests have been updated
  • Examples have been updated
  • New files have copyright header
  • CLA (https://neo4j.com/developer/cla/) has been signed
  • CHANGELOG.md updated if appropriate

@stellasia stellasia marked this pull request as ready for review September 4, 2024 15:12


class PromptTemplateComponent(Component):
def __init__(self, prompt: PromptTemplate) -> None:
self.prompt = prompt

async def run(self, query: str, context: list[str]) -> StringDataModel:
async def run(self, query: str, context: List[str]) -> ComponentResultDataModel:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to use list[str] here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't work with python 3.8 when we do the introspection to find the expected inputs.

Copy link
Contributor

@willtai willtai left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM 🏔️

@stellasia stellasia merged commit c284b08 into neo4j:main Sep 8, 2024
11 checks passed
@stellasia stellasia mentioned this pull request Sep 8, 2024
15 tasks
a-s-g93 pushed a commit to a-s-g93/neo4j-genai-python-dev that referenced this pull request Sep 13, 2024
* Add failing test

* Define a "run_id" in Orchestrator - save results per run_id

* Make unit test work

* Make intermediate results accessible from outside pipeline for investigation

* Remove unused imports

* Update examples and CHANGELOG

* Cleaning: remove deprecated code

* Fix ruff

* Fix examples

* Fix examples again

* PR reviews

* Removing useless status assignment
a-s-g93 pushed a commit to a-s-g93/neo4j-genai-python-dev that referenced this pull request Sep 13, 2024
* Add failing test

* Define a "run_id" in Orchestrator - save results per run_id

* Make unit test work

* Make intermediate results accessible from outside pipeline for investigation

* Remove unused imports

* Update examples and CHANGELOG

* Cleaning: remove deprecated code

* Fix ruff

* Fix examples

* Fix examples again

* PR reviews

* Removing useless status assignment
@stellasia stellasia deleted the fix/async-pipeline branch September 16, 2024 08:48
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants