[Feat] Support more running mode in workforce #3157
base: master
graph TD
A[Task: Generate 5 AI/ML Papers] --> B[Literature Researcher]
B --> C{Fork: 5 Parallel Tasks}
C --> D1[Summary Specialist 1<br/>Summarize Paper 1]
C --> D2[Summary Specialist 2<br/>Summarize Paper 2]
C --> D3[Summary Specialist 3<br/>Summarize Paper 3]
C --> D4[Summary Specialist 4<br/>Summarize Paper 4]
C --> D5[Summary Specialist 5<br/>Summarize Paper 5]
D1 --> E{Join: Collect Summaries}
D2 --> E
D3 --> E
D4 --> E
D5 --> E
E --> F[Research Synthesizer<br/>Analyze AI/ML Trends]
F --> G[Final Result]
style A fill:#e1f5fe
style B fill:#fff3e0
style D1 fill:#f3e5f5
style D2 fill:#f3e5f5
style D3 fill:#f3e5f5
style D4 fill:#f3e5f5
style D5 fill:#f3e5f5
style F fill:#e8f5e8
style G fill:#fff9c4
|
graph TD
A[Task: Generate 4 RESTful API Types] --> B[API Researcher]
B --> C{Fork: 4 Parallel Tasks}
C --> D1[API Analyst 1<br/>Analyze API 1]
C --> D2[API Analyst 2<br/>Analyze API 2]
C --> D3[API Analyst 3<br/>Analyze API 3]
C --> D4[API Analyst 4<br/>Analyze API 4]
D1 --> E{Join: Collect Analyses}
D2 --> E
D3 --> E
D4 --> E
E --> F[Documentation Writer<br/>Generate API Usage Guide]
F --> G[Final Result]
style A fill:#e1f5fe
style B fill:#fff3e0
style D1 fill:#f3e5f5
style D2 fill:#f3e5f5
style D3 fill:#f3e5f5
style D4 fill:#f3e5f5
style F fill:#e8f5e8
style G fill:#fff9c4
|
graph TD
A[Task: Code Review Analysis] --> B[Code Scanner]
B --> C{Fork: 3 Parallel Tasks}
C --> D1[Code Reviewer 1<br/>Review File 1]
C --> D2[Code Reviewer 2<br/>Review File 2]
C --> D3[Code Reviewer 3<br/>Review File 3]
D1 --> E{Join: Collect Reviews}
D2 --> E
D3 --> E
E --> F[Review Summarizer<br/>Generate Comprehensive Report]
F --> G[Final Result]
style A fill:#e1f5fe
style B fill:#fff3e0
style D1 fill:#f3e5f5
style D2 fill:#f3e5f5
style D3 fill:#f3e5f5
style F fill:#e8f5e8
style G fill:#fff9c4
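The fork/join shape in the graphs above can be written down as a dependency map. The sketch below is only an illustration: `PipelineSketch` and its fields are hypothetical stand-ins, not the classes in this PR, and the method names simply mirror the `add` / `fork` / `join` / `build` naming discussed in the review.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class PipelineSketch:
    """Hypothetical stand-in for a pipeline builder (not CAMEL's class)."""

    deps: Dict[str, List[str]] = field(default_factory=dict)
    _frontier: List[str] = field(default_factory=list)

    def add(self, name: str) -> "PipelineSketch":
        # A sequential task waits for every task in the current frontier.
        self.deps[name] = list(self._frontier)
        self._frontier = [name]
        return self

    def fork(self, names: List[str]) -> "PipelineSketch":
        # Every branch depends on the same predecessor(s).
        parents = list(self._frontier)
        for name in names:
            self.deps[name] = list(parents)
        self._frontier = list(names)
        return self

    def join(self, name: str) -> "PipelineSketch":
        # The join task waits for all branches, which form the frontier.
        return self.add(name)

    def build(self) -> Dict[str, List[str]]:
        # Return a copy of the task -> dependencies map.
        return dict(self.deps)


graph = (
    PipelineSketch()
    .add("scan")
    .fork(["review_1", "review_2", "review_3"])
    .join("summarize")
    .build()
)
```

Here `graph["summarize"]` lists the three review branches, which matches the "Join: Collect Reviews" node in the third graph.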
|
|
Hey @Wendong-Fan @fengju0213! I've added pipeline mode to workforce, and maybe we can try more cases to test this mode.
Thanks @Ol1ver0413!
Thanks for the comprehensive PR @Ol1ver0413. I added a comment. I also think the method names could be a bit more consistent and memorable.
# before
workforce.add_pipeline_task()
workforce.fork_pipeline()
workforce.add_parallel_pipeline_tasks()
builder.fork()
builder.add_parallel_tasks()
# after
workforce.pipeline_add()
workforce.pipeline_fork()
workforce.pipeline_join()
workforce.pipeline_build()
builder.add()
builder.fork()
builder.join()
builder.build()

expected_task_ids = {task.id for task in self._pending_tasks}
expected_task_ids.update(task.id for task in self._completed_tasks)

completed_successful_ids = {
    task.id for task in self._completed_tasks
    if task.state == TaskState.DONE
}
Doesn't this create a situation in which, if any of the branched tasks fails, the whole pipeline is marked as failed and the join task never executes?
Thanks @hesamsheikh ! Maybe I need to add some branch task failure handling mechanisms into the pipeline.
Hi, @hesamsheikh. Since the original design follows a pipeline pattern, I didn’t perform task restructuring or decomposition as in the decompose mode. Therefore, I decided that if a task fails, it would still be considered successful, and the error message would be passed to the join stage. We can discuss whether there might be a better approach.
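A minimal sketch of that idea, assuming the branches run as plain asyncio coroutines: a failed branch does not abort the others, and its error message is handed to the join stage alongside the successful results. `run_branch` and `fork_and_join` are hypothetical names used only for illustration, not the PR's implementation.

```python
import asyncio
from typing import List


async def run_branch(name: str, should_fail: bool) -> str:
    # Stand-in for a worker processing one parallel branch.
    if should_fail:
        raise RuntimeError(f"{name} failed")
    return f"{name} ok"


async def fork_and_join() -> List[str]:
    # return_exceptions=True keeps one failing branch from cancelling
    # the gather; exceptions come back as ordinary result values.
    results = await asyncio.gather(
        run_branch("branch_1", False),
        run_branch("branch_2", True),
        run_branch("branch_3", False),
        return_exceptions=True,
    )
    # The join stage receives error text in place of a missing result.
    return [
        f"error: {r}" if isinstance(r, BaseException) else r
        for r in results
    ]


joined = asyncio.run(fork_and_join())
```

With this shape the join task always runs, and it can decide for itself how to treat the branches that reported an error.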
Well this method seems a bit confusing and hard to understand anyway. Could you maybe simplify it, or explain it a bit in the comments? I know how it works and what it does, but not the intent of it.
Yeah, I'll add more comments.
# Clear existing tasks and dependencies
self._pending_tasks.clear()
self._task_dependencies.clear()
self._assignees.clear()
@Ol1ver0413, thanks for the PR. As of now, is the pipeline designed so that it can interrupt running tasks, or does it only work with predefined tasks, i.e. tasks added before workforce.start()?
Thanks @a7m-1st. The pipeline mode was designed for sequential, predefined tasks, which cannot be interrupted by a human. But maybe we can discuss this further. Thanks again!
|
Hi @Ol1ver0413, hope you're doing well! Just wanted to check in on this PR. It looks like there's some feedback that needs to be addressed, along with some failing pre-commit checks. Please let us know if you have any questions or need a hand with anything. We're looking forward to getting this merged! Cheers
Hi, @Wendong-Fan!
…into workforce_pipeline
|
@Ol1ver0413 There are merge conflicts, I guess, as I am seeing 140+ files changed.
Yeah, I see that too. It's quite weird because I only modified workforce.py and the example file. Let me check.
Cool, all good now. Thanks for the update, I will review it ASAP!
Thanks for your updates. I checked the PR again and added a few more comments.
camel/societies/workforce/utils.py
Outdated
if hasattr(self, '_last_parallel_tasks') and self._last_parallel_tasks:
    wait_for = self._last_parallel_tasks
    # Clear the parallel tasks after using them
    delattr(self, '_last_parallel_tasks')
Why delete the whole attribute? Consider emptying the attribute or setting it to None.
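One way the suggestion could look, as a sketch only: the attribute is declared once in `__init__` and reset to `None` after use, so no `hasattr` / `delattr` pair is needed. `BuilderSketch` and `take_wait_list` are hypothetical names; only `_last_parallel_tasks` and `_last_task_id` come from the diff.

```python
from typing import List, Optional


class BuilderSketch:
    """Hypothetical stand-in for the pipeline builder in the diff."""

    def __init__(self) -> None:
        # Declared up front, so the attribute always exists.
        self._last_task_id: Optional[str] = None
        self._last_parallel_tasks: Optional[List[str]] = None

    def fork(self, task_ids: List[str]) -> None:
        # Remember the branches so the next task can wait on them.
        self._last_parallel_tasks = list(task_ids)

    def take_wait_list(self) -> List[str]:
        # Use the recorded parallel tasks once, then reset to None
        # instead of deleting the attribute.
        if self._last_parallel_tasks:
            wait_for = self._last_parallel_tasks
            self._last_parallel_tasks = None
            return wait_for
        return [self._last_task_id] if self._last_task_id else []
```

Keeping the attribute alive also lets type checkers see it, which a `delattr` call would defeat.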
camel/societies/workforce/utils.py
Outdated
self.task_counter = 0
self._last_task_id = None
if hasattr(self, '_last_parallel_tasks'):
    delattr(self, '_last_parallel_tasks')
again, pretty unusual to delete the attribute
async def _process_task_with_hybrid(self, task: Task) -> Task:
    """Process task using hybrid approach (partial auto-decomposition)."""
    # For now, fall back to auto-decompose mode
    # This can be extended to support more sophisticated hybrid logic
    return await self._process_task_with_auto_decompose(task)
is this for future use? it's unused
expected_task_ids = {task.id for task in self._pending_tasks}
expected_task_ids.update(task.id for task in self._completed_tasks)
why are pending tasks counted as completed ones? seems like an issue.
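One possible reading of the two sets, sketched with hypothetical stand-in types: pending tasks are added to the set of *expected* IDs, not to the completed ones, so any task that is still pending makes the success check fail. The final subset comparison is an assumption; the diff excerpt does not show how the sets are actually compared.

```python
from dataclasses import dataclass
from enum import Enum
from typing import List


class State(Enum):
    # Hypothetical stand-in for the TaskState used in the diff.
    OPEN = "open"
    DONE = "done"
    FAILED = "failed"


@dataclass
class T:
    # Hypothetical stand-in for a task with an id and a state.
    id: str
    state: State


def all_successful(pending: List[T], completed: List[T]) -> bool:
    # Every known task is expected to finish, pending or not.
    expected = {t.id for t in pending} | {t.id for t in completed}
    # Only completed tasks in the DONE state count as successful.
    successful = {t.id for t in completed if t.state == State.DONE}
    # Assumed comparison: success means nothing expected is missing.
    return expected <= successful
```

Under this reading a renamed variable, or a short comment in the method, might be enough to remove the confusion.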
Thanks @hesamsheikh for reviewing. Let me check.
…into workforce_pipeline
Thanks @Ol1ver0413 for your work. Everything worked well on my side, except for a few small points I would like your opinion on, and a small change that was perhaps overlooked when resolving merge conflicts 🙌
    auto_depend: bool = True,
) -> 'Workforce':
    """Add a task to the pipeline with support for chaining.
Maybe we don't need to wrap the return type in quotes?
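For reference, one standard way to drop the quotes is postponed evaluation of annotations via `from __future__ import annotations`, which lets a method name its own class in the return annotation. This is a sketch with a hypothetical `WorkforceSketch` class; whether it fits the conventions of the real module is for the maintainers to judge.

```python
from __future__ import annotations

from typing import List


class WorkforceSketch:
    """Hypothetical stand-in used only to show the annotation style."""

    def __init__(self) -> None:
        self.tasks: List[str] = []

    def pipeline_add(self, content: str) -> WorkforceSketch:
        # The unquoted class name is fine here because annotations
        # are not evaluated at function definition time.
        self.tasks.append(content)
        return self


wf = WorkforceSketch().pipeline_add("step 1").pipeline_add("step 2")
```

On Python 3.11 and later, `typing.Self` is another option for methods that return `self`.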
def pipeline_fork(self, task_contents: List[str]) -> 'Workforce':
    """Create parallel branches from the current task.
I can see that those parallel tasks only support text. It would be nice to also support a list of Tasks. Or is that not viable?
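A sketch of what accepting both forms could look like, assuming the fork step only needs each branch to end up as a task object with an ID. `TaskSketch`, `normalize_branches`, and the `parallel_` ID prefix are all hypothetical; nothing here reflects how the PR generates IDs.

```python
from dataclasses import dataclass
from typing import List, Sequence, Union


@dataclass
class TaskSketch:
    # Hypothetical stand-in for a task object.
    content: str
    id: str = ""


def normalize_branches(
    items: Sequence[Union[str, TaskSketch]],
    prefix: str = "parallel",
) -> List[TaskSketch]:
    tasks: List[TaskSketch] = []
    for index, item in enumerate(items):
        # Plain strings are wrapped; task objects pass through as-is.
        if isinstance(item, TaskSketch):
            task = item
        else:
            task = TaskSketch(content=item)
        # Fill in a positional ID only when the caller gave none
        # (this sets the id on the passed-in object in place).
        if not task.id:
            task.id = f"{prefix}_{index}"
        tasks.append(task)
    return tasks
```

A fork method could call such a helper first and then treat every branch uniformly.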
# Collect results from all pipeline tasks
task.result = self._collect_pipeline_results()
task.state = (
    TaskState.DONE if self._all_pipeline_tasks_successful()
    else TaskState.FAILED
)
Maybe we can reset the mode to AUTO_DECOMPOSE after the pipeline completes? Or perhaps add another function to change the mode of the workforce after init(), since using any of the pipeline functions automatically switches the mode to PIPELINE in _ensure_pipeline_builder().
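Both suggestions can be sketched together, under the assumption that the mode is a simple attribute: an explicit setter, plus a run method that restores the previous mode in a `finally` block. `ModeSketch`, `ModeAwareWorkforce`, `set_mode`, and `run_pipeline` are hypothetical names; only the AUTO_DECOMPOSE and PIPELINE mode names come from the comment.

```python
from enum import Enum


class ModeSketch(Enum):
    # Hypothetical enum; member names follow the review comment.
    AUTO_DECOMPOSE = "auto_decompose"
    PIPELINE = "pipeline"


class ModeAwareWorkforce:
    """Hypothetical stand-in showing mode switching and restoring."""

    def __init__(self) -> None:
        self.mode = ModeSketch.AUTO_DECOMPOSE

    def set_mode(self, mode: ModeSketch) -> "ModeAwareWorkforce":
        # Explicit way to change the mode after construction.
        self.mode = mode
        return self

    def run_pipeline(self) -> str:
        previous = self.mode
        self.mode = ModeSketch.PIPELINE
        try:
            # Placeholder for the actual pipeline execution.
            return "pipeline result"
        finally:
            # Restore the earlier mode even if the pipeline raises.
            self.mode = previous
```

Restoring the previous mode, not a hard-coded default, keeps the behavior predictable for callers who set the mode themselves.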
and (
    task.additional_info is None
    or not task.additional_info.get(
        "_needs_decomposition", False
    )
)
Can we revert this @Ol1ver0413?
More details on this are in #3238.
This fix was added because we mix independent tasks and subtasks in the same _pending_queue, differentiated by the _needs_decomposition flag, so we delay posting the independent tasks.
Thanks @a7m-1st. I'll take a look ASAP.
Please re-request a review after making the necessary changes @Ol1ver0413
Description
Add pipeline running mode in workforce. #1663
Checklist
Go over all the following points, and put an x in all the boxes that apply.
Fixes #issue-number in the PR description (required)
pyproject.toml and uv lock
If you are unsure about any of these, don't hesitate to ask. We are here to help!