5 changes: 4 additions & 1 deletion camel/societies/workforce/__init__.py
@@ -14,11 +14,14 @@

from .role_playing_worker import RolePlayingWorker
from .single_agent_worker import SingleAgentWorker
from .utils import PipelineTaskBuilder
from .workforce import Workforce, WorkforceMode
from .workflow_memory_manager import WorkflowSelectionMethod
from .workforce import Workforce

__all__ = [
"Workforce",
"WorkforceMode",
"PipelineTaskBuilder",
"SingleAgentWorker",
"RolePlayingWorker",
"WorkflowSelectionMethod",
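For reference, a minimal usage sketch of the newly exported builder (illustrative only; it uses the import path from the updated `__init__.py` and the `PipelineTaskBuilder` methods added in `utils.py` below):

    from camel.societies.workforce import PipelineTaskBuilder

    builder = PipelineTaskBuilder()
    tasks = (
        builder.add("Collect data")
        .fork(["Technical analysis", "Fundamental analysis"])
        .join("Generate report")
        .build()
    )
    # Both fork tasks depend on "Collect data", the join task depends on
    # both fork tasks, and build() validates the graph and returns the
    # task list.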
279 changes: 279 additions & 0 deletions camel/societies/workforce/utils.py
@@ -220,6 +220,285 @@ def quality_sufficient(self) -> bool:
)


class PipelineTaskBuilder:
r"""Helper class for building pipeline tasks with dependencies."""

    def __init__(self):
        """Initialize an empty pipeline task builder."""
        # Import Task lazily so it is only resolved when a builder is
        # actually created.
        from camel.tasks import Task

        self._TaskClass = Task
        self.task_list = []
        self.task_counter = 0
        # task_id -> Task mapping for fast lookup
        self._task_registry = {}
        # ID of the most recently added task, used for chain inference
        self._last_task_id = None
        # IDs of the most recently added parallel tasks, used for sync
        self._last_parallel_tasks = []

def add(
self,
content: str,
task_id: Optional[str] = None,
dependencies: Optional[List[str]] = None,
additional_info: Optional[dict] = None,
auto_depend: bool = True,
) -> 'PipelineTaskBuilder':
"""Add a task to the pipeline with support for chaining.

Args:
content (str): The content/description of the task.
task_id (str, optional): Unique identifier for the task. If None,
a unique ID will be generated. (default: :obj:`None`)
dependencies (List[str], optional): List of task IDs that this
task depends on. If None and auto_depend=True, will depend on
the last added task. (default: :obj:`None`)
additional_info (dict, optional): Additional information
for the task. (default: :obj:`None`)
auto_depend (bool, optional): If True and dependencies is None,
automatically depend on the last added task. (default: :obj:`True`)

Returns:
PipelineTaskBuilder: Self for method chaining.

Raises:
            ValueError: If task_id already exists or if any dependency
                is not found.

Example:
>>> builder.add("Step 1").add("Step 2").add("Step 3")
# Step 2 depends on Step 1, Step 3 depends on Step 2
"""
# Generate or validate task_id
task_id = task_id or f"pipeline_task_{self.task_counter}"

# Check ID uniqueness
if task_id in self._task_registry:
raise ValueError(f"Task ID '{task_id}' already exists")

# Auto-infer dependencies if not specified
        if (
            dependencies is None
            and auto_depend
            and self._last_task_id is not None
        ):
            dependencies = [self._last_task_id]

# Validate dependencies exist
dep_tasks = []
if dependencies:
            missing_deps = [
                dep for dep in dependencies
                if dep not in self._task_registry
            ]
if missing_deps:
raise ValueError(f"Dependencies not found: {missing_deps}")
dep_tasks = [self._task_registry[dep] for dep in dependencies]

# Create task
task = self._TaskClass(
content=content,
id=task_id,
dependencies=dep_tasks,
additional_info=additional_info,
)

self.task_list.append(task)
self._task_registry[task_id] = task
self._last_task_id = task_id # Update last task for chaining
self.task_counter += 1
return self

def add_parallel_tasks(
self,
task_contents: List[str],
dependencies: Optional[List[str]] = None,
task_id_prefix: str = "parallel",
auto_depend: bool = True,
) -> 'PipelineTaskBuilder':
"""Add multiple parallel tasks that can execute simultaneously.

Args:
task_contents (List[str]): List of task content strings.
dependencies (List[str], optional): Common dependencies for all
parallel tasks. If None and auto_depend=True, will depend on
the last added task. (default: :obj:`None`)
task_id_prefix (str, optional): Prefix for generated task IDs.
(default: :obj:`"parallel"`)
auto_depend (bool, optional): If True and dependencies is None,
automatically depend on the last added task. (default: :obj:`True`)

Returns:
PipelineTaskBuilder: Self for method chaining.

Raises:
            ValueError: If any task_id already exists or if any
                dependency is not found.

Example:
>>> builder.add("Collect Data").add_parallel_tasks([
... "Technical Analysis", "Fundamental Analysis"
... ]).add_sync_task("Generate Report")
"""
if not task_contents:
raise ValueError("task_contents cannot be empty")

# Auto-infer dependencies if not specified
        if (
            dependencies is None
            and auto_depend
            and self._last_task_id is not None
        ):
            dependencies = [self._last_task_id]

parallel_task_ids = []
        # Save the current counter for consistent naming
        base_counter = self.task_counter

for i, content in enumerate(task_contents):
task_id = f"{task_id_prefix}_{i}_{base_counter}"
# Use auto_depend=False since we're manually managing dependencies
self.add(content, task_id, dependencies, auto_depend=False)
parallel_task_ids.append(task_id)

        # Reset the chain pointer: with multiple parallel branches there
        # is no single "last" task, so the next task must either specify
        # its dependencies explicitly or be added via add_sync_task.
        self._last_task_id = None
# Store parallel task IDs for potential sync operations
self._last_parallel_tasks = parallel_task_ids

return self

def add_sync_task(
self,
content: str,
wait_for: Optional[List[str]] = None,
task_id: Optional[str] = None,
) -> 'PipelineTaskBuilder':
"""Add a synchronization task that waits for multiple tasks.

Args:
content (str): Content of the synchronization task.
wait_for (List[str], optional): List of task IDs to wait for.
If None, will automatically wait for the last parallel tasks.
(default: :obj:`None`)
task_id (str, optional): ID for the sync task. If None, a unique
ID will be generated. (default: :obj:`None`)

Returns:
PipelineTaskBuilder: Self for method chaining.

Raises:
            ValueError: If task_id already exists or if any dependency
                is not found.

Example:
            >>> builder.add_parallel_tasks(
            ...     ["Task A", "Task B"]
            ... ).add_sync_task("Merge Results")
# Automatically waits for both parallel tasks
"""
        # Auto-infer wait_for from the most recent parallel tasks
        if wait_for is None:
            if self._last_parallel_tasks:
                wait_for = self._last_parallel_tasks
                # Clear the parallel tasks after using them
                self._last_parallel_tasks = []
            else:
                raise ValueError(
                    "wait_for is required for a sync task when no "
                    "parallel tasks were added beforehand"
                )

        if not wait_for:
            raise ValueError("wait_for cannot be empty for sync task")

        return self.add(
            content, task_id, dependencies=wait_for, auto_depend=False
        )


def build(self) -> List:
"""Build and return the complete task list with dependencies.

Returns:
List[Task]: List of tasks with proper dependency relationships.

Raises:
            ValueError: If there are circular dependencies or other
                validation errors.
"""
if not self.task_list:
raise ValueError("No tasks defined in pipeline")

# Validate no circular dependencies
self._validate_dependencies()

return self.task_list.copy()

def clear(self) -> None:
"""Clear all tasks from the builder."""
self.task_list.clear()
self._task_registry.clear()
self.task_counter = 0
self._last_task_id = None
self._last_parallel_tasks = []

def fork(self, task_contents: List[str]) -> 'PipelineTaskBuilder':
"""Create parallel branches from the current task (alias for add_parallel_tasks).

Args:
            task_contents (List[str]): List of task content strings for
                parallel execution.

Returns:
PipelineTaskBuilder: Self for method chaining.

Example:
>>> builder.add("Collect Data").fork([
... "Technical Analysis", "Fundamental Analysis"
... ]).join("Generate Report")
"""
return self.add_parallel_tasks(task_contents)

    def join(
        self, content: str, task_id: Optional[str] = None
    ) -> 'PipelineTaskBuilder':
"""Join parallel branches with a synchronization task (alias for add_sync_task).

Args:
content (str): Content of the join/sync task.
task_id (str, optional): ID for the sync task.

Returns:
PipelineTaskBuilder: Self for method chaining.

Example:
>>> builder.fork(["Task A", "Task B"]).join("Merge Results")
"""
return self.add_sync_task(content, task_id=task_id)


def _validate_dependencies(self) -> None:
"""Validate that there are no circular dependencies.

Raises:
ValueError: If circular dependencies are detected.
"""
# Use DFS to detect cycles
visited = set()
rec_stack = set()

def has_cycle(task_id: str) -> bool:
visited.add(task_id)
rec_stack.add(task_id)

task = self._task_registry[task_id]
for dep in task.dependencies:
if dep.id not in visited:
if has_cycle(dep.id):
return True
elif dep.id in rec_stack:
return True

rec_stack.remove(task_id)
return False

for task_id in self._task_registry:
if task_id not in visited:
if has_cycle(task_id):
raise ValueError(f"Circular dependency detected involving task: {task_id}")

def get_task_info(self) -> dict:
"""Get information about all tasks in the pipeline.

Returns:
dict: Dictionary containing task count and task details.
"""
return {
"task_count": len(self.task_list),
"tasks": [
{
"id": task.id,
"content": task.content,
"dependencies": [dep.id for dep in task.dependencies]
}
for task in self.task_list
]
}
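
# Usage sketch for the builder above (illustrative; the expected values
# assume camel.tasks.Task stores the dependency list it is given, which
# is how get_task_info reads it back):
#
#     builder = PipelineTaskBuilder()
#     builder.add("Load data", task_id="load")
#     builder.add("Clean data", task_id="clean")  # auto-depends on "load"
#     builder.add("Train model", task_id="train", dependencies=["clean"])
#     info = builder.get_task_info()
#     # info["task_count"] == 3
#     # info["tasks"][1] == {"id": "clean", "content": "Clean data",
#     #                      "dependencies": ["load"]}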


def check_if_running(
running: bool,
max_retries: int = 3,