diff --git a/libs/labelbox/src/labelbox/schema/workflow/__init__.py b/libs/labelbox/src/labelbox/schema/workflow/__init__.py index 4246edba5..d33643124 100644 --- a/libs/labelbox/src/labelbox/schema/workflow/__init__.py +++ b/libs/labelbox/src/labelbox/schema/workflow/__init__.py @@ -39,12 +39,18 @@ from labelbox.schema.workflow.graph import ProjectWorkflowGraph # Import from monolithic workflow.py file -from labelbox.schema.workflow.workflow import ProjectWorkflow, NodeType +from labelbox.schema.workflow.workflow import ( + ProjectWorkflow, + NodeType, + InitialNodes, +) + +# Import configuration classes +from labelbox.schema.workflow.config import LabelingConfig, ReworkConfig # Import from monolithic project_filter.py file from labelbox.schema.workflow.project_filter import ( ProjectWorkflowFilter, - created_by, labeled_by, annotation, dataset, @@ -94,13 +100,17 @@ "NodeType", "ProjectWorkflowGraph", "ProjectWorkflowFilter", - # Filter construction functions - "created_by", + # Workflow configuration + "InitialNodes", + "LabelingConfig", + "ReworkConfig", + # Filter field objects "labeled_by", "annotation", - "sample", "dataset", "issue_category", + # Filter construction functions + "sample", "model_prediction", "natural_language", "labeled_at", diff --git a/libs/labelbox/src/labelbox/schema/workflow/config.py b/libs/labelbox/src/labelbox/schema/workflow/config.py new file mode 100644 index 000000000..273826c62 --- /dev/null +++ b/libs/labelbox/src/labelbox/schema/workflow/config.py @@ -0,0 +1,44 @@ +"""Configuration models for workflow initial nodes.""" + +from typing import Optional, Union, List +from pydantic import BaseModel, Field + + +class LabelingConfig(BaseModel): + """Configuration for InitialLabeling node creation. + + Attributes: + instructions: Task instructions for labelers + max_contributions_per_user: Maximum contributions per user (null means infinite) + """ + + instructions: Optional[str] = Field( + default=None, description="Task instructions for labelers" + ) + max_contributions_per_user: Optional[int] = Field( + default=None, + description="Maximum contributions per user (null means infinite)", + ge=0, + ) + + +class ReworkConfig(BaseModel): + """Configuration for InitialRework node creation. + + Attributes: + instructions: Task instructions for rework + individual_assignment: User IDs for individual assignment + max_contributions_per_user: Maximum contributions per user (null means infinite) + """ + + instructions: Optional[str] = Field( + default=None, description="Task instructions for rework" + ) + individual_assignment: Optional[Union[str, List[str]]] = Field( + default=None, description="User IDs for individual assignment" + ) + max_contributions_per_user: Optional[int] = Field( + default=None, + description="Maximum contributions per user (null means infinite)", + ge=0, + ) diff --git a/libs/labelbox/src/labelbox/schema/workflow/enums.py b/libs/labelbox/src/labelbox/schema/workflow/enums.py index b0bf17e06..cf87a9147 100644 --- a/libs/labelbox/src/labelbox/schema/workflow/enums.py +++ b/libs/labelbox/src/labelbox/schema/workflow/enums.py @@ -106,7 +106,7 @@ class FilterField(str, Enum): """ # User and creation filters - CreatedBy = "CreatedBy" + LabeledBy = "CreatedBy" # Maps to backend CreatedBy field # Annotation and content filters Annotation = "Annotation" diff --git a/libs/labelbox/src/labelbox/schema/workflow/nodes/autoqa_node.py b/libs/labelbox/src/labelbox/schema/workflow/nodes/autoqa_node.py index 4f37530bf..d0cb57b45 100644 --- a/libs/labelbox/src/labelbox/schema/workflow/nodes/autoqa_node.py +++ b/libs/labelbox/src/labelbox/schema/workflow/nodes/autoqa_node.py @@ -79,7 +79,7 @@ class AutoQANode(BaseWorkflowNode): The evaluation results determine automatic routing without human intervention. """ - label: str = Field(default="Label Score (AutoQA)") + label: str = Field(default="Label Score (AutoQA)", max_length=50) filters: List[Dict[str, Any]] = Field( default_factory=lambda: [], description="Contains the filters for the AutoQA node", diff --git a/libs/labelbox/src/labelbox/schema/workflow/nodes/custom_rework_node.py b/libs/labelbox/src/labelbox/schema/workflow/nodes/custom_rework_node.py index abec42576..e863515b7 100644 --- a/libs/labelbox/src/labelbox/schema/workflow/nodes/custom_rework_node.py +++ b/libs/labelbox/src/labelbox/schema/workflow/nodes/custom_rework_node.py @@ -81,7 +81,7 @@ class CustomReworkNode(BaseWorkflowNode): rework processes and quality checks. """ - label: str = Field(default="") + label: str = Field(default="", max_length=50) node_config: List[ConfigEntry] = Field( default_factory=lambda: [], description="Contains assignment rules etc.", @@ -117,6 +117,7 @@ class CustomReworkNode(BaseWorkflowNode): default=None, description="Maximum contributions per user (null means infinite)", alias="maxContributionsPerUser", + ge=0, ) # Has one input and one output output_else: None = Field(default=None, frozen=True) # Only one output (if) diff --git a/libs/labelbox/src/labelbox/schema/workflow/nodes/done_node.py b/libs/labelbox/src/labelbox/schema/workflow/nodes/done_node.py index 4e18da5f9..fce1d3bd6 100644 --- a/libs/labelbox/src/labelbox/schema/workflow/nodes/done_node.py +++ b/libs/labelbox/src/labelbox/schema/workflow/nodes/done_node.py @@ -57,7 +57,7 @@ class DoneNode(BaseWorkflowNode): and will not flow to any other nodes in the workflow. """ - label: str = Field(default="Done") + label: str = Field(default="Done", max_length=50) definition_id: WorkflowDefinitionId = Field( default=WorkflowDefinitionId.Done, frozen=True, alias="definitionId" ) diff --git a/libs/labelbox/src/labelbox/schema/workflow/nodes/initial_labeling_node.py b/libs/labelbox/src/labelbox/schema/workflow/nodes/initial_labeling_node.py index e9f1e1f4d..1adff2e2b 100644 --- a/libs/labelbox/src/labelbox/schema/workflow/nodes/initial_labeling_node.py +++ b/libs/labelbox/src/labelbox/schema/workflow/nodes/initial_labeling_node.py @@ -59,7 +59,9 @@ class InitialLabelingNode(BaseWorkflowNode): and cannot have incoming connections from other nodes. """ - label: str = Field(default="Initial labeling task", frozen=True) + label: str = Field( + default="Initial labeling task", frozen=True, max_length=50 + ) filter_logic: Literal["and", "or"] = Field( default=DEFAULT_FILTER_LOGIC_AND, alias="filterLogic" ) @@ -84,6 +86,7 @@ class InitialLabelingNode(BaseWorkflowNode): default=None, description="Maximum contributions per user (null means infinite)", alias="maxContributionsPerUser", + ge=0, ) node_config: List[ConfigEntry] = Field( default_factory=lambda: [], diff --git a/libs/labelbox/src/labelbox/schema/workflow/nodes/initial_rework_node.py b/libs/labelbox/src/labelbox/schema/workflow/nodes/initial_rework_node.py index 39054de01..fca96aeb0 100644 --- a/libs/labelbox/src/labelbox/schema/workflow/nodes/initial_rework_node.py +++ b/libs/labelbox/src/labelbox/schema/workflow/nodes/initial_rework_node.py @@ -66,7 +66,9 @@ class InitialReworkNode(BaseWorkflowNode): to ensure proper routing in the Labelbox platform. """ - label: str = Field(default="Rework (all rejected)", frozen=True) + label: str = Field( + default="Rework (all rejected)", frozen=True, max_length=50 + ) filter_logic: Literal["and", "or"] = Field( default=DEFAULT_FILTER_LOGIC_AND, alias="filterLogic" ) @@ -100,6 +102,7 @@ class InitialReworkNode(BaseWorkflowNode): default=None, description="Maximum contributions per user (null means infinite)", alias="maxContributionsPerUser", + ge=0, ) @field_validator("individual_assignment", mode="before") diff --git a/libs/labelbox/src/labelbox/schema/workflow/nodes/logic_node.py b/libs/labelbox/src/labelbox/schema/workflow/nodes/logic_node.py index 167d64aa2..1457a7227 100644 --- a/libs/labelbox/src/labelbox/schema/workflow/nodes/logic_node.py +++ b/libs/labelbox/src/labelbox/schema/workflow/nodes/logic_node.py @@ -30,7 +30,9 @@ class LogicNode(BaseWorkflowNode): """Logic node. One or more instances possible. One input, two outputs (if/else).""" label: str = Field( - default="Logic", description="Display name for the logic node" + default="Logic", + description="Display name for the logic node", + max_length=50, ) filters: List[Dict[str, Any]] = Field( default_factory=lambda: [], @@ -199,7 +201,7 @@ def remove_filter(self, filter_field: FilterField) -> "LogicNode": Args: filter_field: FilterField enum value specifying which filter type to remove - (e.g., FilterField.CreatedBy, FilterField.Sample, FilterField.LabelingTime) + (e.g., FilterField.LabeledBy, FilterField.Sample, FilterField.LabelingTime) Returns: LogicNode: Self for method chaining @@ -209,7 +211,7 @@ def remove_filter(self, filter_field: FilterField) -> "LogicNode": >>> >>> # Type-safe enum approach (required) >>> logic.remove_filter(FilterField.Sample) - >>> logic.remove_filter(FilterField.CreatedBy) + >>> logic.remove_filter(FilterField.LabeledBy) # Consistent with labeled_by() function >>> logic.remove_filter(FilterField.LabelingTime) >>> logic.remove_filter(FilterField.Metadata) """ @@ -371,7 +373,7 @@ def get_filters(self) -> "ProjectWorkflowFilter": >>> logic = workflow.get_node_by_id("some-logic-node-id") >>> user_filters = logic.get_filters() >>> # Add a new filter - >>> user_filters.append(created_by(["new-user-id"])) + >>> user_filters.append(labeled_by(["new-user-id"])) >>> # Apply the updated filters back to the node >>> logic.set_filters(user_filters) """ @@ -393,25 +395,25 @@ def add_filter(self, filter_rule: Dict[str, Any]) -> "LogicNode": Args: filter_rule: Filter rule from filter functions - (e.g., created_by(["user_id"]), labeling_time.greater_than(300)) + (e.g., labeled_by(["user_id"]), labeling_time.greater_than(300)) Returns: LogicNode: Self for method chaining Example: - >>> from labelbox.schema.workflow.project_filter import created_by, labeling_time, metadata, condition + >>> from labelbox.schema.workflow.project_filter import labeled_by, labeling_time, metadata, condition >>> - >>> logic.add_filter(created_by(["user-123"])) + >>> logic.add_filter(labeled_by(["user-123"])) >>> logic.add_filter(labeling_time.greater_than(300)) >>> logic.add_filter(metadata([condition.contains("tag", "test")])) - >>> # Adding another created_by filter will replace the previous one - >>> logic.add_filter(created_by(["user-456"])) # Replaces previous created_by filter + >>> # Adding another labeled_by filter will replace the previous one + >>> logic.add_filter(labeled_by(["user-456"])) # Replaces previous labeled_by filter """ # Validate that this looks like filter function output if not self._is_filter_function_output(filter_rule): raise ValueError( "add_filter() only accepts output from filter functions. " - "Use functions like created_by(), labeling_time.greater_than(), etc." + "Use functions like labeled_by(), labeling_time.greater_than(), etc." ) # Get the field name from the filter rule to check for existing filters @@ -455,7 +457,7 @@ def _is_filter_function_output(self, filter_rule: Dict[str, Any]) -> bool: # Map backend field names to FilterField enum values backend_to_field = { - "CreatedBy": FilterField.CreatedBy, + "CreatedBy": FilterField.LabeledBy, # Backend CreatedBy maps to user-facing LabeledBy "Annotation": FilterField.Annotation, "LabeledAt": FilterField.LabeledAt, "Sample": FilterField.Sample, diff --git a/libs/labelbox/src/labelbox/schema/workflow/nodes/review_node.py b/libs/labelbox/src/labelbox/schema/workflow/nodes/review_node.py index 9e3dab416..4c51d38c0 100644 --- a/libs/labelbox/src/labelbox/schema/workflow/nodes/review_node.py +++ b/libs/labelbox/src/labelbox/schema/workflow/nodes/review_node.py @@ -69,7 +69,7 @@ class ReviewNode(BaseWorkflowNode): which default to "and" logic. This allows more flexible routing. """ - label: str = Field(default="Review task") + label: str = Field(default="Review task", max_length=50) # For ReviewNode, filter_logic defaults to "or" filter_logic: Literal["and", "or"] = Field( default=DEFAULT_FILTER_LOGIC_OR, alias="filterLogic" @@ -96,6 +96,7 @@ class ReviewNode(BaseWorkflowNode): default=None, description="Maximum contributions per user (null means infinite)", alias="maxContributionsPerUser", + ge=0, ) node_config: List[Dict[str, Any]] = Field( default_factory=lambda: [], diff --git a/libs/labelbox/src/labelbox/schema/workflow/nodes/rework_node.py b/libs/labelbox/src/labelbox/schema/workflow/nodes/rework_node.py index bae284da8..079e4b434 100644 --- a/libs/labelbox/src/labelbox/schema/workflow/nodes/rework_node.py +++ b/libs/labelbox/src/labelbox/schema/workflow/nodes/rework_node.py @@ -61,7 +61,7 @@ class ReworkNode(BaseWorkflowNode): workflow's initial rework entry point without manual routing. """ - label: str = Field(default="Rework") + label: str = Field(default="Rework", max_length=50) filter_logic: Literal["and", "or"] = Field( default=DEFAULT_FILTER_LOGIC_AND, alias="filterLogic" ) diff --git a/libs/labelbox/src/labelbox/schema/workflow/nodes/unknown_workflow_node.py b/libs/labelbox/src/labelbox/schema/workflow/nodes/unknown_workflow_node.py index 5dc629c12..40a6036f9 100644 --- a/libs/labelbox/src/labelbox/schema/workflow/nodes/unknown_workflow_node.py +++ b/libs/labelbox/src/labelbox/schema/workflow/nodes/unknown_workflow_node.py @@ -71,7 +71,7 @@ class UnknownWorkflowNode(BaseWorkflowNode): as a safety mechanism to prevent data loss during parsing. """ - label: str = Field(default="") + label: str = Field(default="", max_length=50) node_config: Optional[List[Dict[str, Any]]] = Field( default=None, alias="config" ) diff --git a/libs/labelbox/src/labelbox/schema/workflow/project_filter.py b/libs/labelbox/src/labelbox/schema/workflow/project_filter.py index a3faced2a..785d2e03c 100644 --- a/libs/labelbox/src/labelbox/schema/workflow/project_filter.py +++ b/libs/labelbox/src/labelbox/schema/workflow/project_filter.py @@ -101,7 +101,7 @@ def is_one_of(self, values: List[str]) -> Dict[str, Any]: Args: values: List of IDs to match """ - return {self._field_name: values, "__operator": "is"} + return {self._field_name: values} def is_not_one_of(self, values: List[str]) -> Dict[str, Any]: """Filter for items that are NOT one of the specified values. @@ -171,30 +171,12 @@ def __call__( batch = ListField("Batch") consensus_average = RangeField("ConsensusAverage") feature_consensus_average = FeatureRangeField("FeatureConsensusAverage") -# Note: dataset is a function, not a field object - -# Function versions for filter functions -def dataset( - dataset_ids: List[str], label: Optional[str] = None -) -> Dict[str, Any]: - """Filter by dataset IDs. - - Args: - dataset_ids: List of dataset IDs to filter by - label: Optional custom label to display in the UI instead of the default "DS-0" format - - Returns: - Dict representing the filter rule - - Examples: - dataset(["dataset-123", "dataset-456"]) - dataset(["dataset-123"], label="My Custom Dataset") - """ - result: Dict[str, Any] = {"Dataset": dataset_ids} - if label is not None: - result["__label"] = label - return result +# List-based filter field instances +labeled_by = ListField("CreatedBy") # Maps to backend CreatedBy field +dataset = ListField("Dataset") +issue_category = ListField("IssueCategory") +annotation = ListField("Annotation") class MetadataCondition: @@ -302,60 +284,6 @@ def metadata( return result -def created_by( - user_ids: List[str], label: Optional[str] = None -) -> Dict[str, Any]: - """Filter by users who created the labels. - - Args: - user_ids: List of user IDs - label: Optional custom label to display in the UI - - Returns: - Dict representing the filter rule - """ - result: Dict[str, Any] = {"CreatedBy": user_ids} - if label is not None: - result["__label"] = label - return result - - -def labeled_by( - user_ids: List[str], label: Optional[str] = None -) -> Dict[str, Any]: - """Filter by users who labeled the data. - - Args: - user_ids: List of user IDs - label: Optional custom label to display in the UI - - Returns: - Dict representing the filter rule - """ - result: Dict[str, Any] = {"LabeledBy": user_ids} - if label is not None: - result["__label"] = label - return result - - -def annotation( - schema_node_ids: List[str], label: Optional[str] = None -) -> Dict[str, Any]: - """Filter by annotation schema node IDs. - - Args: - schema_node_ids: List of annotation schema node IDs - label: Optional custom label to display in the UI - - Returns: - Dict representing the filter rule - """ - result: Dict[str, Any] = {"Annotation": schema_node_ids} - if label is not None: - result["__label"] = label - return result - - def sample(percentage: int, label: Optional[str] = None) -> Dict[str, Any]: """Filter by random sample percentage. @@ -383,24 +311,6 @@ def sample(percentage: int, label: Optional[str] = None) -> Dict[str, Any]: return result -def issue_category( - category_ids: List[str], label: Optional[str] = None -) -> Dict[str, Any]: - """Filter by issue category IDs. - - Args: - category_ids: List of issue category IDs - label: Optional custom label to display in the UI - - Returns: - Dict representing the filter rule - """ - result: Dict[str, Any] = {"IssueCategory": category_ids} - if label is not None: - result["__label"] = label - return result - - def model_prediction( conditions: List[Dict[str, Any]], label: Optional[str] = None ) -> Dict[str, Any]: @@ -568,14 +478,17 @@ def convert_to_api_format(filter_rule: Dict[str, Any]) -> Dict[str, Any]: class ProjectWorkflowFilter(BaseModel): """ - Project workflow filter collection that enforces filter function syntax. + Project workflow filter collection that enforces filter syntax. - Only accepts filters created using filter functions in this module. + Only accepts filters created using filter field objects and functions in this module. This ensures type safety, IDE support, and eliminates manual string construction errors. Example Usage: filters = ProjectWorkflowFilter([ - created_by(["user-123"]), + labeled_by.is_one_of(["user-123"]), + dataset.is_one_of(["dataset-456"]), + issue_category.is_one_of(["cat1", "cat2"]), + annotation.is_one_of(["bbox", "segmentation"]), sample(20), labeled_at.between("2024-01-01", "2024-12-31"), metadata([condition.contains("tag", "test")]), @@ -586,7 +499,7 @@ class ProjectWorkflowFilter(BaseModel): logic.set_filters(filters) # Or add individual filters - logic.add_filter(created_by(["user-123"])) + logic.add_filter(labeled_by.is_one_of(["user-123"])) """ rules: List[Dict[str, Any]] = Field(default_factory=lambda: []) @@ -636,7 +549,7 @@ def _validate_filter_structure(self, rule: Dict[str, Any]) -> None: if not isinstance(rule, dict) or not rule: raise ValueError( "Filters must be created using filter functions. " - "Use functions like created_by([...]), metadata([...]), labeled_at.between(...), etc." + "Use functions like labeled_by([...]), metadata([...]), labeled_at.between(...), etc." ) # Basic structural validation - ensure we have at least one field diff --git a/libs/labelbox/src/labelbox/schema/workflow/workflow.py b/libs/labelbox/src/labelbox/schema/workflow/workflow.py index 3c45d2ffd..306f9a2c6 100644 --- a/libs/labelbox/src/labelbox/schema/workflow/workflow.py +++ b/libs/labelbox/src/labelbox/schema/workflow/workflow.py @@ -19,6 +19,7 @@ Union, Literal, overload, + NamedTuple, ) from pydantic import BaseModel, ConfigDict, PrivateAttr @@ -43,6 +44,7 @@ AutoQANode, ) from labelbox.schema.workflow.project_filter import ProjectWorkflowFilter +from labelbox.schema.workflow.config import LabelingConfig, ReworkConfig # Import the utility classes from labelbox.schema.workflow.workflow_utils import ( @@ -58,6 +60,18 @@ logger = logging.getLogger(__name__) +class InitialNodes(NamedTuple): + """Container for the two required initial workflow nodes. + + Attributes: + labeling: InitialLabeling node for new data entering workflow + rework: InitialRework node for rejected data needing corrections + """ + + labeling: InitialLabelingNode + rework: InitialReworkNode + + def _validate_definition_id( definition_id_str: str, node_id: str ) -> WorkflowDefinitionId: @@ -390,6 +404,9 @@ def validate(cls, workflow: "ProjectWorkflow") -> "ProjectWorkflow": def update_config(self, reposition: bool = True) -> "ProjectWorkflow": """Update the workflow configuration on the server. + This method automatically validates the workflow before updating to ensure + data integrity and prevent invalid configurations from being saved. + Args: reposition: Whether to automatically reposition nodes before update @@ -397,9 +414,26 @@ def update_config(self, reposition: bool = True) -> "ProjectWorkflow": ProjectWorkflow: Updated workflow instance Raises: - ValueError: If the update operation fails + ValueError: If validation errors are found or the update operation fails """ try: + # Always validate workflow before updating (mandatory for data safety) + validation_result = self.check_validity() + validation_errors = validation_result.get("errors", []) + + if validation_errors: + # Format validation errors for clear user feedback + formatted_errors = self.format_validation_errors( + validation_result + ) + logger.error(f"Workflow validation failed: {formatted_errors}") + + # Raise a clear ValueError with validation details + raise ValueError( + f"Cannot update workflow configuration due to validation errors:\n{formatted_errors}\n\n" + f"Please fix these issues before updating." + ) + if reposition: self.reposition_nodes() @@ -455,9 +489,59 @@ def update_config(self, reposition: bool = True) -> "ProjectWorkflow": raise ValueError(f"Failed to update workflow: {e}") # Workflow management operations - def reset_config(self) -> "ProjectWorkflow": - """Reset the workflow configuration to an empty workflow.""" - return WorkflowOperations.reset_config(self) + def reset_to_initial_nodes( + self, + labeling_config: Optional[LabelingConfig] = None, + rework_config: Optional[ReworkConfig] = None, + ) -> InitialNodes: + """Reset workflow and create the two required initial nodes. + + Clears all existing nodes and edges, then creates: + - InitialLabeling node: Entry point for new data requiring labeling + - InitialRework node: Entry point for rejected data requiring corrections + + Args: + labeling_config: Configuration for InitialLabeling node + rework_config: Configuration for InitialRework node + + Returns: + InitialNodes with labeling and rework nodes ready for workflow building + + Example: + >>> initial_nodes = workflow.reset_to_initial_nodes( + ... labeling_config=LabelingConfig(instructions="Label all objects", max_contributions_per_user=10), + ... rework_config=ReworkConfig(individual_assignment=["user-id-123"]) + ... ) + >>> done = workflow.add_node(type=NodeType.Done) + >>> workflow.add_edge(initial_nodes.labeling, done) + >>> workflow.add_edge(initial_nodes.rework, done) + """ + # Convert configs to dicts for node creation + labeling_dict = ( + labeling_config.model_dump(exclude_none=True) + if labeling_config + else {} + ) + rework_dict = ( + rework_config.model_dump(exclude_none=True) if rework_config else {} + ) + + # Reset workflow configuration + self.config = {"nodes": [], "edges": []} + self._nodes_cache = None + self._edges_cache = None + + # Create required initial nodes using internal method + initial_labeling = cast( + InitialLabelingNode, + self._create_node_internal(InitialLabelingNode, **labeling_dict), + ) + initial_rework = cast( + InitialReworkNode, + self._create_node_internal(InitialReworkNode, **rework_dict), + ) + + return InitialNodes(labeling=initial_labeling, rework=initial_rework) def delete_nodes(self, nodes: List[BaseWorkflowNode]) -> "ProjectWorkflow": """Delete specified nodes from the workflow.""" @@ -615,26 +699,6 @@ def _create_node_internal( return node # Type overloads for add_node method with node-specific parameters - @overload - def add_node( - self, - *, - type: Literal[NodeType.InitialLabeling], - instructions: Optional[str] = None, - max_contributions_per_user: Optional[int] = None, - **kwargs, - ) -> InitialLabelingNode: ... - - @overload - def add_node( - self, - *, - type: Literal[NodeType.InitialRework], - instructions: Optional[str] = None, - individual_assignment: Optional[Union[str, List[str]]] = None, - max_contributions_per_user: Optional[int] = None, - **kwargs, - ) -> InitialReworkNode: ... @overload def add_node( @@ -699,6 +763,13 @@ def add_node( def add_node(self, *, type: NodeType, **kwargs) -> BaseWorkflowNode: """Add a node to the workflow with type-specific parameters.""" + # Block manual creation of initial nodes + if type in [NodeType.InitialLabeling, NodeType.InitialRework]: + raise ValueError( + f"Cannot create {type.value} nodes via add_node(). " + f"Use workflow.reset_to_initial_nodes() instead." + ) + workflow_def_id = WorkflowDefinitionId(type.value) node_class = NODE_TYPE_MAP[workflow_def_id] diff --git a/libs/labelbox/src/labelbox/schema/workflow/workflow_operations.py b/libs/labelbox/src/labelbox/schema/workflow/workflow_operations.py index c0fcb9b0c..8129c31b7 100644 --- a/libs/labelbox/src/labelbox/schema/workflow/workflow_operations.py +++ b/libs/labelbox/src/labelbox/schema/workflow/workflow_operations.py @@ -223,28 +223,6 @@ def create_node_internal( return node # Overloaded add_node methods for type safety - @staticmethod - @overload - def add_node( - workflow: "ProjectWorkflow", - *, - type: Literal[NodeType.InitialLabeling], - instructions: Optional[str] = None, - max_contributions_per_user: Optional[int] = None, - **kwargs: Any, - ) -> InitialLabelingNode: ... - - @staticmethod - @overload - def add_node( - workflow: "ProjectWorkflow", - *, - type: Literal[NodeType.InitialRework], - instructions: Optional[str] = None, - individual_assignment: Optional[Union[str, List[str]]] = None, - max_contributions_per_user: Optional[int] = None, - **kwargs: Any, - ) -> InitialReworkNode: ... @staticmethod @overload @@ -628,6 +606,26 @@ def delete_nodes( workflow: "ProjectWorkflow", nodes: List[BaseWorkflowNode] ) -> "ProjectWorkflow": """Delete specified nodes from the workflow.""" + # Prevent deletion of initial nodes + initial_node_types = [ + WorkflowDefinitionId.InitialLabelingTask, + WorkflowDefinitionId.InitialReworkTask, + ] + + for node in nodes: + if node.definition_id in initial_node_types: + node_type_name = ( + "InitialLabeling" + if node.definition_id + == WorkflowDefinitionId.InitialLabelingTask + else "InitialRework" + ) + raise ValueError( + f"Cannot delete {node_type_name} node (ID: {node.id}). " + f"Initial nodes are required for workflow validity. " + f"Use workflow.reset_to_initial_nodes() to create a new workflow instead." + ) + # Get node IDs to remove node_ids = [node.id for node in nodes] @@ -648,11 +646,3 @@ def delete_nodes( workflow._edges_cache = None return workflow - - @staticmethod - def reset_config(workflow: "ProjectWorkflow") -> "ProjectWorkflow": - """Reset the workflow configuration to an empty workflow.""" - workflow.config = {"nodes": [], "edges": []} - workflow._nodes_cache = None - workflow._edges_cache = None - return workflow diff --git a/libs/labelbox/src/labelbox/schema/workflow/workflow_utils.py b/libs/labelbox/src/labelbox/schema/workflow/workflow_utils.py index a4f0fb6e2..bd2ca0ca0 100644 --- a/libs/labelbox/src/labelbox/schema/workflow/workflow_utils.py +++ b/libs/labelbox/src/labelbox/schema/workflow/workflow_utils.py @@ -48,7 +48,7 @@ def validate_initial_nodes( errors.append( { "node_type": "InitialLabelingNode", - "reason": "Workflow must have exactly one InitialLabelingNode, but found 0", + "reason": "Workflow must have exactly one InitialLabelingNode, but found 0. Use workflow.reset_to_initial_nodes() to create required nodes.", "node_id": "missing", } ) @@ -57,7 +57,7 @@ def validate_initial_nodes( errors.append( { "node_type": "InitialLabelingNode", - "reason": f"Workflow must have exactly one InitialLabelingNode, but found {len(initial_labeling_nodes)}", + "reason": f"Workflow must have exactly one InitialLabelingNode, but found {len(initial_labeling_nodes)}. Use workflow.reset_to_initial_nodes() to create a valid workflow.", "node_id": node.id, } ) @@ -67,7 +67,7 @@ def validate_initial_nodes( errors.append( { "node_type": "InitialReworkNode", - "reason": "Workflow must have exactly one InitialReworkNode, but found 0", + "reason": "Workflow must have exactly one InitialReworkNode, but found 0. Use workflow.reset_to_initial_nodes() to create required nodes.", "node_id": "missing", } ) @@ -76,7 +76,7 @@ def validate_initial_nodes( errors.append( { "node_type": "InitialReworkNode", - "reason": f"Workflow must have exactly one InitialReworkNode, but found {len(initial_rework_nodes)}", + "reason": f"Workflow must have exactly one InitialReworkNode, but found {len(initial_rework_nodes)}. Use workflow.reset_to_initial_nodes() to create a valid workflow.", "node_id": node.id, } ) @@ -164,9 +164,6 @@ def validate(cls, workflow: "ProjectWorkflow") -> "ProjectWorkflow": nodes = workflow.get_nodes() edges = workflow.get_edges() - if not nodes: - return workflow - # Build graph for validation graph = ProjectWorkflowGraph() for edge in edges: @@ -180,7 +177,7 @@ def validate(cls, workflow: "ProjectWorkflow") -> "ProjectWorkflow": errors.extend(connection_errors) # Store validation results - workflow._validation_errors = {"validation": errors} + workflow._validation_errors = {"errors": errors} return workflow @staticmethod diff --git a/libs/labelbox/tests/integration/test_workflow.py b/libs/labelbox/tests/integration/test_workflow.py index 16e50653a..b85494460 100644 --- a/libs/labelbox/tests/integration/test_workflow.py +++ b/libs/labelbox/tests/integration/test_workflow.py @@ -3,7 +3,7 @@ Tests the following workflow operations: - Creating workflows with different node types -- Updating workflows without reset_config() +- Updating workflows with configuration changes - Copying workflows between projects - LogicNode filter operations (add/remove/update) - Node removal operations with validation @@ -20,8 +20,8 @@ ProjectWorkflowFilter, WorkflowDefinitionId, FilterField, - # Import filter functions - created_by, + LabelingConfig, + labeled_by, dataset, natural_language, labeling_time, @@ -63,22 +63,18 @@ def test_workflow_creation(client, test_projects): source_project, _ = test_projects workflow = source_project.get_workflow() - workflow.reset_config() - # All valid workflows must have both InitialLabelingNode and InitialReworkNode - initial_labeling_node = workflow.add_node( - type=NodeType.InitialLabeling, instructions="Start labeling here" + # Create workflow with required initial nodes + initial_nodes = workflow.reset_to_initial_nodes( + labeling_config=LabelingConfig(instructions="Start labeling here") ) - initial_rework_node = workflow.add_node(type=NodeType.InitialRework) - review_node = workflow.add_node(type=NodeType.Review, name="Review Task") - done_node = workflow.add_node(type=NodeType.Done, name="Done") # Connect both initial nodes to review node - workflow.add_edge(initial_labeling_node, review_node) - workflow.add_edge(initial_rework_node, review_node) + workflow.add_edge(initial_nodes.labeling, review_node) + workflow.add_edge(initial_nodes.rework, review_node) workflow.add_edge(review_node, done_node, NodeOutput.Approved) workflow.update_config(reposition=False) @@ -111,17 +107,13 @@ def test_workflow_creation_simple(client): # Get or create workflow workflow = project.get_workflow() - # Clear config - workflow.reset_config() - - # Create workflow nodes - initial_labeling = workflow.add_node( - type=NodeType.InitialLabeling, - instructions="This is the entry point", + # Create workflow with required initial nodes + initial_nodes = workflow.reset_to_initial_nodes( + labeling_config=LabelingConfig( + instructions="This is the entry point" + ) ) - initial_rework = workflow.add_node(type=NodeType.InitialRework) - review = workflow.add_node( type=NodeType.Review, name="Test review task" ) @@ -133,8 +125,8 @@ def test_workflow_creation_simple(client): rework = workflow.add_node(type=NodeType.Rework) # Connect nodes using NodeOutput enum - workflow.add_edge(initial_labeling, review) - workflow.add_edge(initial_rework, review) + workflow.add_edge(initial_nodes.labeling, review) + workflow.add_edge(initial_nodes.rework, review) workflow.add_edge(review, rework, NodeOutput.Rejected) workflow.add_edge(review, done, NodeOutput.Approved) @@ -204,14 +196,11 @@ def test_node_types(client, test_projects): source_project, _ = test_projects workflow = source_project.get_workflow() - workflow.reset_config() - initial_labeling = workflow.add_node( - type=NodeType.InitialLabeling, instructions="Start labeling" + initial_nodes = workflow.reset_to_initial_nodes( + labeling_config=LabelingConfig(instructions="Start labeling") ) - initial_rework = workflow.add_node(type=NodeType.InitialRework) - review = workflow.add_node(type=NodeType.Review, name="Review Task") logic = workflow.add_node(type=NodeType.Logic, name="Logic Decision") @@ -227,8 +216,8 @@ def test_node_types(client, test_projects): done1 = workflow.add_node(type=NodeType.Done, name="Complete 1") done2 = workflow.add_node(type=NodeType.Done, name="Complete 2") - workflow.add_edge(initial_labeling, review) - workflow.add_edge(initial_rework, review) + workflow.add_edge(initial_nodes.labeling, review) + workflow.add_edge(initial_nodes.rework, review) workflow.add_edge(review, logic, NodeOutput.Approved) workflow.add_edge(logic, rework, NodeOutput.If) workflow.add_edge(logic, custom_rework, NodeOutput.Else) @@ -257,27 +246,25 @@ def test_node_types(client, test_projects): def test_workflow_update_without_reset(client, test_projects): - """Test updating an existing workflow without reset_config().""" + """Test updating an existing workflow by modifying node properties.""" source_project, _ = test_projects # Create initial workflow workflow = source_project.get_workflow() - workflow.reset_config() - initial_labeling = workflow.add_node( - type=NodeType.InitialLabeling, instructions="Original instructions" + initial_nodes = workflow.reset_to_initial_nodes( + labeling_config=LabelingConfig(instructions="Original instructions") ) - initial_rework = workflow.add_node(type=NodeType.InitialRework) review = workflow.add_node(type=NodeType.Review, name="Original Review") done = workflow.add_node(type=NodeType.Done, name="Original Done") - workflow.add_edge(initial_labeling, review) - workflow.add_edge(initial_rework, review) + workflow.add_edge(initial_nodes.labeling, review) + workflow.add_edge(initial_nodes.rework, review) workflow.add_edge(review, done, NodeOutput.Approved) workflow.update_config(reposition=False) - # Update workflow without reset_config() + # Update workflow by modifying existing nodes updated_workflow = source_project.get_workflow() nodes = updated_workflow.get_nodes() @@ -335,30 +322,91 @@ def test_workflow_update_without_reset(client, test_projects): assert review_nodes[0].name == "Updated Review" +def test_workflow_validation_in_update_config(client, test_projects): + """Test the mandatory validation behavior in update_config.""" + source_project, _ = test_projects + + # Create an invalid workflow (missing connections) + workflow = source_project.get_workflow() + + # Create workflow with required initial nodes but invalid connections + initial_nodes = workflow.reset_to_initial_nodes() + # Add a review node with no connections - this should cause validation errors + review = workflow.add_node(type=NodeType.Review, name="Unconnected Review") + + # Only connect the initial nodes together, leaving review disconnected + workflow.add_edge( + initial_nodes.labeling, initial_nodes.rework + ) # This is also invalid + + # Test 1: update_config should validate and fail with invalid workflow + with pytest.raises(ValueError) as exc_info: + workflow.update_config() + + assert "validation errors" in str(exc_info.value).lower() + assert "Cannot update workflow configuration" in str(exc_info.value) + + # Test 2: Multiple calls should consistently fail validation + with pytest.raises(ValueError) as exc_info: + workflow.update_config() + + assert "validation errors" in str(exc_info.value).lower() + + # Test 3: Validation errors should be consistently reported + with pytest.raises(ValueError) as exc_info: + workflow.update_config() + + # Verify the error message is clear and helpful + error_message = str(exc_info.value) + assert "validation errors" in error_message.lower() + assert "please fix these issues" in error_message.lower() + + # Test 4: Create a valid workflow and test successful update + initial_nodes = workflow.reset_to_initial_nodes() + review = workflow.add_node(type=NodeType.Review, name="Connected Review") + done = workflow.add_node(type=NodeType.Done, name="Final") + + # Create proper connections + workflow.add_edge(initial_nodes.labeling, review) + workflow.add_edge(initial_nodes.rework, review) + workflow.add_edge(review, done, NodeOutput.Approved) + + # This should work without errors + result = workflow.update_config() + assert result is not None + + # Test successful update - should not raise any exceptions + try: + workflow.update_config() + # If we get here, the update was successful + assert True + except ValueError: + # Should not happen with a valid workflow + assert False, "Valid workflow should not raise validation errors" + + def test_workflow_copy(client, test_projects): """Test copying a workflow between projects.""" source_project, target_project = test_projects # Create source workflow source_workflow = source_project.get_workflow() - source_workflow.reset_config() - initial_labeling = source_workflow.add_node( - type=NodeType.InitialLabeling, instructions="Source workflow" + initial_nodes = source_workflow.reset_to_initial_nodes( + labeling_config=LabelingConfig(instructions="Source workflow") ) - initial_rework = source_workflow.add_node(type=NodeType.InitialRework) review = source_workflow.add_node( type=NodeType.Review, name="Source Review" ) logic = source_workflow.add_node( type=NodeType.Logic, name="Source Logic", - filters=ProjectWorkflowFilter([created_by(["source-user"])]), + filters=ProjectWorkflowFilter([labeled_by.is_one_of(["source-user"])]), ) done = source_workflow.add_node(type=NodeType.Done, name="Source Done") - source_workflow.add_edge(initial_labeling, review) - source_workflow.add_edge(initial_rework, review) + source_workflow.add_edge(initial_nodes.labeling, review) + source_workflow.add_edge(initial_nodes.rework, review) source_workflow.add_edge(review, logic, NodeOutput.Approved) source_workflow.add_edge(logic, done, NodeOutput.If) @@ -386,23 +434,19 @@ def test_production_logic_node_with_comprehensive_filters( source_project, _ = test_projects workflow = source_project.get_workflow() - workflow.reset_config() # Create basic workflow structure - initial_labeling = workflow.add_node(type=NodeType.InitialLabeling) - initial_rework = workflow.add_node(type=NodeType.InitialRework) + initial_nodes = workflow.reset_to_initial_nodes() done = workflow.add_node(type=NodeType.Done) # Create production-like logic node with comprehensive filters - # Note: match_filters=MatchFilters.Any should set filter_logic="or" but - # the backend may not persist this correctly, causing it to default to "and" logic = workflow.add_node( type=NodeType.Logic, name="Production Logic", match_filters=MatchFilters.Any, filters=ProjectWorkflowFilter( [ - created_by( + labeled_by.is_one_of( ["cly7gzohg07zz07v5fqs63zmx", "cl7k7a9x1764808vk6bm1hf8e"] ), metadata([m_condition.contains("tag", ["test"])]), @@ -413,8 +457,8 @@ def test_production_logic_node_with_comprehensive_filters( ), labeling_time.greater_than(1000), review_time.less_than_or_equal(100), - dataset(["cm37vyets000z072314wxgt0l"]), - annotation(["cm37w0e0500lf0709ba7c42m9"]), + dataset.is_one_of(["cm37vyets000z072314wxgt0l"]), + annotation.is_one_of(["cm37w0e0500lf0709ba7c42m9"]), consensus_average(0.17, 0.61), model_prediction( [ @@ -429,8 +473,8 @@ def test_production_logic_node_with_comprehensive_filters( ), ) - workflow.add_edge(initial_labeling, logic) - workflow.add_edge(initial_rework, logic) + workflow.add_edge(initial_nodes.labeling, logic) + workflow.add_edge(initial_nodes.rework, logic) workflow.add_edge(logic, done, NodeOutput.If) workflow.update_config(reposition=False) @@ -450,15 +494,13 @@ def test_production_logic_node_with_comprehensive_filters( len(filters) >= 10 ), f"Should have at least 10 filters, got {len(filters)}" - # The filter_logic may default to "and" even when MatchFilters.Any is specified - # This is likely due to backend persistence behavior - the important thing is - # that the comprehensive filters are properly set and parsed + # Verify filter logic is properly set assert production_logic.filter_logic in [ "and", "or", ], "Should have valid filter logic" - # Verify key filter types are present - this is the main test objective + # Verify key filter types are present filter_fields = [f["field"] for f in filters] expected_fields = [ "CreatedBy", @@ -468,7 +510,7 @@ def test_production_logic_node_with_comprehensive_filters( "LabelingTime", "Dataset", "ModelPrediction", - "NlSearch", # From natural_language filter + "NlSearch", ] for field in expected_fields: assert field in filter_fields, f"Should have {field} filter" @@ -479,10 +521,8 @@ def test_filter_operations_with_persistence(client, test_projects): source_project, _ = test_projects workflow = source_project.get_workflow() - workflow.reset_config() - initial_labeling = workflow.add_node(type=NodeType.InitialLabeling) - initial_rework = workflow.add_node(type=NodeType.InitialRework) + initial_nodes = workflow.reset_to_initial_nodes() done = workflow.add_node(type=NodeType.Done) # Create logic node with initial filters @@ -491,15 +531,15 @@ def test_filter_operations_with_persistence(client, test_projects): name="Filter Test", filters=ProjectWorkflowFilter( [ - created_by(["user1", "user2"]), + labeled_by.is_one_of(["user1", "user2"]), sample(30), labeling_time.greater_than(500), ] ), ) - workflow.add_edge(initial_labeling, logic) - workflow.add_edge(initial_rework, logic) + workflow.add_edge(initial_nodes.labeling, logic) + workflow.add_edge(initial_nodes.rework, logic) workflow.add_edge(logic, done, NodeOutput.If) workflow.update_config(reposition=False) @@ -518,7 +558,7 @@ def test_filter_operations_with_persistence(client, test_projects): ), f"Should start with 3 filters, got {initial_count}" # Test removing filters with persistence - logic_node.remove_filter(FilterField.CreatedBy) + logic_node.remove_filter(FilterField.LabeledBy) logic_node.remove_filter(FilterField.Sample) updated_workflow.update_config(reposition=False) @@ -542,10 +582,10 @@ def test_filter_operations_with_persistence(client, test_projects): ), "LabelingTime filter should remain" assert ( "CreatedBy" not in remaining_fields - ), "CreatedBy filter should be removed" + ), "LabeledBy filter should be removed" # Test adding filters with persistence - logic_after_removal.add_filter(dataset(["new-dataset"])) + logic_after_removal.add_filter(dataset.is_one_of(["new-dataset"])) logic_after_removal.add_filter( metadata([m_condition.starts_with("priority", "high")]) ) @@ -571,11 +611,9 @@ def test_node_removal_with_validation(client, test_projects): source_project, _ = test_projects workflow = source_project.get_workflow() - workflow.reset_config() # Create workflow with removable nodes - initial_labeling = workflow.add_node(type=NodeType.InitialLabeling) - initial_rework = workflow.add_node(type=NodeType.InitialRework) + initial_nodes = workflow.reset_to_initial_nodes() review = workflow.add_node(type=NodeType.Review, name="Primary Review") logic = workflow.add_node( type=NodeType.Logic, @@ -592,8 +630,8 @@ def test_node_removal_with_validation(client, test_projects): done_final = workflow.add_node(type=NodeType.Done, name="Final") # Create connections - workflow.add_edge(initial_labeling, review) - workflow.add_edge(initial_rework, review) + workflow.add_edge(initial_nodes.labeling, review) + workflow.add_edge(initial_nodes.rework, review) workflow.add_edge(review, logic, NodeOutput.Approved) workflow.add_edge(logic, done_high, NodeOutput.If) workflow.add_edge(logic, secondary_review, NodeOutput.Else) @@ -661,9 +699,8 @@ def test_node_removal_with_validation(client, test_projects): ), "Secondary Rework node should exist" -# Remove redundant test - metadata conversion should be unit test def test_metadata_multiple_conditions(): - """Test metadata filter with multiple conditions - unit test for conversion logic.""" + """Test metadata filter with multiple conditions.""" multi_filter = { "metadata": [ {"key": "source", "operator": "ends_with", "value": "test1"}, @@ -684,10 +721,8 @@ def test_model_prediction_conditions(client, test_projects): source_project, _ = test_projects workflow = source_project.get_workflow() - workflow.reset_config() - initial_labeling = workflow.add_node(type=NodeType.InitialLabeling) - initial_rework = workflow.add_node(type=NodeType.InitialRework) + initial_nodes = workflow.reset_to_initial_nodes() done = workflow.add_node(type=NodeType.Done) # Test different model prediction conditions @@ -712,8 +747,8 @@ def test_model_prediction_conditions(client, test_projects): ) # Create connections - workflow.add_edge(initial_labeling, logic_none) - workflow.add_edge(initial_rework, logic_none) + workflow.add_edge(initial_nodes.labeling, logic_none) + workflow.add_edge(initial_nodes.rework, logic_none) workflow.add_edge(logic_none, logic_one_of, NodeOutput.If) workflow.add_edge(logic_one_of, done, NodeOutput.If) diff --git a/libs/labelbox/tests/integration/test_workflow_validation.py b/libs/labelbox/tests/integration/test_workflow_validation.py new file mode 100644 index 000000000..b216b6c6f --- /dev/null +++ b/libs/labelbox/tests/integration/test_workflow_validation.py @@ -0,0 +1,125 @@ +""" +Tests for workflow validation and error handling. + +Tests that the new validation prevents invalid workflow states and provides clear error messages. +""" + +import pytest +from labelbox.schema.workflow import NodeType, LabelingConfig, ReworkConfig +from labelbox.schema.media_type import MediaType + + +def test_cannot_create_initial_nodes_via_add_node(client): + """Test that creating initial nodes via add_node is blocked.""" + project = client.create_project( + name="Test Validation", media_type=MediaType.Image + ) + + try: + workflow = project.get_workflow() + initial_nodes = workflow.reset_to_initial_nodes() + + # Should fail when trying to create additional initial nodes + with pytest.raises( + ValueError, + match="Cannot create initial_labeling_task nodes via add_node", + ): + workflow.add_node(type=NodeType.InitialLabeling) + + with pytest.raises( + ValueError, + match="Cannot create initial_rework_task nodes via add_node", + ): + workflow.add_node(type=NodeType.InitialRework) + + finally: + project.delete() + + +def test_cannot_delete_initial_nodes(client): + """Test that deleting initial nodes is blocked.""" + project = client.create_project( + name="Test Validation", media_type=MediaType.Image + ) + + try: + workflow = project.get_workflow() + initial_nodes = workflow.reset_to_initial_nodes() + + # Should fail when trying to delete initial nodes + with pytest.raises( + ValueError, match="Cannot delete InitialLabeling node" + ): + workflow.delete_nodes([initial_nodes.labeling]) + + with pytest.raises( + ValueError, match="Cannot delete InitialRework node" + ): + workflow.delete_nodes([initial_nodes.rework]) + + finally: + project.delete() + + +def test_valid_workflow_with_configs(client): + """Test that workflows with proper configurations work correctly.""" + project = client.create_project( + name="Test Validation", media_type=MediaType.Image + ) + + try: + workflow = project.get_workflow() + + # Should work with configuration + initial_nodes = workflow.reset_to_initial_nodes( + labeling_config=LabelingConfig( + instructions="Label carefully", max_contributions_per_user=5 + ), + rework_config=ReworkConfig( + instructions="Fix the issues", + individual_assignment=["user-123"], + max_contributions_per_user=3, + ), + ) + + # Verify nodes were created with correct config + assert initial_nodes.labeling.instructions == "Label carefully" + assert initial_nodes.labeling.max_contributions_per_user == 5 + assert initial_nodes.rework.instructions == "Fix the issues" + assert initial_nodes.rework.individual_assignment == ["user-123"] + assert initial_nodes.rework.max_contributions_per_user == 3 + + # Should be able to create other node types + done = workflow.add_node(type=NodeType.Done) + workflow.add_edge(initial_nodes.labeling, done) + workflow.add_edge(initial_nodes.rework, done) + + # Should validate and update successfully + workflow.update_config() + + finally: + project.delete() + + +def test_empty_workflow_validation_fails(client): + """Test that workflows with missing initial nodes fail validation.""" + project = client.create_project( + name="Test Validation", media_type=MediaType.Image + ) + + try: + workflow = project.get_workflow() + + # Manually create invalid state for testing (bypassing public API) + workflow.config = {"nodes": [], "edges": []} + workflow._nodes_cache = None + workflow._edges_cache = None + + # Should fail validation + with pytest.raises( + ValueError, match="Use workflow.reset_to_initial_nodes" + ): + workflow.update_config() + + finally: + project.delete()