From b2c7aed1b84d2eb17dda0c2ab5f6684ecb75dde4 Mon Sep 17 00:00:00 2001 From: Ben Elam Date: Tue, 20 May 2025 10:53:05 -0700 Subject: [PATCH 01/12] Add utils/auth.py --- orchestrator/utils/auth.py | 10 ++++++++++ 1 file changed, 10 insertions(+) create mode 100644 orchestrator/utils/auth.py diff --git a/orchestrator/utils/auth.py b/orchestrator/utils/auth.py new file mode 100644 index 000000000..7e7b16917 --- /dev/null +++ b/orchestrator/utils/auth.py @@ -0,0 +1,10 @@ + +from collections.abc import Callable +from typing import TypeAlias + +from oauth2_lib.fastapi import OIDCUserModel + +# This file is broken out separately to avoid circular imports. + +# Can instead use "type Authorizer = ..." in later Python versions. +Authorizer: TypeAlias = Callable[[OIDCUserModel | None], bool] From 5015f0001d22fe2f5c045934225029f1c63cb940 Mon Sep 17 00:00:00 2001 From: Ben Elam Date: Tue, 6 May 2025 16:07:10 -0700 Subject: [PATCH 02/12] Implement RBAC for resume and retry workflows Unlike new_process, this can be checked immediately in the request handler. Policy priorities specified via workflow and steps are resolved via get_auth_callbacks. --- .../api/api_v1/endpoints/processes.py | 57 ++++++++++++++++++- orchestrator/services/processes.py | 13 +---- orchestrator/workflow.py | 35 +++++++++--- orchestrator/workflows/utils.py | 13 ++++- test/unit_tests/api/test_processes.py | 51 ++++++++++++++++- 5 files changed, 144 insertions(+), 25 deletions(-) diff --git a/orchestrator/api/api_v1/endpoints/processes.py b/orchestrator/api/api_v1/endpoints/processes.py index 86b168dc3..f08d75375 100644 --- a/orchestrator/api/api_v1/endpoints/processes.py +++ b/orchestrator/api/api_v1/endpoints/processes.py @@ -56,6 +56,7 @@ ) from orchestrator.services.settings import get_engine_settings from orchestrator.settings import app_settings +from orchestrator.utils.auth import Authorizer from orchestrator.utils.enrich_process import enrich_process from orchestrator.websocket import ( WS_CHANNELS, @@ -63,7 +64,7 @@ broadcast_process_update_to_websocket, websocket_manager, ) -from orchestrator.workflow import ProcessStatus +from orchestrator.workflow import ProcessStat, ProcessStatus from pydantic_forms.types import JSON, State router = APIRouter() @@ -86,6 +87,44 @@ def check_global_lock() -> None: ) +def get_auth_callbacks(pstat: ProcessStat) -> tuple[Authorizer | None, Authorizer | None]: + """Iterate over workflow and prior steps to determine correct authorization callbacks for the current step. + + It's safest to always iterate through the steps. We could track these callbacks statefully + as we progress through the workflow, but if we fail a step and the system restarts, the previous + callbacks will be lost if they're only available in the process state. + + Priority: + - RESUME callback is explicit RESUME callback, else previous START/RESUME callback + - RETRY callback is explicit RETRY, else explicit RESUME, else previous RETRY + """ + # Default to workflow start callbacks + auth_resume = pstat.workflow.authorize_callback + # Retry defaults to the workflow start callback if not otherwise specified + auth_retry = pstat.workflow.retry_auth_callback if pstat.workflow.retry_auth_callback else auth_resume + # Iterate over previous steps to look for policy changes + remaining_steps = pstat.log + past_steps = pstat.workflow.steps[:-len(remaining_steps)] + # Review last steps and current step + for step in past_steps + [pstat.log[0]]: + if step.resume_auth_callback and step.retry_auth_callback is None: + # Set both to authorize_callback + auth_resume = step.resume_auth_callback + auth_retry = step.resume_auth_callback + continue + elif step.resume_auth_callback and step.retry_auth_callback: + auth_resume = step.resume_auth_callback + auth_retry = step.retry_auth_callback + continue + elif step.resume_auth_callback is None and step.retry_auth_callback: + # Only update retry + auth_retry = step.retry_auth_callback + continue + else: # Both None + continue + return auth_resume, auth_retry + + def resolve_user_name( *, reporter: Reporter | None, @@ -150,7 +189,11 @@ def new_process( dependencies=[Depends(check_global_lock, use_cache=False)], ) def resume_process_endpoint( - process_id: UUID, request: Request, json_data: JSON = Body(...), user: str = Depends(user_name) + process_id: UUID, + request: Request, + json_data: JSON = Body(...), + user: str = Depends(user_name), + user_model: OIDCUserModel | None = Depends(authenticate), ) -> None: process = _get_process(process_id) @@ -163,6 +206,16 @@ def resume_process_endpoint( if process.last_status == ProcessStatus.RESUMED: raise_status(HTTPStatus.CONFLICT, "Resuming a resumed workflow is not possible") + pstat = load_process(process) + auth_resume, auth_retry = get_auth_callbacks(pstat) + if process.last_status == ProcessStatus.SUSPENDED: + if not auth_resume(user_model): + raise_status(HTTPStatus.FORBIDDEN, "User is not authorized to resume step") + elif process.last_status == ProcessStatus.FAILED: + if not auth_retry(user_model): + raise_status(HTTPStatus.FORBIDDEN, "User is not authorized to retry step") + + broadcast_invalidate_status_counts() broadcast_func = api_broadcast_process_data(request) diff --git a/orchestrator/services/processes.py b/orchestrator/services/processes.py index b08c17228..1fdeafce8 100644 --- a/orchestrator/services/processes.py +++ b/orchestrator/services/processes.py @@ -41,6 +41,7 @@ from orchestrator.workflow import ( CALLBACK_TOKEN_KEY, DEFAULT_CALLBACK_PROGRESS_KEY, + Authorizer, Failed, ProcessStat, ProcessStatus, @@ -411,7 +412,6 @@ def run() -> WFProcess: def error_message_unauthorized(workflow_key: str) -> str: return f"User is not authorized to execute '{workflow_key}' workflow" - def create_process( workflow_key: str, user_inputs: list[State] | None = None, @@ -467,9 +467,7 @@ def thread_start_process( user_model: OIDCUserModel | None = None, broadcast_func: BroadcastFunc | None = None, ) -> UUID: - pstat = create_process(workflow_key, user_inputs=user_inputs, user=user) - if not pstat.workflow.authorize_callback(user_model): - raise_status(HTTPStatus.FORBIDDEN, error_message_unauthorized(workflow_key)) + pstat = create_process(workflow_key, user_inputs=user_inputs, user=user, user_model=user_model) _safe_logstep_with_func = partial(safe_logstep, broadcast_func=broadcast_func) return _run_process_async(pstat.process_id, lambda: runwf(pstat, _safe_logstep_with_func)) @@ -506,7 +504,6 @@ def thread_resume_process( *, user_inputs: list[State] | None = None, user: str | None = None, - user_model: OIDCUserModel | None = None, broadcast_func: BroadcastFunc | None = None, ) -> UUID: # ATTENTION!! When modifying this function make sure you make similar changes to `resume_workflow` in the test code @@ -515,8 +512,6 @@ def thread_resume_process( user_inputs = [{}] pstat = load_process(process) - if not pstat.workflow.authorize_callback(user_model): - raise_status(HTTPStatus.FORBIDDEN, error_message_unauthorized(str(process.workflow_name))) if pstat.workflow == removed_workflow: raise ValueError("This workflow cannot be resumed") @@ -556,7 +551,6 @@ def resume_process( *, user_inputs: list[State] | None = None, user: str | None = None, - user_model: OIDCUserModel | None = None, broadcast_func: BroadcastFunc | None = None, ) -> UUID: """Resume a failed or suspended process. @@ -565,7 +559,6 @@ def resume_process( process: Process from database user_inputs: Optional user input from forms user: user who resumed this process - user_model: OIDCUserModel of user who resumed this process broadcast_func: Optional function to broadcast process data Returns: @@ -573,8 +566,6 @@ def resume_process( """ pstat = load_process(process) - if not pstat.workflow.authorize_callback(user_model): - raise_status(HTTPStatus.FORBIDDEN, error_message_unauthorized(str(process.workflow_name))) try: post_form(pstat.log[0].form, pstat.state.unwrap(), user_inputs=user_inputs or []) diff --git a/orchestrator/workflow.py b/orchestrator/workflow.py index 2c9aa3b8f..52d9fe7e0 100644 --- a/orchestrator/workflow.py +++ b/orchestrator/workflow.py @@ -13,19 +13,21 @@ from __future__ import annotations - import contextvars import functools import inspect import secrets from collections.abc import Callable from dataclasses import asdict, dataclass +from functools import update_wrapper +from http import HTTPStatus from itertools import dropwhile from typing import ( Any, Generic, NoReturn, Protocol, + TypeAlias, TypeVar, cast, overload, @@ -40,11 +42,13 @@ from nwastdlib import const, identity from oauth2_lib.fastapi import OIDCUserModel +from orchestrator.api.error_handling import raise_status from orchestrator.config.assignee import Assignee from orchestrator.db import db, transactional from orchestrator.services.settings import get_engine_settings from orchestrator.targets import Target from orchestrator.types import ErrorDict, StepFunc +from orchestrator.utils.auth import Authorizer from orchestrator.utils.docs import make_workflow_doc from orchestrator.utils.errors import error_state_to_dict from orchestrator.utils.state import form_inject_args, inject_args @@ -80,6 +84,8 @@ class Step(Protocol): name: str form: InputFormGenerator | None assignee: Assignee | None + resume_auth_callback: Authorizer | None = None + retry_auth_callback: Authorizer | None = None def __call__(self, state: State) -> Process: ... @@ -90,7 +96,8 @@ class Workflow(Protocol): __qualname__: str name: str description: str - authorize_callback: Callable[[OIDCUserModel | None], bool] + authorize_callback: Authorizer + retry_auth_callback: Authorizer initial_input_form: InputFormGenerator | None = None target: Target steps: StepList @@ -99,13 +106,20 @@ def __call__(self) -> NoReturn: ... def make_step_function( - f: Callable, name: str, form: InputFormGenerator | None = None, assignee: Assignee | None = Assignee.SYSTEM + f: Callable, + name: str, + form: InputFormGenerator | None = None, + assignee: Assignee | None = Assignee.SYSTEM, + resume_auth_callback: Authorizer | None = None, + retry_auth_callback: Authorizer | None = None, ) -> Step: step_func = cast(Step, f) step_func.name = name step_func.form = form step_func.assignee = assignee + step_func.resume_auth_callback = resume_auth_callback + step_func.retry_auth_callback = retry_auth_callback return step_func @@ -167,6 +181,7 @@ def __repr__(self) -> str: def _handle_simple_input_form_generator(f: StateInputStepFunc) -> StateInputFormGenerator: + """Processes f into a form generator and injects a pre-hook for user authorization""" if inspect.isgeneratorfunction(f): return cast(StateInputFormGenerator, f) if inspect.isgenerator(f): @@ -191,7 +206,8 @@ def make_workflow( initial_input_form: InputStepFunc | None, target: Target, steps: StepList, - authorize_callback: Callable[[OIDCUserModel | None], bool] | None = None, + authorize_callback: Authorizer | None = None, + retry_auth_callback: Authorizer | None = None, ) -> Workflow: @functools.wraps(f) def wrapping_function() -> NoReturn: @@ -202,6 +218,8 @@ def wrapping_function() -> NoReturn: wrapping_function.name = f.__name__ # default, will be changed by LazyWorkflowInstance wrapping_function.description = description wrapping_function.authorize_callback = allow if authorize_callback is None else authorize_callback + # If no retry auth policy is given, defer to policy for process creation. + wrapping_function.retry_auth_callback = wrapping_function.authorize_callback if retry_auth_callback is None else retry_auth_callback if initial_input_form is None: # We always need a form to prevent starting a workflow when no input is needed. @@ -270,7 +288,7 @@ def wrapper(state: State) -> Process: return decorator -def inputstep(name: str, assignee: Assignee) -> Callable[[InputStepFunc], Step]: +def inputstep(name: str, assignee: Assignee, resume_auth_callback: Authorizer | None = None, retry_auth_callback: Authorizer | None = None) -> Callable[[InputStepFunc], Step]: """Add user input step to workflow. IMPORTANT: In contrast to other workflow steps, the `@inputstep` wrapped function will not run in the @@ -299,7 +317,7 @@ def wrapper(state: State) -> FormGenerator: def suspend(state: State) -> Process: return Suspend(state) - return make_step_function(suspend, name, wrapper, assignee) + return make_step_function(suspend, name, wrapper, assignee, resume_auth_callback=resume_auth_callback, retry_auth_callback=retry_auth_callback) return decorator @@ -479,7 +497,8 @@ def workflow( description: str, initial_input_form: InputStepFunc | None = None, target: Target = Target.SYSTEM, - authorize_callback: Callable[[OIDCUserModel | None], bool] | None = None, + authorize_callback: Authorizer| None = None, + retry_auth_callback: Authorizer| None = None, ) -> Callable[[Callable[[], StepList]], Workflow]: """Transform an initial_input_form and a step list into a workflow. @@ -500,7 +519,7 @@ def create_service_port(): def _workflow(f: Callable[[], StepList]) -> Workflow: return make_workflow( - f, description, initial_input_form_in_form_inject_args, target, f(), authorize_callback=authorize_callback + f, description, initial_input_form_in_form_inject_args, target, f(), authorize_callback=authorize_callback, retry_auth_callback=retry_auth_callback ) return _workflow diff --git a/orchestrator/workflows/utils.py b/orchestrator/workflows/utils.py index 9377b071c..2b9db31cd 100644 --- a/orchestrator/workflows/utils.py +++ b/orchestrator/workflows/utils.py @@ -26,6 +26,7 @@ from orchestrator.services import subscriptions from orchestrator.targets import Target from orchestrator.types import SubscriptionLifecycle +from orchestrator.utils.auth import Authorizer from orchestrator.utils.errors import StaleDataError from orchestrator.utils.state import form_inject_args from orchestrator.utils.validate_data_version import validate_data_version @@ -201,7 +202,8 @@ def create_workflow( initial_input_form: InputStepFunc | None = None, status: SubscriptionLifecycle = SubscriptionLifecycle.ACTIVE, additional_steps: StepList | None = None, - authorize_callback: Callable[[OIDCUserModel | None], bool] | None = None, + authorize_callback: Authorizer | None = None, + retry_auth_callback: Authorizer | None = None, ) -> Callable[[Callable[[], StepList]], Workflow]: """Transform an initial_input_form and a step list into a workflow with a target=Target.CREATE. @@ -234,6 +236,7 @@ def _create_workflow(f: Callable[[], StepList]) -> Workflow: Target.CREATE, steplist, authorize_callback=authorize_callback, + retry_auth_callback=retry_auth_callback ) return _create_workflow @@ -243,7 +246,8 @@ def modify_workflow( description: str, initial_input_form: InputStepFunc | None = None, additional_steps: StepList | None = None, - authorize_callback: Callable[[OIDCUserModel | None], bool] | None = None, + authorize_callback: Authorizer | None = None, + retry_auth_callback: Authorizer | None = None, ) -> Callable[[Callable[[], StepList]], Workflow]: """Transform an initial_input_form and a step list into a workflow. @@ -278,6 +282,7 @@ def _modify_workflow(f: Callable[[], StepList]) -> Workflow: Target.MODIFY, steplist, authorize_callback=authorize_callback, + retry_auth_callback=retry_auth_callback, ) return _modify_workflow @@ -287,7 +292,8 @@ def terminate_workflow( description: str, initial_input_form: InputStepFunc | None = None, additional_steps: StepList | None = None, - authorize_callback: Callable[[OIDCUserModel | None], bool] | None = None, + authorize_callback: Authorizer | None = None, + retry_auth_callback: Authorizer | None = None, ) -> Callable[[Callable[[], StepList]], Workflow]: """Transform an initial_input_form and a step list into a workflow. @@ -323,6 +329,7 @@ def _terminate_workflow(f: Callable[[], StepList]) -> Workflow: Target.TERMINATE, steplist, authorize_callback=authorize_callback, + retry_auth_callback=retry_auth_callback, ) return _terminate_workflow diff --git a/test/unit_tests/api/test_processes.py b/test/unit_tests/api/test_processes.py index 1795068d8..8e059b2c4 100644 --- a/test/unit_tests/api/test_processes.py +++ b/test/unit_tests/api/test_processes.py @@ -6,6 +6,8 @@ from uuid import uuid4 import pytest +from pydantic_forms.core import FormPage +from pydantic_forms.types import State from sqlalchemy import select from oauth2_lib.fastapi import OIDCUserModel @@ -22,7 +24,7 @@ from orchestrator.services.settings import get_engine_settings from orchestrator.settings import app_settings from orchestrator.targets import Target -from orchestrator.workflow import ProcessStatus, done, init, step, workflow +from orchestrator.workflow import ProcessStatus, done, init, inputstep, step, workflow from test.unit_tests.helpers import URL_STR_TYPE from test.unit_tests.workflows import WorkflowInstanceForTests @@ -593,3 +595,50 @@ def unauthorized_workflow(): with WorkflowInstanceForTests(unauthorized_workflow, "unauthorized_workflow"): response = test_client.post("/api/processes/unauthorized_workflow", json=[{}]) assert HTTPStatus.FORBIDDEN == response.status_code + + +def test_inputstep_authorization(test_client): + def disallow(_: OIDCUserModel | None = None) -> bool: + return False + + def allow(_: OIDCUserModel | None = None) -> bool: + return True + + class ConfirmForm(FormPage): + confirm: bool + + @inputstep("unauthorized_resume", assignee=Assignee.SYSTEM, resume_auth_callback=disallow) + def unauthorized_resume(state: State) -> State: + user_input = yield ConfirmForm + return user_input.model_dump() + + @inputstep("authorized_resume", assignee=Assignee.SYSTEM, resume_auth_callback=allow) + def authorized_resume(state: State) -> State: + user_input = yield ConfirmForm + return user_input.model_dump() + + @inputstep("noauth_resume", assignee=Assignee.SYSTEM) + def noauth_resume(state: State) -> State: + user_input = yield ConfirmForm + return user_input.model_dump() + + @workflow("test_auth_workflow", target=Target.CREATE) + def test_auth_workflow(): + return init >> noauth_resume >> authorized_resume >> unauthorized_resume >> done + + with WorkflowInstanceForTests(test_auth_workflow, "test_auth_workflow"): + response = test_client.post("/api/processes/test_auth_workflow", json=[{}]) + assert HTTPStatus.CREATED == response.status_code + process_id = response.json()["id"] + # No auth succeeds + response = test_client.put(f"/api/processes/{process_id}/resume", json=[{"confirm": True}]) + assert HTTPStatus.NO_CONTENT == response.status_code + # Authorized succeeds + response = test_client.put(f"/api/processes/{process_id}/resume", json=[{"confirm": True}]) + assert HTTPStatus.NO_CONTENT == response.status_code + # Unauthorized fails + response = test_client.put(f"/api/processes/{process_id}/resume", json=[{"confirm": True}]) + assert HTTPStatus.FORBIDDEN == response.status_code + + #TODO test how this interacts with passing a different callback to @authorize_workflow + # These should be as functionally independent as possible. \ No newline at end of file From 4008a41d2a6af0f9153ce0878a63fb997c463e11 Mon Sep 17 00:00:00 2001 From: Ben Elam Date: Tue, 20 May 2025 16:26:56 -0700 Subject: [PATCH 03/12] Implement RBAC for processes started via celery --- orchestrator/services/celery.py | 9 +++++++-- orchestrator/services/processes.py | 3 ++- orchestrator/utils/auth.py | 1 - 3 files changed, 9 insertions(+), 4 deletions(-) diff --git a/orchestrator/services/celery.py b/orchestrator/services/celery.py index 2aa9fa86c..037dd5c88 100644 --- a/orchestrator/services/celery.py +++ b/orchestrator/services/celery.py @@ -18,6 +18,7 @@ import structlog from celery.result import AsyncResult from kombu.exceptions import ConnectionError, OperationalError +from oauth2_lib.fastapi import OIDCUserModel from orchestrator import app_settings from orchestrator.api.error_handling import raise_status @@ -42,7 +43,11 @@ def _block_when_testing(task_result: AsyncResult) -> None: def _celery_start_process( - workflow_key: str, user_inputs: list[State] | None, user: str = SYSTEM_USER, **kwargs: Any + workflow_key: str, + user_inputs: list[State] | None, + user: str = SYSTEM_USER, + user_model: OIDCUserModel | None = None, + **kwargs: Any ) -> UUID: """Client side call of Celery.""" from orchestrator.services.tasks import NEW_TASK, NEW_WORKFLOW, get_celery_task @@ -57,7 +62,7 @@ def _celery_start_process( task_name = NEW_TASK if wf_table.is_task else NEW_WORKFLOW trigger_task = get_celery_task(task_name) - pstat = create_process(workflow_key, user_inputs, user) + pstat = create_process(workflow_key, user_inputs=user_inputs, user=user, user_model=user_model) try: result = trigger_task.delay(pstat.process_id, workflow_key, user) _block_when_testing(result) diff --git a/orchestrator/services/processes.py b/orchestrator/services/processes.py index 1fdeafce8..90a422e69 100644 --- a/orchestrator/services/processes.py +++ b/orchestrator/services/processes.py @@ -412,6 +412,7 @@ def run() -> WFProcess: def error_message_unauthorized(workflow_key: str) -> str: return f"User is not authorized to execute '{workflow_key}' workflow" + def create_process( workflow_key: str, user_inputs: list[State] | None = None, @@ -574,7 +575,7 @@ def resume_process( raise resume_func = get_execution_context()["resume"] - return resume_func(process, user_inputs=user_inputs, user=user, broadcast_func=broadcast_func) + return resume_func(process, user_inputs=user_inputs, user=user, user_model=user_model, broadcast_func=broadcast_func) def ensure_correct_callback_token(pstat: ProcessStat, *, token: str) -> None: diff --git a/orchestrator/utils/auth.py b/orchestrator/utils/auth.py index 7e7b16917..d86705e04 100644 --- a/orchestrator/utils/auth.py +++ b/orchestrator/utils/auth.py @@ -1,4 +1,3 @@ - from collections.abc import Callable from typing import TypeAlias From 8666e916d678d7f6dd8b320f238e6dc3b2f42049 Mon Sep 17 00:00:00 2001 From: Ben Elam Date: Tue, 20 May 2025 18:38:25 -0700 Subject: [PATCH 04/12] Fix bug and linting issues --- .../api/api_v1/endpoints/processes.py | 17 ++++---- orchestrator/services/celery.py | 4 +- orchestrator/services/processes.py | 3 +- orchestrator/workflow.py | 41 ++++++++++++++----- orchestrator/workflows/utils.py | 3 +- test/unit_tests/api/test_processes.py | 15 ++++--- 6 files changed, 49 insertions(+), 34 deletions(-) diff --git a/orchestrator/api/api_v1/endpoints/processes.py b/orchestrator/api/api_v1/endpoints/processes.py index f08d75375..b50c387e2 100644 --- a/orchestrator/api/api_v1/endpoints/processes.py +++ b/orchestrator/api/api_v1/endpoints/processes.py @@ -101,10 +101,10 @@ def get_auth_callbacks(pstat: ProcessStat) -> tuple[Authorizer | None, Authorize # Default to workflow start callbacks auth_resume = pstat.workflow.authorize_callback # Retry defaults to the workflow start callback if not otherwise specified - auth_retry = pstat.workflow.retry_auth_callback if pstat.workflow.retry_auth_callback else auth_resume + auth_retry = pstat.workflow.retry_auth_callback if pstat.workflow.retry_auth_callback is not None else auth_resume # Iterate over previous steps to look for policy changes remaining_steps = pstat.log - past_steps = pstat.workflow.steps[:-len(remaining_steps)] + past_steps = pstat.workflow.steps[: -len(remaining_steps)] # Review last steps and current step for step in past_steps + [pstat.log[0]]: if step.resume_auth_callback and step.retry_auth_callback is None: @@ -112,16 +112,16 @@ def get_auth_callbacks(pstat: ProcessStat) -> tuple[Authorizer | None, Authorize auth_resume = step.resume_auth_callback auth_retry = step.resume_auth_callback continue - elif step.resume_auth_callback and step.retry_auth_callback: + if step.resume_auth_callback and step.retry_auth_callback: auth_resume = step.resume_auth_callback auth_retry = step.retry_auth_callback continue - elif step.resume_auth_callback is None and step.retry_auth_callback: + if step.resume_auth_callback is None and step.retry_auth_callback: # Only update retry auth_retry = step.retry_auth_callback continue - else: # Both None - continue + # Both None + continue return auth_resume, auth_retry @@ -209,13 +209,12 @@ def resume_process_endpoint( pstat = load_process(process) auth_resume, auth_retry = get_auth_callbacks(pstat) if process.last_status == ProcessStatus.SUSPENDED: - if not auth_resume(user_model): + if auth_resume is not None and not auth_resume(user_model): raise_status(HTTPStatus.FORBIDDEN, "User is not authorized to resume step") elif process.last_status == ProcessStatus.FAILED: - if not auth_retry(user_model): + if auth_retry is not None and not auth_retry(user_model): raise_status(HTTPStatus.FORBIDDEN, "User is not authorized to retry step") - broadcast_invalidate_status_counts() broadcast_func = api_broadcast_process_data(request) diff --git a/orchestrator/services/celery.py b/orchestrator/services/celery.py index 037dd5c88..09a7ebcfc 100644 --- a/orchestrator/services/celery.py +++ b/orchestrator/services/celery.py @@ -18,8 +18,8 @@ import structlog from celery.result import AsyncResult from kombu.exceptions import ConnectionError, OperationalError -from oauth2_lib.fastapi import OIDCUserModel +from oauth2_lib.fastapi import OIDCUserModel from orchestrator import app_settings from orchestrator.api.error_handling import raise_status from orchestrator.db import ProcessTable, db @@ -47,7 +47,7 @@ def _celery_start_process( user_inputs: list[State] | None, user: str = SYSTEM_USER, user_model: OIDCUserModel | None = None, - **kwargs: Any + **kwargs: Any, ) -> UUID: """Client side call of Celery.""" from orchestrator.services.tasks import NEW_TASK, NEW_WORKFLOW, get_celery_task diff --git a/orchestrator/services/processes.py b/orchestrator/services/processes.py index 90a422e69..bb5e56b95 100644 --- a/orchestrator/services/processes.py +++ b/orchestrator/services/processes.py @@ -41,7 +41,6 @@ from orchestrator.workflow import ( CALLBACK_TOKEN_KEY, DEFAULT_CALLBACK_PROGRESS_KEY, - Authorizer, Failed, ProcessStat, ProcessStatus, @@ -575,7 +574,7 @@ def resume_process( raise resume_func = get_execution_context()["resume"] - return resume_func(process, user_inputs=user_inputs, user=user, user_model=user_model, broadcast_func=broadcast_func) + return resume_func(process, user_inputs=user_inputs, user=user, broadcast_func=broadcast_func) def ensure_correct_callback_token(pstat: ProcessStat, *, token: str) -> None: diff --git a/orchestrator/workflow.py b/orchestrator/workflow.py index 52d9fe7e0..b15d00196 100644 --- a/orchestrator/workflow.py +++ b/orchestrator/workflow.py @@ -13,21 +13,19 @@ from __future__ import annotations + import contextvars import functools import inspect import secrets from collections.abc import Callable from dataclasses import asdict, dataclass -from functools import update_wrapper -from http import HTTPStatus from itertools import dropwhile from typing import ( Any, Generic, NoReturn, Protocol, - TypeAlias, TypeVar, cast, overload, @@ -42,7 +40,6 @@ from nwastdlib import const, identity from oauth2_lib.fastapi import OIDCUserModel -from orchestrator.api.error_handling import raise_status from orchestrator.config.assignee import Assignee from orchestrator.db import db, transactional from orchestrator.services.settings import get_engine_settings @@ -181,7 +178,7 @@ def __repr__(self) -> str: def _handle_simple_input_form_generator(f: StateInputStepFunc) -> StateInputFormGenerator: - """Processes f into a form generator and injects a pre-hook for user authorization""" + """Processes f into a form generator and injects a pre-hook for user authorization.""" if inspect.isgeneratorfunction(f): return cast(StateInputFormGenerator, f) if inspect.isgenerator(f): @@ -219,7 +216,9 @@ def wrapping_function() -> NoReturn: wrapping_function.description = description wrapping_function.authorize_callback = allow if authorize_callback is None else authorize_callback # If no retry auth policy is given, defer to policy for process creation. - wrapping_function.retry_auth_callback = wrapping_function.authorize_callback if retry_auth_callback is None else retry_auth_callback + wrapping_function.retry_auth_callback = ( + wrapping_function.authorize_callback if retry_auth_callback is None else retry_auth_callback + ) if initial_input_form is None: # We always need a form to prevent starting a workflow when no input is needed. @@ -288,9 +287,16 @@ def wrapper(state: State) -> Process: return decorator -def inputstep(name: str, assignee: Assignee, resume_auth_callback: Authorizer | None = None, retry_auth_callback: Authorizer | None = None) -> Callable[[InputStepFunc], Step]: +def inputstep( + name: str, + assignee: Assignee, + resume_auth_callback: Authorizer | None = None, + retry_auth_callback: Authorizer | None = None, +) -> Callable[[InputStepFunc], Step]: """Add user input step to workflow. + Any authorization callbacks will be attached to the resulting Step. + IMPORTANT: In contrast to other workflow steps, the `@inputstep` wrapped function will not run in the workflow engine! This means that it must be free of side effects! @@ -317,7 +323,14 @@ def wrapper(state: State) -> FormGenerator: def suspend(state: State) -> Process: return Suspend(state) - return make_step_function(suspend, name, wrapper, assignee, resume_auth_callback=resume_auth_callback, retry_auth_callback=retry_auth_callback) + return make_step_function( + suspend, + name, + wrapper, + assignee, + resume_auth_callback=resume_auth_callback, + retry_auth_callback=retry_auth_callback, + ) return decorator @@ -497,8 +510,8 @@ def workflow( description: str, initial_input_form: InputStepFunc | None = None, target: Target = Target.SYSTEM, - authorize_callback: Authorizer| None = None, - retry_auth_callback: Authorizer| None = None, + authorize_callback: Authorizer | None = None, + retry_auth_callback: Authorizer | None = None, ) -> Callable[[Callable[[], StepList]], Workflow]: """Transform an initial_input_form and a step list into a workflow. @@ -519,7 +532,13 @@ def create_service_port(): def _workflow(f: Callable[[], StepList]) -> Workflow: return make_workflow( - f, description, initial_input_form_in_form_inject_args, target, f(), authorize_callback=authorize_callback, retry_auth_callback=retry_auth_callback + f, + description, + initial_input_form_in_form_inject_args, + target, + f(), + authorize_callback=authorize_callback, + retry_auth_callback=retry_auth_callback, ) return _workflow diff --git a/orchestrator/workflows/utils.py b/orchestrator/workflows/utils.py index 2b9db31cd..debca23f0 100644 --- a/orchestrator/workflows/utils.py +++ b/orchestrator/workflows/utils.py @@ -20,7 +20,6 @@ from pydantic import field_validator, model_validator from sqlalchemy import select -from oauth2_lib.fastapi import OIDCUserModel from orchestrator.db import ProductTable, SubscriptionTable, db from orchestrator.forms.validators import ProductId from orchestrator.services import subscriptions @@ -236,7 +235,7 @@ def _create_workflow(f: Callable[[], StepList]) -> Workflow: Target.CREATE, steplist, authorize_callback=authorize_callback, - retry_auth_callback=retry_auth_callback + retry_auth_callback=retry_auth_callback, ) return _create_workflow diff --git a/test/unit_tests/api/test_processes.py b/test/unit_tests/api/test_processes.py index 8e059b2c4..729ba58e8 100644 --- a/test/unit_tests/api/test_processes.py +++ b/test/unit_tests/api/test_processes.py @@ -6,8 +6,6 @@ from uuid import uuid4 import pytest -from pydantic_forms.core import FormPage -from pydantic_forms.types import State from sqlalchemy import select from oauth2_lib.fastapi import OIDCUserModel @@ -25,6 +23,7 @@ from orchestrator.settings import app_settings from orchestrator.targets import Target from orchestrator.workflow import ProcessStatus, done, init, inputstep, step, workflow +from pydantic_forms.core import FormPage from test.unit_tests.helpers import URL_STR_TYPE from test.unit_tests.workflows import WorkflowInstanceForTests @@ -600,7 +599,7 @@ def unauthorized_workflow(): def test_inputstep_authorization(test_client): def disallow(_: OIDCUserModel | None = None) -> bool: return False - + def allow(_: OIDCUserModel | None = None) -> bool: return True @@ -608,17 +607,17 @@ class ConfirmForm(FormPage): confirm: bool @inputstep("unauthorized_resume", assignee=Assignee.SYSTEM, resume_auth_callback=disallow) - def unauthorized_resume(state: State) -> State: + def unauthorized_resume(state): user_input = yield ConfirmForm return user_input.model_dump() @inputstep("authorized_resume", assignee=Assignee.SYSTEM, resume_auth_callback=allow) - def authorized_resume(state: State) -> State: + def authorized_resume(state): user_input = yield ConfirmForm return user_input.model_dump() @inputstep("noauth_resume", assignee=Assignee.SYSTEM) - def noauth_resume(state: State) -> State: + def noauth_resume(state): user_input = yield ConfirmForm return user_input.model_dump() @@ -640,5 +639,5 @@ def test_auth_workflow(): response = test_client.put(f"/api/processes/{process_id}/resume", json=[{"confirm": True}]) assert HTTPStatus.FORBIDDEN == response.status_code - #TODO test how this interacts with passing a different callback to @authorize_workflow - # These should be as functionally independent as possible. \ No newline at end of file + # TODO test how this interacts with passing a different callback to @authorize_workflow + # These should be as functionally independent as possible. From cb85551ae8b3f30f8751312f81498b8360e2b775 Mon Sep 17 00:00:00 2001 From: Ben Elam Date: Wed, 4 Jun 2025 12:45:56 -0700 Subject: [PATCH 05/12] Refactor get_auth_callbacks; add tests --- .../api/api_v1/endpoints/processes.py | 25 ++++--- test/unit_tests/api/test_processes.py | 69 ++++++++++++++++++- 2 files changed, 83 insertions(+), 11 deletions(-) diff --git a/orchestrator/api/api_v1/endpoints/processes.py b/orchestrator/api/api_v1/endpoints/processes.py index b50c387e2..ee701bb7a 100644 --- a/orchestrator/api/api_v1/endpoints/processes.py +++ b/orchestrator/api/api_v1/endpoints/processes.py @@ -64,7 +64,7 @@ broadcast_process_update_to_websocket, websocket_manager, ) -from orchestrator.workflow import ProcessStat, ProcessStatus +from orchestrator.workflow import ProcessStat, ProcessStatus, StepList, Workflow from pydantic_forms.types import JSON, State router = APIRouter() @@ -87,7 +87,14 @@ def check_global_lock() -> None: ) -def get_auth_callbacks(pstat: ProcessStat) -> tuple[Authorizer | None, Authorizer | None]: +def get_current_steps(pstat: ProcessStat) -> StepList: + """Extract past and current steps from the ProcessStat.""" + remaining_steps = pstat.log + past_steps = pstat.workflow.steps[: -len(remaining_steps)] + return StepList(past_steps + [pstat.log[0]]) + + +def get_auth_callbacks(steps: StepList, workflow: Workflow) -> tuple[Authorizer | None, Authorizer | None]: """Iterate over workflow and prior steps to determine correct authorization callbacks for the current step. It's safest to always iterate through the steps. We could track these callbacks statefully @@ -99,14 +106,12 @@ def get_auth_callbacks(pstat: ProcessStat) -> tuple[Authorizer | None, Authorize - RETRY callback is explicit RETRY, else explicit RESUME, else previous RETRY """ # Default to workflow start callbacks - auth_resume = pstat.workflow.authorize_callback - # Retry defaults to the workflow start callback if not otherwise specified - auth_retry = pstat.workflow.retry_auth_callback if pstat.workflow.retry_auth_callback is not None else auth_resume + auth_resume = workflow.authorize_callback + # auth_retry defaults to the workflow start callback if not otherwise specified. + # A workflow SHOULD have both callbacks set to not-None. This enforces the correct default regardless. + auth_retry = workflow.retry_auth_callback or auth_resume # type: ignore[unreachable, truthy-function] # Iterate over previous steps to look for policy changes - remaining_steps = pstat.log - past_steps = pstat.workflow.steps[: -len(remaining_steps)] - # Review last steps and current step - for step in past_steps + [pstat.log[0]]: + for step in steps: if step.resume_auth_callback and step.retry_auth_callback is None: # Set both to authorize_callback auth_resume = step.resume_auth_callback @@ -207,7 +212,7 @@ def resume_process_endpoint( raise_status(HTTPStatus.CONFLICT, "Resuming a resumed workflow is not possible") pstat = load_process(process) - auth_resume, auth_retry = get_auth_callbacks(pstat) + auth_resume, auth_retry = get_auth_callbacks(get_current_steps(pstat), pstat.workflow) if process.last_status == ProcessStatus.SUSPENDED: if auth_resume is not None and not auth_resume(user_model): raise_status(HTTPStatus.FORBIDDEN, "User is not authorized to resume step") diff --git a/test/unit_tests/api/test_processes.py b/test/unit_tests/api/test_processes.py index 729ba58e8..985cbb7af 100644 --- a/test/unit_tests/api/test_processes.py +++ b/test/unit_tests/api/test_processes.py @@ -9,6 +9,7 @@ from sqlalchemy import select from oauth2_lib.fastapi import OIDCUserModel +from orchestrator.api.api_v1.endpoints.processes import get_auth_callbacks from orchestrator.config.assignee import Assignee from orchestrator.db import ( ProcessStepTable, @@ -22,7 +23,7 @@ from orchestrator.services.settings import get_engine_settings from orchestrator.settings import app_settings from orchestrator.targets import Target -from orchestrator.workflow import ProcessStatus, done, init, inputstep, step, workflow +from orchestrator.workflow import ProcessStatus, StepList, done, init, inputstep, make_workflow, step, workflow from pydantic_forms.core import FormPage from test.unit_tests.helpers import URL_STR_TYPE from test.unit_tests.workflows import WorkflowInstanceForTests @@ -641,3 +642,69 @@ def test_auth_workflow(): # TODO test how this interacts with passing a different callback to @authorize_workflow # These should be as functionally independent as possible. + + +def test_get_auth_callbacks(): + def A(_: OIDCUserModel) -> bool: + return True + + def B(_: OIDCUserModel) -> bool: + return True + + def C(_: OIDCUserModel) -> bool: + return True + + def D(_: OIDCUserModel) -> bool: + return True + + @step("bar") + def bar(): + return {} + + @step("baz") + def baz(): + return {} + + workflow = make_workflow( + f=lambda: {}, + description="description", + initial_input_form=None, + target=Target.SYSTEM, + steps=StepList([]), + authorize_callback=None, + retry_auth_callback=None, + ) + + cases = [ + ((None, None, None, None), (None, None)), + ((A, None, None, None), (A, A)), + ((None, B, None, None), (None, B)), + ((A, B, None, None), (A, B)), + ((None, None, C, None), (C, C)), + ((A, None, C, None), (C, C)), + ((None, B, C, None), (C, C)), + ((A, B, C, None), (C, C)), + ((None, None, None, D), (None, D)), + ((A, None, None, D), (A, D)), + ((None, B, None, D), (None, D)), + ((A, B, None, D), (A, D)), + ((None, None, C, D), (C, D)), # 4 + ((A, None, C, D), (C, D)), + ((None, B, C, D), (C, D)), + ((A, B, C, D), (C, D)), + ] + for case in cases: + auth, retry, step_resume_auth, step_retry_auth = case[0] + want_auth, want_retry = case[1] + workflow.authorize_callback = auth + workflow.retry_auth_callback = retry + + @inputstep("foo", Target.SYSTEM, step_resume_auth, step_retry_auth) + def foo(): + return {} + + steps = StepList([bar, foo, baz]) + + got_auth, got_retry = get_auth_callbacks(steps, workflow) + assert got_auth == want_auth + assert got_retry == want_retry From e2df4801c3aef8758d33fab75f868c231758db5d Mon Sep 17 00:00:00 2001 From: Ben Elam Date: Wed, 4 Jun 2025 13:30:09 -0700 Subject: [PATCH 06/12] Use functional approach for get_auth_callbacks --- .../api/api_v1/endpoints/processes.py | 26 ++++++------------- 1 file changed, 8 insertions(+), 18 deletions(-) diff --git a/orchestrator/api/api_v1/endpoints/processes.py b/orchestrator/api/api_v1/endpoints/processes.py index ee701bb7a..4c7b9b6bd 100644 --- a/orchestrator/api/api_v1/endpoints/processes.py +++ b/orchestrator/api/api_v1/endpoints/processes.py @@ -25,7 +25,7 @@ from fastapi.routing import APIRouter from fastapi.websockets import WebSocket from fastapi_etag.dependency import CacheHit -from more_itertools import chunked +from more_itertools import chunked, last from sentry_sdk.tracing import trace from sqlalchemy import CompoundSelect, Select, select from sqlalchemy.orm import defer, joinedload @@ -110,23 +110,13 @@ def get_auth_callbacks(steps: StepList, workflow: Workflow) -> tuple[Authorizer # auth_retry defaults to the workflow start callback if not otherwise specified. # A workflow SHOULD have both callbacks set to not-None. This enforces the correct default regardless. auth_retry = workflow.retry_auth_callback or auth_resume # type: ignore[unreachable, truthy-function] - # Iterate over previous steps to look for policy changes - for step in steps: - if step.resume_auth_callback and step.retry_auth_callback is None: - # Set both to authorize_callback - auth_resume = step.resume_auth_callback - auth_retry = step.resume_auth_callback - continue - if step.resume_auth_callback and step.retry_auth_callback: - auth_resume = step.resume_auth_callback - auth_retry = step.retry_auth_callback - continue - if step.resume_auth_callback is None and step.retry_auth_callback: - # Only update retry - auth_retry = step.retry_auth_callback - continue - # Both None - continue + + # Choose the most recently established value for resume. + auth_resume = last(filter(None, (step.resume_auth_callback for step in steps)), auth_resume) + # Choose the most recently established value for retry, unless there is a more recent value for resume. + auth_retry = last( + filter(None, (step.retry_auth_callback or step.resume_auth_callback for step in steps)), auth_retry + ) return auth_resume, auth_retry From 47ddd2f411a190ee71a7c28936efeafa3c474cd7 Mon Sep 17 00:00:00 2001 From: Ben Elam Date: Tue, 10 Jun 2025 14:49:58 -0700 Subject: [PATCH 07/12] Parametrize new tests --- test/unit_tests/api/test_processes.py | 85 ++++++++++++++------------- 1 file changed, 44 insertions(+), 41 deletions(-) diff --git a/test/unit_tests/api/test_processes.py b/test/unit_tests/api/test_processes.py index 985cbb7af..6188a831f 100644 --- a/test/unit_tests/api/test_processes.py +++ b/test/unit_tests/api/test_processes.py @@ -640,23 +640,45 @@ def test_auth_workflow(): response = test_client.put(f"/api/processes/{process_id}/resume", json=[{"confirm": True}]) assert HTTPStatus.FORBIDDEN == response.status_code - # TODO test how this interacts with passing a different callback to @authorize_workflow - # These should be as functionally independent as possible. +def _A(_: OIDCUserModel) -> bool: + return True -def test_get_auth_callbacks(): - def A(_: OIDCUserModel) -> bool: - return True - def B(_: OIDCUserModel) -> bool: - return True +def _B(_: OIDCUserModel) -> bool: + return True - def C(_: OIDCUserModel) -> bool: - return True - def D(_: OIDCUserModel) -> bool: - return True +def _C(_: OIDCUserModel) -> bool: + return True + +def _D(_: OIDCUserModel) -> bool: + return True + + +@pytest.mark.parametrize( + "policies, decisions", + [ + ((None, None, None, None), (None, None)), + ((_A, None, None, None), (_A, _A)), + ((None, _B, None, None), (None, _B)), + ((_A, _B, None, None), (_A, _B)), + ((None, None, _C, None), (_C, _C)), + ((_A, None, _C, None), (_C, _C)), + ((None, _B, _C, None), (_C, _C)), + ((_A, _B, _C, None), (_C, _C)), + ((None, None, None, _D), (None, _D)), + ((_A, None, None, _D), (_A, _D)), + ((None, _B, None, _D), (None, _D)), + ((_A, _B, None, _D), (_A, _D)), + ((None, None, _C, _D), (_C, _D)), # 4 + ((_A, None, _C, _D), (_C, _D)), + ((None, _B, _C, _D), (_C, _D)), + ((_A, _B, _C, _D), (_C, _D)), + ], +) +def test_get_auth_callbacks(policies, decisions): @step("bar") def bar(): return {} @@ -675,36 +697,17 @@ def baz(): retry_auth_callback=None, ) - cases = [ - ((None, None, None, None), (None, None)), - ((A, None, None, None), (A, A)), - ((None, B, None, None), (None, B)), - ((A, B, None, None), (A, B)), - ((None, None, C, None), (C, C)), - ((A, None, C, None), (C, C)), - ((None, B, C, None), (C, C)), - ((A, B, C, None), (C, C)), - ((None, None, None, D), (None, D)), - ((A, None, None, D), (A, D)), - ((None, B, None, D), (None, D)), - ((A, B, None, D), (A, D)), - ((None, None, C, D), (C, D)), # 4 - ((A, None, C, D), (C, D)), - ((None, B, C, D), (C, D)), - ((A, B, C, D), (C, D)), - ] - for case in cases: - auth, retry, step_resume_auth, step_retry_auth = case[0] - want_auth, want_retry = case[1] - workflow.authorize_callback = auth - workflow.retry_auth_callback = retry + auth, retry, step_resume_auth, step_retry_auth = policies + want_auth, want_retry = decisions + workflow.authorize_callback = auth + workflow.retry_auth_callback = retry - @inputstep("foo", Target.SYSTEM, step_resume_auth, step_retry_auth) - def foo(): - return {} + @inputstep("foo", Target.SYSTEM, step_resume_auth, step_retry_auth) + def foo(): + return {} - steps = StepList([bar, foo, baz]) + steps = StepList([bar, foo, baz]) - got_auth, got_retry = get_auth_callbacks(steps, workflow) - assert got_auth == want_auth - assert got_retry == want_retry + got_auth, got_retry = get_auth_callbacks(steps, workflow) + assert got_auth == want_auth + assert got_retry == want_retry From 13436e9e13a91b76368ab14594e21100f9bad94e Mon Sep 17 00:00:00 2001 From: Ben Elam Date: Wed, 11 Jun 2025 16:20:12 -0700 Subject: [PATCH 08/12] Add docs for workflow authorization --- .../auth-backend-and-frontend.md | 314 ++++++++++++++++++ 1 file changed, 314 insertions(+) diff --git a/docs/reference-docs/auth-backend-and-frontend.md b/docs/reference-docs/auth-backend-and-frontend.md index ddeb7548e..3f7617d7c 100644 --- a/docs/reference-docs/auth-backend-and-frontend.md +++ b/docs/reference-docs/auth-backend-and-frontend.md @@ -269,6 +269,320 @@ app.register_authorization(authorization_instance) app.register_graphql_authorization(graphql_authorization_instance) ``` +## Authorization and Workflows +**Role-based access control for workflows is currently in beta. Initial support has been added to the backend, but the feature is not fully communicated through the UI yet.** + +Certain `orchestrator-core` decorators accept authorization callbacks of type `type Authorizer = Callable[OIDCUserModel, bool]`, which return True when the input user is authorized, otherwise False. + +A table (below) is available for comparing possible configuration states with the policy that will be enforced. + +### `@workflow` +The `@workflow` decorator accepts the optional parameters `auth: Authorizer` and `retry_auth: Authorizer`. + +`auth` will be used to determine the authorization of a user to start the workflow. +If `auth` is omitted, the workflow is authorized for any logged in user. + +`retry_auth` will be used to determine the authorization of a user to start, resume, or retry the workflow from a failed step. +If `retry_auth` is omitted, then `auth` is used to authorize. + +(This does not percolate past an `@inputstep` that specifies `resume_auth` or `retry_auth`.) + +Examples: + +* `auth=None, retry_auth=None`: any user may run the workflow. +* `auth=A, retry_auth=B`: users authorized by A may start the workflow. Users authorized by B may retry on failure. + * Example: starting the workflow is a decision that must be made by a product owner. Retrying can be made by an on-call member of the operations team. +* `auth=None, retry_auth=B`: any user can start the workflow, but only users authorized by B may retry on failure. + +### `@inputstep` +The `@inputstep` decorator accepts the optional parameters `resume_auth: Authorizer` and `retry_auth: Authorizer`. + +`resume_auth` will be used to determine the authorization of a user to resume the workflow when suspended at this inputstep. +If `resume_auth` is omitted, then the workflow's `auth` will be used. + +`retry_auth` will be used to determine the authorization of a user to retry the workflow from a failed step following the inputstep. +If `retry_auth` is omitted, then `resume_auth` is used to authorize retries. +If `resume_auth` is also omitted, then the workflow’s `retry_auth` is checked, and then the workflow’s `auth`. + +In summary: + +* A workflow establishes `auth` for starting, resuming, or retrying. +* The workflow can also establish `retry_auth`, which will override `auth` for retries. + * An inputstep can override the existing `auth` with `resume_auth` and the existing `retry_auth` with its own `retry_auth`. +* Subsequent inputsteps can do the same, but any None will not overwrite a previous not-None. + +### Policy resolutions +Below is an exhaustive table of how policies (implemented as callbacks `A`, `B`, `C`, and `D`) +are prioritized in different workflow and inputstep configurations. + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
ConfigurationEnforcementNotes
@workflow@inputstepbefore @inputstep@inputstep and after
authretry_authresume_authretry_authstartretryresumeretry
NoneNoneNoneNoneAnyoneAnyoneAnyoneAnyoneDefault
ANoneNoneNoneAAAABroadly restrict the workflow to a specific authorizer.
NoneBNoneNoneAnyoneBAnyoneBoriginal retry_auth is maintained if nothing supercedes it. Weird choice, but this provides a "we specifically want to limit retries" route.
ABNoneNoneABABWorkflow-level auth and retry. Allows A or B to be tighter or distinct, as needed.
NoneNoneCNoneAnyoneAnyoneCCAnyone can start this workflow, but only C can continue it.
ANoneCNoneAACCSubsequent retries use C, not A! Override with retry_auth=A if desired.
NoneBCNoneAnyoneBCCSubsequent retries use C, not B! Override with retry_auth=B if desired.
ABCNoneABCCSimple override initial settings with inputstep resume_auth.
NoneNoneNoneDAnyoneAnyoneAnyoneDAnyone can start or retry or resume, but limit retries to D once inputstep is reached.
ANoneNoneDAAADA can start or retry or resume, but limit retries to D once inputstep is reached.
NoneBNoneDAnyoneBAnyoneDAnyone can start or resume, but only B can retry. After inputstep, only D can retry.
ABNoneDABADA can start or resume, but only B can retry. After inputstep, only D can retry.
NoneNoneCDAnyoneAnyoneCDAnyone can start, but only C can resume and only D can retry after the resume.
ANoneCDAACD
NoneBCDAnyoneBCD
ABCDABCD
+ +### Examples +Assume we have the following function that can be used to create callbacks: + +```python +def allow_roles(*roles) -> Callable[OIDCUserModel|None, bool]: + def f(user: OIDCUserModel) -> bool: + if is_admin(user): # Relative to your authorization provider + return True + for role in roles: + if has_role(user, role): # Relative to your authorization provider + return True + return False + + return f +``` + +We can now construct a variety of authorization policies. + +#### Rubber Stamp Model +!!!example + Suppose we have a workflow W that needs to pause on inputstep `approval` for approval from finance. Ops (and only ops) should be able to start the workflow and retry any failed steps. Finance (and only finance) should be able to resume at the input step. + + ```python + @workflow("An expensive workflow", auth=allow_roles("ops")) + def W(...): + return begin >> A >> ... >> notify_finance >> approval >> ... >> Z + + @inputstep("Approval", resume_auth=allow_roles("finance"), retry_auth=allow_roles("ops")) + def approval(...): + ... + ``` + + +#### Hand-off Model +!!!example + Suppose we have two teams, Dev and Platform, and a long workflow W that should be handed off to Platform at step `approval`. + + Dev can start the workflow and retry steps prior to S. Once step S is reached, Platform (and only Platform) can resume the workflow and retry later failed steps. + + ```python + @workflow("An expensive workflow", auth=allow_roles("dev")) + def W(...): + return begin >> A >> ... >> notify_platform >> handoff >> ... >> Z + + @inputstep("Hand-off", resume_auth=allow_roles("platform")) + def handoff(...): + ... + ``` + Notice that default behaviors let us ignore `retry_auth` arguments in both decorators. + +#### Restricted Retries Model +!!!example + Suppose we have a workflow that anyone can run, but with steps that should only be retried by users with certain backend access. + + ```python + @workflow("A workflow for any user", retry_auth=allow_roles("admin")) + def W(...): + return begin >> A >> ... >> S >> ... >> Z + ``` + + Note that we could specify `auth=allow_roles("user")` if helpful, or we can omit `auth` to fail open to any logged in user. + [1]: https://github.com/workfloworchestrator/example-orchestrator-ui [2]: https://github.com/workfloworchestrator/example-orchestrator [3]: https://next-auth.js.org/ From c6ae7d65c6d224c5c1fcb22e35d555606523ecf9 Mon Sep 17 00:00:00 2001 From: Ben Elam Date: Wed, 11 Jun 2025 16:28:16 -0700 Subject: [PATCH 09/12] Improve process filtering on resume endpoint --- .../api/api_v1/endpoints/processes.py | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/orchestrator/api/api_v1/endpoints/processes.py b/orchestrator/api/api_v1/endpoints/processes.py index 4c7b9b6bd..5fb182e9f 100644 --- a/orchestrator/api/api_v1/endpoints/processes.py +++ b/orchestrator/api/api_v1/endpoints/processes.py @@ -120,6 +120,15 @@ def get_auth_callbacks(steps: StepList, workflow: Workflow) -> tuple[Authorizer return auth_resume, auth_retry +def can_be_resumed(status: ProcessStatus) -> bool: + return status in ( + ProcessStatus.SUSPENDED, # Can be resumed + ProcessStatus.FAILED, # Can be retried + ProcessStatus.API_UNAVAILABLE, # subtype of FAILED + ProcessStatus.INCONSISTENT_DATA, # subtype of FAILED + ) + + def resolve_user_name( *, reporter: Reporter | None, @@ -192,14 +201,8 @@ def resume_process_endpoint( ) -> None: process = _get_process(process_id) - if process.last_status == ProcessStatus.COMPLETED: - raise_status(HTTPStatus.CONFLICT, "Resuming a completed workflow is not possible") - - if process.last_status == ProcessStatus.RUNNING: - raise_status(HTTPStatus.CONFLICT, "Resuming a running workflow is not possible") - - if process.last_status == ProcessStatus.RESUMED: - raise_status(HTTPStatus.CONFLICT, "Resuming a resumed workflow is not possible") + if not can_be_resumed(process.last_status): + raise_status(HTTPStatus.CONFLICT, f"Resuming a {process.last_status.lower()} workflow is not possible") pstat = load_process(process) auth_resume, auth_retry = get_auth_callbacks(get_current_steps(pstat), pstat.workflow) From 80d13c2a8d06472ff166374595e780d106a90eab Mon Sep 17 00:00:00 2001 From: Ben Elam Date: Wed, 11 Jun 2025 16:39:41 -0700 Subject: [PATCH 10/12] Add xfailed test for resume_process_endpoint --- test/unit_tests/api/test_processes.py | 43 +++++++++++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/test/unit_tests/api/test_processes.py b/test/unit_tests/api/test_processes.py index 6188a831f..343da6206 100644 --- a/test/unit_tests/api/test_processes.py +++ b/test/unit_tests/api/test_processes.py @@ -641,6 +641,49 @@ def test_auth_workflow(): assert HTTPStatus.FORBIDDEN == response.status_code +@pytest.mark.xfail(reason="core currently lacks support for tests involving a failed step") +def test_retry_authorization(test_client): + def disallow(_: OIDCUserModel | None = None) -> bool: + return False + + def allow(_: OIDCUserModel | None = None) -> bool: + return True + + class ConfirmForm(FormPage): + confirm: bool + + @inputstep("authorized_resume", assignee=Assignee.SYSTEM, resume_auth_callback=allow, retry_auth_callback=disallow) + def authorized_resume(state): + user_input = yield ConfirmForm + return user_input.model_dump() + + @step("fails once") + def fails_once(state): + if not hasattr(fails_once, "called"): + fails_once.called = False + + if not fails_once.called: + fails_once.called = True + raise RuntimeError("Failing intentionally, ignore") + return {} + + @workflow("test_auth_workflow", target=Target.CREATE, authorize_callback=allow, retry_auth_callback=disallow) + def test_auth_workflow(): + return init >> authorized_resume >> fails_once >> done + + with WorkflowInstanceForTests(test_auth_workflow, "test_auth_workflow"): + # Creating workflow succeeds + response = test_client.post("/api/processes/test_auth_workflow", json=[{}]) + assert HTTPStatus.CREATED == response.status_code + process_id = response.json()["id"] + # We're authorized to resume, but this will error, so we can retry + response = test_client.put(f"/api/processes/{process_id}/resume", json=[{"confirm": True}]) + assert HTTPStatus.NO_CONTENT == response.status_code + # We're authorized to retry, in spite of workflow's retry_auth_callback=disallow + response = test_client.put(f"/api/processes/{process_id}/resume", json=[{}]) + assert HTTPStatus.NO_CONTENT == response.status_code + + def _A(_: OIDCUserModel) -> bool: return True From 28761f6a78f7f00a2b456c268cac2f45ee038245 Mon Sep 17 00:00:00 2001 From: Mark90 Date: Thu, 12 Jun 2025 13:54:06 +0200 Subject: [PATCH 11/12] Change RBAC bold text to a warning banner --- docs/reference-docs/auth-backend-and-frontend.md | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/docs/reference-docs/auth-backend-and-frontend.md b/docs/reference-docs/auth-backend-and-frontend.md index 3f7617d7c..e230311c1 100644 --- a/docs/reference-docs/auth-backend-and-frontend.md +++ b/docs/reference-docs/auth-backend-and-frontend.md @@ -270,7 +270,10 @@ app.register_graphql_authorization(graphql_authorization_instance) ``` ## Authorization and Workflows -**Role-based access control for workflows is currently in beta. Initial support has been added to the backend, but the feature is not fully communicated through the UI yet.** + +!!! Warning + Role-based access control for workflows is currently in beta. + Initial support has been added to the backend, but the feature is not fully communicated through the UI yet. Certain `orchestrator-core` decorators accept authorization callbacks of type `type Authorizer = Callable[OIDCUserModel, bool]`, which return True when the input user is authorized, otherwise False. From 384317593d469c55f447a4f41cd8bc7f4bb3d58f Mon Sep 17 00:00:00 2001 From: Mark90 Date: Thu, 12 Jun 2025 14:30:14 +0200 Subject: [PATCH 12/12] Bump version to 4.1.0rc2 --- .bumpversion.cfg | 2 +- orchestrator/__init__.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.bumpversion.cfg b/.bumpversion.cfg index d27db8672..7183f3029 100644 --- a/.bumpversion.cfg +++ b/.bumpversion.cfg @@ -1,5 +1,5 @@ [bumpversion] -current_version = 4.1.0rc1 +current_version = 4.1.0rc2 commit = False tag = False parse = (?P\d+)\.(?P\d+)\.(?P\d+)(rc(?P\d+))? diff --git a/orchestrator/__init__.py b/orchestrator/__init__.py index b64ddc5f3..41c1ceef8 100644 --- a/orchestrator/__init__.py +++ b/orchestrator/__init__.py @@ -13,7 +13,7 @@ """This is the orchestrator workflow engine.""" -__version__ = "4.1.0rc1" +__version__ = "4.1.0rc2" from orchestrator.app import OrchestratorCore from orchestrator.settings import app_settings