Skip to content

Commit 62da8c5

Browse files
committed
Add workflow management
Refactoring following review
1 parent 47a1eb6 commit 62da8c5

File tree

11 files changed

+5599
-0
lines changed

11 files changed

+5599
-0
lines changed

libs/labelbox/src/labelbox/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,3 +101,4 @@
101101
from labelbox.schema.taskstatus import TaskStatus
102102
from labelbox.schema.api_key import ApiKey
103103
from labelbox.schema.timeunit import TimeUnit
104+
from labelbox.schema.workflow import ProjectWorkflow

libs/labelbox/src/labelbox/schema/project.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
ProjectOverview,
6060
ProjectOverviewDetailed,
6161
)
62+
from labelbox.schema.workflow import ProjectWorkflow
6263
from labelbox.schema.resource_tag import ResourceTag
6364
from labelbox.schema.task import Task
6465
from labelbox.schema.task_queue import TaskQueue
@@ -1702,6 +1703,41 @@ def get_labeling_service_dashboard(self) -> LabelingServiceDashboard:
17021703
"""
17031704
return LabelingServiceDashboard.get(self.client, self.uid)
17041705

1706+
def get_workflow(self):
1707+
"""Get the workflow configuration for this project.
1708+
1709+
Workflows are automatically created when projects are created.
1710+
1711+
Returns:
1712+
ProjectWorkflow: A ProjectWorkflow object containing the project workflow information.
1713+
"""
1714+
from labelbox.schema.workflow import ProjectWorkflow
1715+
1716+
return ProjectWorkflow.get_workflow(self.client, self.uid)
1717+
1718+
def clone_workflow_from(self, source_project_id: str) -> "ProjectWorkflow":
1719+
"""Clones a workflow from another project to this project.
1720+
1721+
Args:
1722+
source_project_id (str): The ID of the project to clone the workflow from
1723+
1724+
Returns:
1725+
ProjectWorkflow: The cloned workflow in this project
1726+
"""
1727+
from labelbox.schema.workflow import ProjectWorkflow
1728+
1729+
# Get the source workflow
1730+
source_workflow = ProjectWorkflow.get_workflow(
1731+
self.client, source_project_id
1732+
)
1733+
1734+
# Use copy_workflow_structure to clone the workflow
1735+
return ProjectWorkflow.copy_workflow_structure(
1736+
source_workflow=source_workflow,
1737+
target_client=self.client,
1738+
target_project_id=self.uid,
1739+
)
1740+
17051741

17061742
class ProjectMember(DbObject):
17071743
user = Relationship.ToOne("User", cache=True)
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
"""
2+
This module contains classes for managing project workflows in Labelbox.
3+
It provides strongly-typed classes for nodes, edges, and workflow configuration.
4+
"""
5+
6+
# Import all workflow classes to expose them at the package level
7+
from labelbox.schema.workflow.enums import (
8+
WorkflowDefinitionId,
9+
NodeOutput,
10+
NodeInput,
11+
MatchFilters,
12+
Scope,
13+
FilterField,
14+
FilterOperator,
15+
)
16+
from labelbox.schema.workflow.base import (
17+
BaseWorkflowNode,
18+
NodePosition,
19+
)
20+
from labelbox.schema.workflow.nodes import (
21+
InitialLabelingNode,
22+
InitialReworkNode,
23+
ReviewNode,
24+
ReworkNode,
25+
LogicNode,
26+
DoneNode,
27+
CustomReworkNode,
28+
AutoQANode,
29+
UnknownWorkflowNode,
30+
)
31+
from labelbox.schema.workflow.edges import (
32+
WorkflowEdge,
33+
WorkflowEdgeFactory,
34+
)
35+
from labelbox.schema.workflow.graph import ProjectWorkflowGraph
36+
from labelbox.schema.workflow.workflow import ProjectWorkflow, NodeType
37+
38+
# Import Fluent DSL for improved filter construction
39+
from labelbox.schema.workflow.project_filter import (
40+
# Filter construction functions
41+
created_by,
42+
labeled_by,
43+
annotation,
44+
sample,
45+
dataset,
46+
batch,
47+
issue_category,
48+
model_prediction,
49+
natural_language,
50+
# Metadata helper functions (NEW - recommended)
51+
metadata,
52+
condition,
53+
# Field instances for chaining
54+
labeled_at,
55+
labeling_time,
56+
review_time,
57+
consensus_average,
58+
feature_consensus_average,
59+
# Main filter class
60+
ProjectWorkflowFilter,
61+
# Utility functions
62+
convert_to_api_format,
63+
)
64+
65+
# Re-export key classes at the module level
66+
__all__ = [
67+
# Core workflow components
68+
"WorkflowDefinitionId",
69+
"NodeOutput",
70+
"NodeInput",
71+
"MatchFilters",
72+
"Scope",
73+
"FilterField",
74+
"FilterOperator",
75+
"BaseWorkflowNode",
76+
"NodePosition",
77+
"InitialLabelingNode",
78+
"InitialReworkNode",
79+
"ReviewNode",
80+
"ReworkNode",
81+
"LogicNode",
82+
"DoneNode",
83+
"CustomReworkNode",
84+
"AutoQANode",
85+
"UnknownWorkflowNode",
86+
"WorkflowEdge",
87+
"WorkflowEdgeFactory",
88+
"ProjectWorkflow",
89+
"NodeType",
90+
"ProjectWorkflowGraph",
91+
"ProjectWorkflowFilter",
92+
# Fluent DSL exports
93+
"created_by",
94+
"labeled_by",
95+
"annotation",
96+
"sample",
97+
"dataset",
98+
"batch",
99+
"issue_category",
100+
"model_prediction",
101+
"natural_language",
102+
"labeled_at",
103+
"labeling_time",
104+
"review_time",
105+
"consensus_average",
106+
"feature_consensus_average",
107+
"metadata",
108+
"condition",
109+
# Utility functions
110+
"convert_to_api_format",
111+
]
112+
113+
# Define a mapping of node types for backward compatibility
114+
NODE_TYPE_MAP = {
115+
WorkflowDefinitionId.InitialLabelingTask: InitialLabelingNode,
116+
WorkflowDefinitionId.InitialReworkTask: InitialReworkNode,
117+
WorkflowDefinitionId.ReviewTask: ReviewNode,
118+
WorkflowDefinitionId.SendToRework: ReworkNode,
119+
WorkflowDefinitionId.Logic: LogicNode,
120+
WorkflowDefinitionId.Done: DoneNode,
121+
WorkflowDefinitionId.CustomReworkTask: CustomReworkNode,
122+
WorkflowDefinitionId.AutoQA: AutoQANode,
123+
WorkflowDefinitionId.Unknown: UnknownWorkflowNode,
124+
}
Lines changed: 215 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,215 @@
1+
"""Base classes and mixins for Project Workflow nodes in Labelbox."""
2+
3+
import logging
4+
import random
5+
import string
6+
from typing import Dict, List, Any, Optional
7+
from abc import abstractmethod
8+
from pydantic import BaseModel, Field, ConfigDict
9+
10+
from labelbox.schema.workflow.enums import WorkflowDefinitionId, NodeOutput
11+
12+
logger = logging.getLogger(__name__)
13+
14+
15+
def format_time_duration(seconds: int) -> str:
16+
"""Convert seconds to human-readable time format.
17+
18+
Args:
19+
seconds: Time duration in seconds
20+
21+
Returns:
22+
Human-readable time string (e.g., "1h 30m", "5m 30s", "45s")
23+
"""
24+
if seconds >= 3600: # >= 1 hour
25+
hours = seconds // 3600
26+
if seconds % 3600 == 0:
27+
return f"{hours}h"
28+
else:
29+
minutes = (seconds % 3600) // 60
30+
if minutes == 0:
31+
return f"{hours}h"
32+
else:
33+
return f"{hours}h {minutes}m"
34+
elif seconds >= 60: # >= 1 minute
35+
minutes = seconds // 60
36+
if seconds % 60 == 0:
37+
return f"{minutes}m"
38+
else:
39+
return f"{minutes}m {seconds % 60}s"
40+
else:
41+
return f"{seconds}s"
42+
43+
44+
def format_metadata_operator(operator: str) -> tuple[str, str]:
45+
"""Format metadata operator for display and JSON.
46+
47+
Args:
48+
operator: Raw operator string
49+
50+
Returns:
51+
Tuple of (display_operator, json_operator)
52+
"""
53+
operator_mappings = {
54+
"contains": ("CONTAINS", "contains"),
55+
"contain": ("CONTAINS", "contains"),
56+
"does_not_contain": ("DOES NOT CONTAIN", "does_not_contain"),
57+
"startswith": ("STARTS WITH", "starts_with"),
58+
"starts_with": ("STARTS WITH", "starts_with"),
59+
"start": ("STARTS WITH", "starts_with"),
60+
"endswith": ("ENDS WITH", "ends_with"),
61+
"ends_with": ("ENDS WITH", "ends_with"),
62+
"end": ("ENDS WITH", "ends_with"),
63+
"is_any": ("IS ANY", "is_any"),
64+
"is_not_any": ("IS NOT ANY", "is_not_any"),
65+
}
66+
67+
return operator_mappings.get(operator, (operator.upper(), operator))
68+
69+
70+
def generate_filter_id() -> str:
71+
"""Generate a unique filter ID for metadata filters."""
72+
return "".join(random.choices(string.ascii_lowercase + string.digits, k=6))
73+
74+
75+
class NodePosition(BaseModel):
76+
"""Represents the position of a node in the workflow canvas."""
77+
78+
x: float = Field(default=0.0, description="X coordinate")
79+
y: float = Field(default=0.0, description="Y coordinate")
80+
81+
82+
class InstructionsMixin:
83+
"""Mixin to handle instructions syncing with custom_fields.description."""
84+
85+
def sync_instructions_with_custom_fields(self):
86+
"""Sync instructions with customFields.description."""
87+
# First, try to load instructions from customFields.description if not already set
88+
instructions = getattr(self, "instructions", None)
89+
custom_fields = getattr(self, "custom_fields", None)
90+
91+
if (
92+
instructions is None
93+
and custom_fields
94+
and "description" in custom_fields
95+
):
96+
# Use object.__setattr__ to bypass the frozen field restriction
97+
object.__setattr__(
98+
self, "instructions", custom_fields["description"]
99+
)
100+
101+
# Then sync instructions to customFields if instructions is set
102+
instructions = getattr(self, "instructions", None)
103+
if instructions is not None:
104+
custom_fields = getattr(self, "custom_fields", None)
105+
if custom_fields is None:
106+
object.__setattr__(self, "custom_fields", {})
107+
custom_fields = getattr(self, "custom_fields")
108+
custom_fields["description"] = instructions
109+
return self
110+
111+
112+
class WorkflowSyncMixin:
113+
"""Mixin to handle syncing node changes back to workflow config."""
114+
115+
def _sync_to_workflow(self) -> None:
116+
"""Sync node properties to the workflow config."""
117+
workflow = getattr(self, "raw_data", {}).get("_workflow")
118+
if workflow and hasattr(workflow, "config"):
119+
for node_data in workflow.config.get("nodes", []):
120+
if node_data.get("id") == getattr(self, "id", None):
121+
# Update label
122+
if hasattr(self, "label"):
123+
node_data["label"] = getattr(self, "label")
124+
# Update instructions via customFields
125+
instructions = getattr(self, "instructions", None)
126+
if instructions is not None:
127+
if "customFields" not in node_data:
128+
node_data["customFields"] = {}
129+
node_data["customFields"]["description"] = instructions
130+
# Update customFields
131+
custom_fields = getattr(self, "custom_fields", None)
132+
if custom_fields:
133+
node_data["customFields"] = custom_fields
134+
# Update filters if present
135+
filters = getattr(self, "filters", None)
136+
if filters:
137+
node_data["filters"] = filters
138+
# Update config if present
139+
node_config = getattr(self, "node_config", None)
140+
if node_config:
141+
node_data["config"] = node_config
142+
break
143+
144+
def sync_property_change(self, property_name: str) -> None:
145+
"""Handle property changes that need workflow syncing."""
146+
if property_name == "instructions" and hasattr(self, "id"):
147+
# Also update custom_fields on the node object itself
148+
instructions = getattr(self, "instructions", None)
149+
if instructions is not None:
150+
custom_fields = getattr(self, "custom_fields", None)
151+
if custom_fields is None:
152+
object.__setattr__(self, "custom_fields", {})
153+
custom_fields = getattr(self, "custom_fields")
154+
custom_fields["description"] = instructions
155+
self._sync_to_workflow()
156+
157+
158+
class BaseWorkflowNode(BaseModel, InstructionsMixin, WorkflowSyncMixin):
159+
"""Base class for all workflow nodes with common functionality."""
160+
161+
id: str = Field(description="Unique identifier for the node")
162+
position: NodePosition = Field(
163+
default_factory=NodePosition, description="Node position on canvas"
164+
)
165+
definition_id: WorkflowDefinitionId = Field(
166+
alias="definitionId", description="Type of workflow node"
167+
)
168+
inputs: List[str] = Field(
169+
default_factory=lambda: [], description="List of input node IDs"
170+
)
171+
output_if: Optional[str] = Field(
172+
default=None, description="ID of node connected to 'if' output"
173+
)
174+
output_else: Optional[str] = Field(
175+
default=None, description="ID of node connected to 'else' output"
176+
)
177+
raw_data: Dict[str, Any] = Field(
178+
default_factory=dict, description="Raw configuration data"
179+
)
180+
181+
model_config = ConfigDict(
182+
populate_by_name=True,
183+
arbitrary_types_allowed=True,
184+
extra="forbid",
185+
)
186+
187+
def __init__(self, **data):
188+
super().__init__(**data)
189+
# Sync instructions after initialization
190+
self.sync_instructions_with_custom_fields()
191+
192+
@property
193+
@abstractmethod
194+
def supported_outputs(self) -> List[NodeOutput]:
195+
"""Returns the list of supported output types for this node."""
196+
pass
197+
198+
@property
199+
def name(self) -> Optional[str]:
200+
"""Get the node's name (label)."""
201+
return getattr(self, "label", None) or self.raw_data.get("label")
202+
203+
@name.setter
204+
def name(self, value: str) -> None:
205+
"""Set the node's name (updates label)."""
206+
if hasattr(self, "label"):
207+
object.__setattr__(self, "label", value)
208+
self._sync_to_workflow()
209+
210+
def __setattr__(self, name: str, value) -> None:
211+
"""Override setattr to sync property changes to workflow."""
212+
super().__setattr__(name, value)
213+
# Sync changes to workflow for important properties
214+
if name in ["instructions", "label"] and hasattr(self, "id"):
215+
self.sync_property_change(name)

0 commit comments

Comments
 (0)