diff --git a/.bumpversion.cfg b/.bumpversion.cfg index d27db8672..7183f3029 100644 --- a/.bumpversion.cfg +++ b/.bumpversion.cfg @@ -1,5 +1,5 @@ [bumpversion] -current_version = 4.1.0rc1 +current_version = 4.1.0rc2 commit = False tag = False parse = (?P\d+)\.(?P\d+)\.(?P\d+)(rc(?P\d+))? diff --git a/docs/reference-docs/auth-backend-and-frontend.md b/docs/reference-docs/auth-backend-and-frontend.md index ddeb7548e..e230311c1 100644 --- a/docs/reference-docs/auth-backend-and-frontend.md +++ b/docs/reference-docs/auth-backend-and-frontend.md @@ -269,6 +269,323 @@ app.register_authorization(authorization_instance) app.register_graphql_authorization(graphql_authorization_instance) ``` +## Authorization and Workflows + +!!! Warning + Role-based access control for workflows is currently in beta. + Initial support has been added to the backend, but the feature is not fully communicated through the UI yet. + +Certain `orchestrator-core` decorators accept authorization callbacks of type `type Authorizer = Callable[OIDCUserModel, bool]`, which return True when the input user is authorized, otherwise False. + +A table (below) is available for comparing possible configuration states with the policy that will be enforced. + +### `@workflow` +The `@workflow` decorator accepts the optional parameters `auth: Authorizer` and `retry_auth: Authorizer`. + +`auth` will be used to determine the authorization of a user to start the workflow. +If `auth` is omitted, the workflow is authorized for any logged in user. + +`retry_auth` will be used to determine the authorization of a user to start, resume, or retry the workflow from a failed step. +If `retry_auth` is omitted, then `auth` is used to authorize. + +(This does not percolate past an `@inputstep` that specifies `resume_auth` or `retry_auth`.) + +Examples: + +* `auth=None, retry_auth=None`: any user may run the workflow. +* `auth=A, retry_auth=B`: users authorized by A may start the workflow. Users authorized by B may retry on failure. + * Example: starting the workflow is a decision that must be made by a product owner. Retrying can be made by an on-call member of the operations team. +* `auth=None, retry_auth=B`: any user can start the workflow, but only users authorized by B may retry on failure. + +### `@inputstep` +The `@inputstep` decorator accepts the optional parameters `resume_auth: Authorizer` and `retry_auth: Authorizer`. + +`resume_auth` will be used to determine the authorization of a user to resume the workflow when suspended at this inputstep. +If `resume_auth` is omitted, then the workflow's `auth` will be used. + +`retry_auth` will be used to determine the authorization of a user to retry the workflow from a failed step following the inputstep. +If `retry_auth` is omitted, then `resume_auth` is used to authorize retries. +If `resume_auth` is also omitted, then the workflow’s `retry_auth` is checked, and then the workflow’s `auth`. + +In summary: + +* A workflow establishes `auth` for starting, resuming, or retrying. +* The workflow can also establish `retry_auth`, which will override `auth` for retries. + * An inputstep can override the existing `auth` with `resume_auth` and the existing `retry_auth` with its own `retry_auth`. +* Subsequent inputsteps can do the same, but any None will not overwrite a previous not-None. + +### Policy resolutions +Below is an exhaustive table of how policies (implemented as callbacks `A`, `B`, `C`, and `D`) +are prioritized in different workflow and inputstep configurations. + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
ConfigurationEnforcementNotes
@workflow@inputstepbefore @inputstep@inputstep and after
authretry_authresume_authretry_authstartretryresumeretry
NoneNoneNoneNoneAnyoneAnyoneAnyoneAnyoneDefault
ANoneNoneNoneAAAABroadly restrict the workflow to a specific authorizer.
NoneBNoneNoneAnyoneBAnyoneBoriginal retry_auth is maintained if nothing supercedes it. Weird choice, but this provides a "we specifically want to limit retries" route.
ABNoneNoneABABWorkflow-level auth and retry. Allows A or B to be tighter or distinct, as needed.
NoneNoneCNoneAnyoneAnyoneCCAnyone can start this workflow, but only C can continue it.
ANoneCNoneAACCSubsequent retries use C, not A! Override with retry_auth=A if desired.
NoneBCNoneAnyoneBCCSubsequent retries use C, not B! Override with retry_auth=B if desired.
ABCNoneABCCSimple override initial settings with inputstep resume_auth.
NoneNoneNoneDAnyoneAnyoneAnyoneDAnyone can start or retry or resume, but limit retries to D once inputstep is reached.
ANoneNoneDAAADA can start or retry or resume, but limit retries to D once inputstep is reached.
NoneBNoneDAnyoneBAnyoneDAnyone can start or resume, but only B can retry. After inputstep, only D can retry.
ABNoneDABADA can start or resume, but only B can retry. After inputstep, only D can retry.
NoneNoneCDAnyoneAnyoneCDAnyone can start, but only C can resume and only D can retry after the resume.
ANoneCDAACD
NoneBCDAnyoneBCD
ABCDABCD
+ +### Examples +Assume we have the following function that can be used to create callbacks: + +```python +def allow_roles(*roles) -> Callable[OIDCUserModel|None, bool]: + def f(user: OIDCUserModel) -> bool: + if is_admin(user): # Relative to your authorization provider + return True + for role in roles: + if has_role(user, role): # Relative to your authorization provider + return True + return False + + return f +``` + +We can now construct a variety of authorization policies. + +#### Rubber Stamp Model +!!!example + Suppose we have a workflow W that needs to pause on inputstep `approval` for approval from finance. Ops (and only ops) should be able to start the workflow and retry any failed steps. Finance (and only finance) should be able to resume at the input step. + + ```python + @workflow("An expensive workflow", auth=allow_roles("ops")) + def W(...): + return begin >> A >> ... >> notify_finance >> approval >> ... >> Z + + @inputstep("Approval", resume_auth=allow_roles("finance"), retry_auth=allow_roles("ops")) + def approval(...): + ... + ``` + + +#### Hand-off Model +!!!example + Suppose we have two teams, Dev and Platform, and a long workflow W that should be handed off to Platform at step `approval`. + + Dev can start the workflow and retry steps prior to S. Once step S is reached, Platform (and only Platform) can resume the workflow and retry later failed steps. + + ```python + @workflow("An expensive workflow", auth=allow_roles("dev")) + def W(...): + return begin >> A >> ... >> notify_platform >> handoff >> ... >> Z + + @inputstep("Hand-off", resume_auth=allow_roles("platform")) + def handoff(...): + ... + ``` + Notice that default behaviors let us ignore `retry_auth` arguments in both decorators. + +#### Restricted Retries Model +!!!example + Suppose we have a workflow that anyone can run, but with steps that should only be retried by users with certain backend access. + + ```python + @workflow("A workflow for any user", retry_auth=allow_roles("admin")) + def W(...): + return begin >> A >> ... >> S >> ... >> Z + ``` + + Note that we could specify `auth=allow_roles("user")` if helpful, or we can omit `auth` to fail open to any logged in user. + [1]: https://github.com/workfloworchestrator/example-orchestrator-ui [2]: https://github.com/workfloworchestrator/example-orchestrator [3]: https://next-auth.js.org/ diff --git a/orchestrator/__init__.py b/orchestrator/__init__.py index b64ddc5f3..41c1ceef8 100644 --- a/orchestrator/__init__.py +++ b/orchestrator/__init__.py @@ -13,7 +13,7 @@ """This is the orchestrator workflow engine.""" -__version__ = "4.1.0rc1" +__version__ = "4.1.0rc2" from orchestrator.app import OrchestratorCore from orchestrator.settings import app_settings diff --git a/orchestrator/api/api_v1/endpoints/processes.py b/orchestrator/api/api_v1/endpoints/processes.py index 86b168dc3..5fb182e9f 100644 --- a/orchestrator/api/api_v1/endpoints/processes.py +++ b/orchestrator/api/api_v1/endpoints/processes.py @@ -25,7 +25,7 @@ from fastapi.routing import APIRouter from fastapi.websockets import WebSocket from fastapi_etag.dependency import CacheHit -from more_itertools import chunked +from more_itertools import chunked, last from sentry_sdk.tracing import trace from sqlalchemy import CompoundSelect, Select, select from sqlalchemy.orm import defer, joinedload @@ -56,6 +56,7 @@ ) from orchestrator.services.settings import get_engine_settings from orchestrator.settings import app_settings +from orchestrator.utils.auth import Authorizer from orchestrator.utils.enrich_process import enrich_process from orchestrator.websocket import ( WS_CHANNELS, @@ -63,7 +64,7 @@ broadcast_process_update_to_websocket, websocket_manager, ) -from orchestrator.workflow import ProcessStatus +from orchestrator.workflow import ProcessStat, ProcessStatus, StepList, Workflow from pydantic_forms.types import JSON, State router = APIRouter() @@ -86,6 +87,48 @@ def check_global_lock() -> None: ) +def get_current_steps(pstat: ProcessStat) -> StepList: + """Extract past and current steps from the ProcessStat.""" + remaining_steps = pstat.log + past_steps = pstat.workflow.steps[: -len(remaining_steps)] + return StepList(past_steps + [pstat.log[0]]) + + +def get_auth_callbacks(steps: StepList, workflow: Workflow) -> tuple[Authorizer | None, Authorizer | None]: + """Iterate over workflow and prior steps to determine correct authorization callbacks for the current step. + + It's safest to always iterate through the steps. We could track these callbacks statefully + as we progress through the workflow, but if we fail a step and the system restarts, the previous + callbacks will be lost if they're only available in the process state. + + Priority: + - RESUME callback is explicit RESUME callback, else previous START/RESUME callback + - RETRY callback is explicit RETRY, else explicit RESUME, else previous RETRY + """ + # Default to workflow start callbacks + auth_resume = workflow.authorize_callback + # auth_retry defaults to the workflow start callback if not otherwise specified. + # A workflow SHOULD have both callbacks set to not-None. This enforces the correct default regardless. + auth_retry = workflow.retry_auth_callback or auth_resume # type: ignore[unreachable, truthy-function] + + # Choose the most recently established value for resume. + auth_resume = last(filter(None, (step.resume_auth_callback for step in steps)), auth_resume) + # Choose the most recently established value for retry, unless there is a more recent value for resume. + auth_retry = last( + filter(None, (step.retry_auth_callback or step.resume_auth_callback for step in steps)), auth_retry + ) + return auth_resume, auth_retry + + +def can_be_resumed(status: ProcessStatus) -> bool: + return status in ( + ProcessStatus.SUSPENDED, # Can be resumed + ProcessStatus.FAILED, # Can be retried + ProcessStatus.API_UNAVAILABLE, # subtype of FAILED + ProcessStatus.INCONSISTENT_DATA, # subtype of FAILED + ) + + def resolve_user_name( *, reporter: Reporter | None, @@ -150,18 +193,25 @@ def new_process( dependencies=[Depends(check_global_lock, use_cache=False)], ) def resume_process_endpoint( - process_id: UUID, request: Request, json_data: JSON = Body(...), user: str = Depends(user_name) + process_id: UUID, + request: Request, + json_data: JSON = Body(...), + user: str = Depends(user_name), + user_model: OIDCUserModel | None = Depends(authenticate), ) -> None: process = _get_process(process_id) - if process.last_status == ProcessStatus.COMPLETED: - raise_status(HTTPStatus.CONFLICT, "Resuming a completed workflow is not possible") - - if process.last_status == ProcessStatus.RUNNING: - raise_status(HTTPStatus.CONFLICT, "Resuming a running workflow is not possible") - - if process.last_status == ProcessStatus.RESUMED: - raise_status(HTTPStatus.CONFLICT, "Resuming a resumed workflow is not possible") + if not can_be_resumed(process.last_status): + raise_status(HTTPStatus.CONFLICT, f"Resuming a {process.last_status.lower()} workflow is not possible") + + pstat = load_process(process) + auth_resume, auth_retry = get_auth_callbacks(get_current_steps(pstat), pstat.workflow) + if process.last_status == ProcessStatus.SUSPENDED: + if auth_resume is not None and not auth_resume(user_model): + raise_status(HTTPStatus.FORBIDDEN, "User is not authorized to resume step") + elif process.last_status == ProcessStatus.FAILED: + if auth_retry is not None and not auth_retry(user_model): + raise_status(HTTPStatus.FORBIDDEN, "User is not authorized to retry step") broadcast_invalidate_status_counts() broadcast_func = api_broadcast_process_data(request) diff --git a/orchestrator/services/celery.py b/orchestrator/services/celery.py index 2aa9fa86c..09a7ebcfc 100644 --- a/orchestrator/services/celery.py +++ b/orchestrator/services/celery.py @@ -19,6 +19,7 @@ from celery.result import AsyncResult from kombu.exceptions import ConnectionError, OperationalError +from oauth2_lib.fastapi import OIDCUserModel from orchestrator import app_settings from orchestrator.api.error_handling import raise_status from orchestrator.db import ProcessTable, db @@ -42,7 +43,11 @@ def _block_when_testing(task_result: AsyncResult) -> None: def _celery_start_process( - workflow_key: str, user_inputs: list[State] | None, user: str = SYSTEM_USER, **kwargs: Any + workflow_key: str, + user_inputs: list[State] | None, + user: str = SYSTEM_USER, + user_model: OIDCUserModel | None = None, + **kwargs: Any, ) -> UUID: """Client side call of Celery.""" from orchestrator.services.tasks import NEW_TASK, NEW_WORKFLOW, get_celery_task @@ -57,7 +62,7 @@ def _celery_start_process( task_name = NEW_TASK if wf_table.is_task else NEW_WORKFLOW trigger_task = get_celery_task(task_name) - pstat = create_process(workflow_key, user_inputs, user) + pstat = create_process(workflow_key, user_inputs=user_inputs, user=user, user_model=user_model) try: result = trigger_task.delay(pstat.process_id, workflow_key, user) _block_when_testing(result) diff --git a/orchestrator/services/processes.py b/orchestrator/services/processes.py index b08c17228..bb5e56b95 100644 --- a/orchestrator/services/processes.py +++ b/orchestrator/services/processes.py @@ -467,9 +467,7 @@ def thread_start_process( user_model: OIDCUserModel | None = None, broadcast_func: BroadcastFunc | None = None, ) -> UUID: - pstat = create_process(workflow_key, user_inputs=user_inputs, user=user) - if not pstat.workflow.authorize_callback(user_model): - raise_status(HTTPStatus.FORBIDDEN, error_message_unauthorized(workflow_key)) + pstat = create_process(workflow_key, user_inputs=user_inputs, user=user, user_model=user_model) _safe_logstep_with_func = partial(safe_logstep, broadcast_func=broadcast_func) return _run_process_async(pstat.process_id, lambda: runwf(pstat, _safe_logstep_with_func)) @@ -506,7 +504,6 @@ def thread_resume_process( *, user_inputs: list[State] | None = None, user: str | None = None, - user_model: OIDCUserModel | None = None, broadcast_func: BroadcastFunc | None = None, ) -> UUID: # ATTENTION!! When modifying this function make sure you make similar changes to `resume_workflow` in the test code @@ -515,8 +512,6 @@ def thread_resume_process( user_inputs = [{}] pstat = load_process(process) - if not pstat.workflow.authorize_callback(user_model): - raise_status(HTTPStatus.FORBIDDEN, error_message_unauthorized(str(process.workflow_name))) if pstat.workflow == removed_workflow: raise ValueError("This workflow cannot be resumed") @@ -556,7 +551,6 @@ def resume_process( *, user_inputs: list[State] | None = None, user: str | None = None, - user_model: OIDCUserModel | None = None, broadcast_func: BroadcastFunc | None = None, ) -> UUID: """Resume a failed or suspended process. @@ -565,7 +559,6 @@ def resume_process( process: Process from database user_inputs: Optional user input from forms user: user who resumed this process - user_model: OIDCUserModel of user who resumed this process broadcast_func: Optional function to broadcast process data Returns: @@ -573,8 +566,6 @@ def resume_process( """ pstat = load_process(process) - if not pstat.workflow.authorize_callback(user_model): - raise_status(HTTPStatus.FORBIDDEN, error_message_unauthorized(str(process.workflow_name))) try: post_form(pstat.log[0].form, pstat.state.unwrap(), user_inputs=user_inputs or []) diff --git a/orchestrator/utils/auth.py b/orchestrator/utils/auth.py new file mode 100644 index 000000000..d86705e04 --- /dev/null +++ b/orchestrator/utils/auth.py @@ -0,0 +1,9 @@ +from collections.abc import Callable +from typing import TypeAlias + +from oauth2_lib.fastapi import OIDCUserModel + +# This file is broken out separately to avoid circular imports. + +# Can instead use "type Authorizer = ..." in later Python versions. +Authorizer: TypeAlias = Callable[[OIDCUserModel | None], bool] diff --git a/orchestrator/workflow.py b/orchestrator/workflow.py index 2c9aa3b8f..b15d00196 100644 --- a/orchestrator/workflow.py +++ b/orchestrator/workflow.py @@ -45,6 +45,7 @@ from orchestrator.services.settings import get_engine_settings from orchestrator.targets import Target from orchestrator.types import ErrorDict, StepFunc +from orchestrator.utils.auth import Authorizer from orchestrator.utils.docs import make_workflow_doc from orchestrator.utils.errors import error_state_to_dict from orchestrator.utils.state import form_inject_args, inject_args @@ -80,6 +81,8 @@ class Step(Protocol): name: str form: InputFormGenerator | None assignee: Assignee | None + resume_auth_callback: Authorizer | None = None + retry_auth_callback: Authorizer | None = None def __call__(self, state: State) -> Process: ... @@ -90,7 +93,8 @@ class Workflow(Protocol): __qualname__: str name: str description: str - authorize_callback: Callable[[OIDCUserModel | None], bool] + authorize_callback: Authorizer + retry_auth_callback: Authorizer initial_input_form: InputFormGenerator | None = None target: Target steps: StepList @@ -99,13 +103,20 @@ def __call__(self) -> NoReturn: ... def make_step_function( - f: Callable, name: str, form: InputFormGenerator | None = None, assignee: Assignee | None = Assignee.SYSTEM + f: Callable, + name: str, + form: InputFormGenerator | None = None, + assignee: Assignee | None = Assignee.SYSTEM, + resume_auth_callback: Authorizer | None = None, + retry_auth_callback: Authorizer | None = None, ) -> Step: step_func = cast(Step, f) step_func.name = name step_func.form = form step_func.assignee = assignee + step_func.resume_auth_callback = resume_auth_callback + step_func.retry_auth_callback = retry_auth_callback return step_func @@ -167,6 +178,7 @@ def __repr__(self) -> str: def _handle_simple_input_form_generator(f: StateInputStepFunc) -> StateInputFormGenerator: + """Processes f into a form generator and injects a pre-hook for user authorization.""" if inspect.isgeneratorfunction(f): return cast(StateInputFormGenerator, f) if inspect.isgenerator(f): @@ -191,7 +203,8 @@ def make_workflow( initial_input_form: InputStepFunc | None, target: Target, steps: StepList, - authorize_callback: Callable[[OIDCUserModel | None], bool] | None = None, + authorize_callback: Authorizer | None = None, + retry_auth_callback: Authorizer | None = None, ) -> Workflow: @functools.wraps(f) def wrapping_function() -> NoReturn: @@ -202,6 +215,10 @@ def wrapping_function() -> NoReturn: wrapping_function.name = f.__name__ # default, will be changed by LazyWorkflowInstance wrapping_function.description = description wrapping_function.authorize_callback = allow if authorize_callback is None else authorize_callback + # If no retry auth policy is given, defer to policy for process creation. + wrapping_function.retry_auth_callback = ( + wrapping_function.authorize_callback if retry_auth_callback is None else retry_auth_callback + ) if initial_input_form is None: # We always need a form to prevent starting a workflow when no input is needed. @@ -270,9 +287,16 @@ def wrapper(state: State) -> Process: return decorator -def inputstep(name: str, assignee: Assignee) -> Callable[[InputStepFunc], Step]: +def inputstep( + name: str, + assignee: Assignee, + resume_auth_callback: Authorizer | None = None, + retry_auth_callback: Authorizer | None = None, +) -> Callable[[InputStepFunc], Step]: """Add user input step to workflow. + Any authorization callbacks will be attached to the resulting Step. + IMPORTANT: In contrast to other workflow steps, the `@inputstep` wrapped function will not run in the workflow engine! This means that it must be free of side effects! @@ -299,7 +323,14 @@ def wrapper(state: State) -> FormGenerator: def suspend(state: State) -> Process: return Suspend(state) - return make_step_function(suspend, name, wrapper, assignee) + return make_step_function( + suspend, + name, + wrapper, + assignee, + resume_auth_callback=resume_auth_callback, + retry_auth_callback=retry_auth_callback, + ) return decorator @@ -479,7 +510,8 @@ def workflow( description: str, initial_input_form: InputStepFunc | None = None, target: Target = Target.SYSTEM, - authorize_callback: Callable[[OIDCUserModel | None], bool] | None = None, + authorize_callback: Authorizer | None = None, + retry_auth_callback: Authorizer | None = None, ) -> Callable[[Callable[[], StepList]], Workflow]: """Transform an initial_input_form and a step list into a workflow. @@ -500,7 +532,13 @@ def create_service_port(): def _workflow(f: Callable[[], StepList]) -> Workflow: return make_workflow( - f, description, initial_input_form_in_form_inject_args, target, f(), authorize_callback=authorize_callback + f, + description, + initial_input_form_in_form_inject_args, + target, + f(), + authorize_callback=authorize_callback, + retry_auth_callback=retry_auth_callback, ) return _workflow diff --git a/orchestrator/workflows/utils.py b/orchestrator/workflows/utils.py index 9377b071c..debca23f0 100644 --- a/orchestrator/workflows/utils.py +++ b/orchestrator/workflows/utils.py @@ -20,12 +20,12 @@ from pydantic import field_validator, model_validator from sqlalchemy import select -from oauth2_lib.fastapi import OIDCUserModel from orchestrator.db import ProductTable, SubscriptionTable, db from orchestrator.forms.validators import ProductId from orchestrator.services import subscriptions from orchestrator.targets import Target from orchestrator.types import SubscriptionLifecycle +from orchestrator.utils.auth import Authorizer from orchestrator.utils.errors import StaleDataError from orchestrator.utils.state import form_inject_args from orchestrator.utils.validate_data_version import validate_data_version @@ -201,7 +201,8 @@ def create_workflow( initial_input_form: InputStepFunc | None = None, status: SubscriptionLifecycle = SubscriptionLifecycle.ACTIVE, additional_steps: StepList | None = None, - authorize_callback: Callable[[OIDCUserModel | None], bool] | None = None, + authorize_callback: Authorizer | None = None, + retry_auth_callback: Authorizer | None = None, ) -> Callable[[Callable[[], StepList]], Workflow]: """Transform an initial_input_form and a step list into a workflow with a target=Target.CREATE. @@ -234,6 +235,7 @@ def _create_workflow(f: Callable[[], StepList]) -> Workflow: Target.CREATE, steplist, authorize_callback=authorize_callback, + retry_auth_callback=retry_auth_callback, ) return _create_workflow @@ -243,7 +245,8 @@ def modify_workflow( description: str, initial_input_form: InputStepFunc | None = None, additional_steps: StepList | None = None, - authorize_callback: Callable[[OIDCUserModel | None], bool] | None = None, + authorize_callback: Authorizer | None = None, + retry_auth_callback: Authorizer | None = None, ) -> Callable[[Callable[[], StepList]], Workflow]: """Transform an initial_input_form and a step list into a workflow. @@ -278,6 +281,7 @@ def _modify_workflow(f: Callable[[], StepList]) -> Workflow: Target.MODIFY, steplist, authorize_callback=authorize_callback, + retry_auth_callback=retry_auth_callback, ) return _modify_workflow @@ -287,7 +291,8 @@ def terminate_workflow( description: str, initial_input_form: InputStepFunc | None = None, additional_steps: StepList | None = None, - authorize_callback: Callable[[OIDCUserModel | None], bool] | None = None, + authorize_callback: Authorizer | None = None, + retry_auth_callback: Authorizer | None = None, ) -> Callable[[Callable[[], StepList]], Workflow]: """Transform an initial_input_form and a step list into a workflow. @@ -323,6 +328,7 @@ def _terminate_workflow(f: Callable[[], StepList]) -> Workflow: Target.TERMINATE, steplist, authorize_callback=authorize_callback, + retry_auth_callback=retry_auth_callback, ) return _terminate_workflow diff --git a/test/unit_tests/api/test_processes.py b/test/unit_tests/api/test_processes.py index 1795068d8..343da6206 100644 --- a/test/unit_tests/api/test_processes.py +++ b/test/unit_tests/api/test_processes.py @@ -9,6 +9,7 @@ from sqlalchemy import select from oauth2_lib.fastapi import OIDCUserModel +from orchestrator.api.api_v1.endpoints.processes import get_auth_callbacks from orchestrator.config.assignee import Assignee from orchestrator.db import ( ProcessStepTable, @@ -22,7 +23,8 @@ from orchestrator.services.settings import get_engine_settings from orchestrator.settings import app_settings from orchestrator.targets import Target -from orchestrator.workflow import ProcessStatus, done, init, step, workflow +from orchestrator.workflow import ProcessStatus, StepList, done, init, inputstep, make_workflow, step, workflow +from pydantic_forms.core import FormPage from test.unit_tests.helpers import URL_STR_TYPE from test.unit_tests.workflows import WorkflowInstanceForTests @@ -593,3 +595,162 @@ def unauthorized_workflow(): with WorkflowInstanceForTests(unauthorized_workflow, "unauthorized_workflow"): response = test_client.post("/api/processes/unauthorized_workflow", json=[{}]) assert HTTPStatus.FORBIDDEN == response.status_code + + +def test_inputstep_authorization(test_client): + def disallow(_: OIDCUserModel | None = None) -> bool: + return False + + def allow(_: OIDCUserModel | None = None) -> bool: + return True + + class ConfirmForm(FormPage): + confirm: bool + + @inputstep("unauthorized_resume", assignee=Assignee.SYSTEM, resume_auth_callback=disallow) + def unauthorized_resume(state): + user_input = yield ConfirmForm + return user_input.model_dump() + + @inputstep("authorized_resume", assignee=Assignee.SYSTEM, resume_auth_callback=allow) + def authorized_resume(state): + user_input = yield ConfirmForm + return user_input.model_dump() + + @inputstep("noauth_resume", assignee=Assignee.SYSTEM) + def noauth_resume(state): + user_input = yield ConfirmForm + return user_input.model_dump() + + @workflow("test_auth_workflow", target=Target.CREATE) + def test_auth_workflow(): + return init >> noauth_resume >> authorized_resume >> unauthorized_resume >> done + + with WorkflowInstanceForTests(test_auth_workflow, "test_auth_workflow"): + response = test_client.post("/api/processes/test_auth_workflow", json=[{}]) + assert HTTPStatus.CREATED == response.status_code + process_id = response.json()["id"] + # No auth succeeds + response = test_client.put(f"/api/processes/{process_id}/resume", json=[{"confirm": True}]) + assert HTTPStatus.NO_CONTENT == response.status_code + # Authorized succeeds + response = test_client.put(f"/api/processes/{process_id}/resume", json=[{"confirm": True}]) + assert HTTPStatus.NO_CONTENT == response.status_code + # Unauthorized fails + response = test_client.put(f"/api/processes/{process_id}/resume", json=[{"confirm": True}]) + assert HTTPStatus.FORBIDDEN == response.status_code + + +@pytest.mark.xfail(reason="core currently lacks support for tests involving a failed step") +def test_retry_authorization(test_client): + def disallow(_: OIDCUserModel | None = None) -> bool: + return False + + def allow(_: OIDCUserModel | None = None) -> bool: + return True + + class ConfirmForm(FormPage): + confirm: bool + + @inputstep("authorized_resume", assignee=Assignee.SYSTEM, resume_auth_callback=allow, retry_auth_callback=disallow) + def authorized_resume(state): + user_input = yield ConfirmForm + return user_input.model_dump() + + @step("fails once") + def fails_once(state): + if not hasattr(fails_once, "called"): + fails_once.called = False + + if not fails_once.called: + fails_once.called = True + raise RuntimeError("Failing intentionally, ignore") + return {} + + @workflow("test_auth_workflow", target=Target.CREATE, authorize_callback=allow, retry_auth_callback=disallow) + def test_auth_workflow(): + return init >> authorized_resume >> fails_once >> done + + with WorkflowInstanceForTests(test_auth_workflow, "test_auth_workflow"): + # Creating workflow succeeds + response = test_client.post("/api/processes/test_auth_workflow", json=[{}]) + assert HTTPStatus.CREATED == response.status_code + process_id = response.json()["id"] + # We're authorized to resume, but this will error, so we can retry + response = test_client.put(f"/api/processes/{process_id}/resume", json=[{"confirm": True}]) + assert HTTPStatus.NO_CONTENT == response.status_code + # We're authorized to retry, in spite of workflow's retry_auth_callback=disallow + response = test_client.put(f"/api/processes/{process_id}/resume", json=[{}]) + assert HTTPStatus.NO_CONTENT == response.status_code + + +def _A(_: OIDCUserModel) -> bool: + return True + + +def _B(_: OIDCUserModel) -> bool: + return True + + +def _C(_: OIDCUserModel) -> bool: + return True + + +def _D(_: OIDCUserModel) -> bool: + return True + + +@pytest.mark.parametrize( + "policies, decisions", + [ + ((None, None, None, None), (None, None)), + ((_A, None, None, None), (_A, _A)), + ((None, _B, None, None), (None, _B)), + ((_A, _B, None, None), (_A, _B)), + ((None, None, _C, None), (_C, _C)), + ((_A, None, _C, None), (_C, _C)), + ((None, _B, _C, None), (_C, _C)), + ((_A, _B, _C, None), (_C, _C)), + ((None, None, None, _D), (None, _D)), + ((_A, None, None, _D), (_A, _D)), + ((None, _B, None, _D), (None, _D)), + ((_A, _B, None, _D), (_A, _D)), + ((None, None, _C, _D), (_C, _D)), # 4 + ((_A, None, _C, _D), (_C, _D)), + ((None, _B, _C, _D), (_C, _D)), + ((_A, _B, _C, _D), (_C, _D)), + ], +) +def test_get_auth_callbacks(policies, decisions): + @step("bar") + def bar(): + return {} + + @step("baz") + def baz(): + return {} + + workflow = make_workflow( + f=lambda: {}, + description="description", + initial_input_form=None, + target=Target.SYSTEM, + steps=StepList([]), + authorize_callback=None, + retry_auth_callback=None, + ) + + auth, retry, step_resume_auth, step_retry_auth = policies + want_auth, want_retry = decisions + workflow.authorize_callback = auth + workflow.retry_auth_callback = retry + + @inputstep("foo", Target.SYSTEM, step_resume_auth, step_retry_auth) + def foo(): + return {} + + steps = StepList([bar, foo, baz]) + + got_auth, got_retry = get_auth_callbacks(steps, workflow) + assert got_auth == want_auth + assert got_retry == want_retry