
Conversation

@Ol1ver0413 (Collaborator) commented Sep 18, 2025

Description

Add a pipeline running mode to the workforce. #1663

Checklist

Go over all the following points, and put an x in all the boxes that apply.

  • I have read the CONTRIBUTION guide (required)
  • I have linked this PR to an issue using the Development section on the right sidebar or by adding Fixes #issue-number in the PR description (required)
  • I have checked if any dependencies need to be added or updated in pyproject.toml and uv lock
  • I have updated the tests accordingly (required for a bug fix or a new feature)
  • I have updated the documentation if needed:
  • I have added examples if this is a new feature

If you are unsure about any of these, don't hesitate to ask. We are here to help!

@coderabbitai bot (Contributor) commented Sep 18, 2025

Important

Review skipped

Auto reviews are disabled on this repository.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.


@Ol1ver0413 Ol1ver0413 self-assigned this Sep 18, 2025
@Ol1ver0413 (Collaborator Author) commented:

graph TD
    A[Task: Generate 5 AI/ML Papers] --> B[Literature Researcher]
    B --> C{Fork: 5 Parallel Tasks}
    
    C --> D1[Summary Specialist 1<br/>Summarize Paper 1]
    C --> D2[Summary Specialist 2<br/>Summarize Paper 2]
    C --> D3[Summary Specialist 3<br/>Summarize Paper 3]
    C --> D4[Summary Specialist 4<br/>Summarize Paper 4]
    C --> D5[Summary Specialist 5<br/>Summarize Paper 5]
    
    D1 --> E{Join: Collect Summaries}
    D2 --> E
    D3 --> E
    D4 --> E
    D5 --> E
    
    E --> F[Research Synthesizer<br/>Analyze AI/ML Trends]
    F --> G[Final Result]
    
    style A fill:#e1f5fe
    style B fill:#fff3e0
    style D1 fill:#f3e5f5
    style D2 fill:#f3e5f5
    style D3 fill:#f3e5f5
    style D4 fill:#f3e5f5
    style D5 fill:#f3e5f5
    style F fill:#e8f5e8
    style G fill:#fff9c4
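For reference, a rough sketch of how this fork/join flow could be expressed with the chained pipeline API added in this PR (the pipeline_add / pipeline_fork / pipeline_join names are taken from the naming discussion below; exact signatures and worker setup are omitted, so treat this as illustrative only):

from camel.societies.workforce import Workforce
from camel.tasks import Task

workforce = Workforce("AI/ML Paper Survey")  # worker agents would be added here

# Hypothetical chained construction mirroring the diagram above
(
    workforce
    .pipeline_add("Collect 5 recent AI/ML papers")                   # Literature Researcher
    .pipeline_fork([f"Summarize paper {i}" for i in range(1, 6)])    # 5 parallel summaries
    .pipeline_join("Analyze the summaries and report AI/ML trends")  # Research Synthesizer
)

result = workforce.process_task(Task(content="Survey 5 recent AI/ML papers", id="0"))
print(result.result)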

@Ol1ver0413 (Collaborator Author) commented Sep 21, 2025

graph TD
    A[Task: Generate 4 RESTful API Types] --> B[API Researcher]
    B --> C{Fork: 4 Parallel Tasks}
    
    C --> D1[API Analyst 1<br/>Analyze API 1]
    C --> D2[API Analyst 2<br/>Analyze API 2]
    C --> D3[API Analyst 3<br/>Analyze API 3]
    C --> D4[API Analyst 4<br/>Analyze API 4]
    
    D1 --> E{Join: Collect Analyses}
    D2 --> E
    D3 --> E
    D4 --> E
    
    E --> F[Documentation Writer<br/>Generate API Usage Guide]
    F --> G[Final Result]
    
    style A fill:#e1f5fe
    style B fill:#fff3e0
    style D1 fill:#f3e5f5
    style D2 fill:#f3e5f5
    style D3 fill:#f3e5f5
    style D4 fill:#f3e5f5
    style F fill:#e8f5e8
    style G fill:#fff9c4

@Ol1ver0413 (Collaborator Author) commented Sep 21, 2025

graph TD
    A[Task: Code Review Analysis] --> B[Code Scanner]
    B --> C{Fork: 3 Parallel Tasks}
    
    C --> D1[Code Reviewer 1<br/>Review File 1]
    C --> D2[Code Reviewer 2<br/>Review File 2]
    C --> D3[Code Reviewer 3<br/>Review File 3]
    
    D1 --> E{Join: Collect Reviews}
    D2 --> E
    D3 --> E
    
    E --> F[Review Summarizer<br/>Generate Comprehensive Report]
    F --> G[Final Result]
    
    style A fill:#e1f5fe
    style B fill:#fff3e0
    style D1 fill:#f3e5f5
    style D2 fill:#f3e5f5
    style D3 fill:#f3e5f5
    style F fill:#e8f5e8
    style G fill:#fff9c4

@Ol1ver0413 Ol1ver0413 marked this pull request as ready for review September 21, 2025 15:01
@Ol1ver0413 (Collaborator Author) commented:

Hey @Wendong-Fan @fengju0213! I've already added the pipeline mode to the workforce, and maybe we can try more cases to test this mode.

@Wendong-Fan (Member) commented:

Hey @Wendong-Fan @fengju0213! I've already added the pipeline mode to the workforce, and maybe we can try more cases to test this mode.

Thanks @Ol1ver0413!

@Wendong-Fan Wendong-Fan added the Review Required PR need to be reviewed label Sep 29, 2025
@Wendong-Fan Wendong-Fan added this to the Sprint 38 milestone Sep 29, 2025
@Wendong-Fan Wendong-Fan linked an issue Sep 29, 2025 that may be closed by this pull request
2 tasks
@hesamsheikh (Collaborator) left a comment


Thanks for the comprehensive PR @Ol1ver0413. I added a comment. Also, I think the method naming could be a bit more consistent and memorable.

  # before
  workforce.add_pipeline_task()
  workforce.fork_pipeline()
  workforce.add_parallel_pipeline_tasks()
  builder.fork()  
  builder.add_parallel_tasks() 

  # after
  workforce.pipeline_add()
  workforce.pipeline_fork()
  workforce.pipeline_join()
  workforce.pipeline_build()
  builder.add()
  builder.fork()
  builder.join()
  builder.build()
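For illustration, here is how the proposed naming would read when chained (purely hypothetical call sequence, not code from this PR):

  # Workforce-level chaining with a uniform "pipeline_" prefix
  workforce.pipeline_add("Scan the changed files") \
           .pipeline_fork(["Review file 1", "Review file 2", "Review file 3"]) \
           .pipeline_join("Summarize all reviews into one report") \
           .pipeline_build()

  # Equivalent builder-level chaining
  builder.add("Scan the changed files") \
         .fork(["Review file 1", "Review file 2", "Review file 3"]) \
         .join("Summarize all reviews into one report") \
         .build()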

Comment on lines 1533 to 1540
expected_task_ids = {task.id for task in self._pending_tasks}
expected_task_ids.update(task.id for task in self._completed_tasks)

completed_successful_ids = {
    task.id for task in self._completed_tasks
    if task.state == TaskState.DONE
}

Collaborator commented:


Doesn't this create a situation in which, if any of the branched tasks fails, the whole pipeline is marked as failed and the join task never executes?

Collaborator Author commented:


Doesn't this create a situation in which, if any of the branched tasks fails, the whole pipeline is marked as failed and the join task never executes?

Thanks @hesamsheikh! Maybe I need to add some branch-task failure-handling mechanisms to the pipeline.

Collaborator Author commented:


Hi @hesamsheikh. Since the original design follows a pipeline pattern, I didn't perform task restructuring or decomposition as in the decompose mode. Therefore, I decided that if a branch task fails, it is still treated as successful and its error message is passed on to the join stage. We can discuss whether there might be a better approach.
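Roughly, the branch handling amounts to this (illustrative pseudocode only; names simplified):

# In pipeline mode, a failed branch does not abort the whole pipeline:
# the error text is kept in the result and the branch is marked DONE,
# so the join stage still runs and can see which branches failed.
if branch_task.state == TaskState.FAILED:
    branch_task.result = f"[branch failed] {branch_task.result or 'no result'}"
    branch_task.state = TaskState.DONE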

Collaborator commented:


Well, this method seems a bit confusing and hard to understand anyway. Could you maybe simplify it, or explain it a bit in the comments? I know how it works and what it does, but not the intent behind it.

Collaborator Author commented:


Doesn't this create a situation in which, if any of the branched tasks fails, the whole pipeline is marked as failed and the join task never executes?

Yeah, I'll add more comments.

Comment on lines +702 to +707

# Clear existing tasks and dependencies
self._pending_tasks.clear()
self._task_dependencies.clear()
self._assignees.clear()

@a7m-1st (Collaborator) commented Oct 2, 2025


@Ol1ver0413, thanks for the PR. As of now, is the pipeline designed so that it can interrupt running tasks, or does it work only with predefined tasks, i.e. tasks defined before workforce.start()?

Collaborator Author commented:


@Ol1ver0413, thanks for the PR. As of now, is the pipeline designed so that it can interrupt running tasks, or does it work only with predefined tasks, i.e. tasks defined before workforce.start()?

Thanks @a7m-1st. The pipeline mode was designed for sequential, predefined tasks, which cannot be interrupted by a human. But maybe we can have a further discussion about it. Thanks again!

@Wendong-Fan Wendong-Fan added Waiting for Update PR has been reviewed, need to be updated based on review comment and removed Review Required PR need to be reviewed labels Oct 19, 2025
@Wendong-Fan (Member) commented:

hi @Ol1ver0413

Hope you're doing well!

Just wanted to check in on this PR. It looks like there's some feedback that needs to be addressed, along with some failing pre-commit checks.

Please let us know if you have any questions or need a hand with anything. We're looking forward to getting this merged!

Cheers

@Ol1ver0413 (Collaborator Author) commented Oct 19, 2025

hi @Ol1ver0413

Hope you're doing well!

Just wanted to check in on this PR. It looks like there's some feedback that needs to be addressed, along with some failing pre-commit checks.

Please let us know if you have any questions or need a hand with anything. We're looking forward to getting this merged!

Cheers

Hi, @Wendong-Fan!
I’ve been handling some work transitions lately, which may have caused me to overlook enhancing the running mode in the workforce. I’ll make sure to complete it soon.
Thanks again.

@review-notebook-app commented:

Check out this pull request on ReviewNB

See visual diffs & provide feedback on Jupyter Notebooks.



@a7m-1st (Collaborator) commented Oct 21, 2025

@Ol1ver0413 There are merge conflicts, I guess, as I am seeing 140+ file changes.
cc: @Wendong-Fan

@Ol1ver0413 (Collaborator Author) commented Oct 22, 2025

@Ol1ver0413 There are merge conflicts, I guess, as I am seeing 140+ file changes. cc: @Wendong-Fan

Yeah, I see that too. It's quite weird because I only modified workforce.py and the example file. Let me take a look.

@a7m-1st (Collaborator) commented Oct 22, 2025

Cool, all good now. Thanks for the update, I'll review it ASAP!

@hesamsheikh (Collaborator) left a comment


Thanks for your updates. I checked the PR again and added a few more comments.

if hasattr(self, '_last_parallel_tasks') and self._last_parallel_tasks:
    wait_for = self._last_parallel_tasks
    # Clear the parallel tasks after using them
    delattr(self, '_last_parallel_tasks')
Collaborator commented:


Why delete the whole attribute? Consider emptying it or setting it to None instead.
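A tiny self-contained sketch of that suggestion (names mirror the hunk above; this is not code from the PR):

from typing import List, Optional


class PipelineStateSketch:
    """Keep the attribute around and reset it to None instead of delattr."""

    def __init__(self) -> None:
        self._last_parallel_tasks: Optional[List[str]] = None

    def consume_fork(self) -> Optional[List[str]]:
        wait_for = None
        if self._last_parallel_tasks:
            wait_for = self._last_parallel_tasks
            self._last_parallel_tasks = None  # no hasattr/delattr dance needed
        return wait_for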

self.task_counter = 0
self._last_task_id = None
if hasattr(self, '_last_parallel_tasks'):
    delattr(self, '_last_parallel_tasks')
Collaborator commented:


Again, it's pretty unusual to delete the attribute.

Comment on lines 2048 to 2052
async def _process_task_with_hybrid(self, task: Task) -> Task:
    """Process task using hybrid approach (partial auto-decomposition)."""
    # For now, fall back to auto-decompose mode
    # This can be extended to support more sophisticated hybrid logic
    return await self._process_task_with_auto_decompose(task)
Collaborator commented:


Is this for future use? It's currently unused.

Comment on lines 2064 to 2065
expected_task_ids = {task.id for task in self._pending_tasks}
expected_task_ids.update(task.id for task in self._completed_tasks)
Collaborator commented:


Why are pending tasks counted as completed ones? This seems like an issue.

@Ol1ver0413 (Collaborator Author) commented:

Thanks for your updates. I checked the PR again and added a few more comments.

Thanks @hesamsheikh for reviewing. Let me take a look.

@Wendong-Fan Wendong-Fan added Review Required PR need to be reviewed and removed Waiting for Update PR has been reviewed, need to be updated based on review comment labels Oct 26, 2025
@fengju0213 fengju0213 modified the milestones: Sprint 38, Sprint 41 Oct 27, 2025
@a7m-1st (Collaborator) left a comment


Thanks @Ol1ver0413 for your work. Everything worked well on my side, except for some small points I would like your opinion on, and a small change that was perhaps overlooked when resolving merge conflicts 🙌

Comment on lines +657 to +659
auto_depend: bool = True,
) -> 'Workforce':
"""Add a task to the pipeline with support for chaining.
Collaborator commented:


Maybe we don't need to wrap the return type in quotes?
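The quotes are the usual forward-reference workaround: Workforce isn't bound yet while its own class body is executing, so a bare annotation would raise a NameError. They could be dropped if the module opted into postponed annotation evaluation, for example (method name abbreviated; signature assumed from the hunk above):

from __future__ import annotations  # PEP 563: annotations are stored as strings, not evaluated


class Workforce:
    def pipeline_add(self, content: str, auto_depend: bool = True) -> Workforce:  # no quotes needed
        ...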

Comment on lines +728 to +731

def pipeline_fork(self, task_contents: List[str]) -> 'Workforce':
    """Create parallel branches from the current task.
Collaborator commented:


I can see that these parallel tasks only support text; it would perhaps have been nice to also support a list of Task objects. Or is that not viable?
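One possible shape for that, sketched as a variant of the method above (hypothetical; _make_pipeline_task and _fork_with_tasks are placeholder names, not helpers from this PR):

from typing import List, Union

from camel.tasks import Task


def pipeline_fork(self, tasks: List[Union[str, Task]]) -> 'Workforce':
    """Create parallel branches from plain strings or pre-built Task objects."""
    normalized = [
        t if isinstance(t, Task) else self._make_pipeline_task(t)
        for t in tasks
    ]
    return self._fork_with_tasks(normalized)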

Comment on lines +2053 to +2059
# Collect results from all pipeline tasks
task.result = self._collect_pipeline_results()
task.state = (
    TaskState.DONE if self._all_pipeline_tasks_successful()
    else TaskState.FAILED
)

Collaborator commented:


Maybe we can reset the mode to AUTO_DECOMPOSE after completing? Perhaps add another function to change the workforce's mode after init().

As it stands, calling any of the pipeline functions automatically switches the mode to PIPELINE in _ensure_pipeline_builder().
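Concretely, something along these lines at the end of the pipeline path (a sketch only; the _mode attribute and WorkforceMode enum names are assumptions based on the modes mentioned in this thread):

# After collecting pipeline results, fall back to the default mode so a later
# process_task() call is not silently treated as another pipeline run.
task.result = self._collect_pipeline_results()
task.state = (
    TaskState.DONE if self._all_pipeline_tasks_successful()
    else TaskState.FAILED
)
self._mode = WorkforceMode.AUTO_DECOMPOSE  # assumed attribute/enum names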

Comment on lines -3327 to -3332
and (
    task.additional_info is None
    or not task.additional_info.get(
        "_needs_decomposition", False
    )
)
Collaborator commented:


Can we revert this @Ol1ver0413?
More details on this in #3238.

This fix was added because we are mixing independent tasks and subtasks in the same _pending_queue, differentiated by the _needs_decomposition flag, so we delay posting the independent tasks.

@Ol1ver0413 (Collaborator Author) commented:

Thanks @Ol1ver0413 for your work. Everything worked well on my side, except for some small points I would like your opinion on, and a small change that was perhaps overlooked when resolving merge conflicts 🙌

Thanks @a7m-1st. I'll review it ASAP.

@hesamsheikh (Collaborator) left a comment


Please re-request a review after making the necessary changes, @Ol1ver0413.


Labels

Review Required PR need to be reviewed

Projects

Status: No status

Development

Successfully merging this pull request may close these issues.

[Feature Request] Support more running mode in workforce

6 participants