diff --git a/metaflow/plugins/__init__.py b/metaflow/plugins/__init__.py index f74a70726ba..1f364af0032 100644 --- a/metaflow/plugins/__init__.py +++ b/metaflow/plugins/__init__.py @@ -69,6 +69,7 @@ ("trigger_on_finish", ".events_decorator.TriggerOnFinishDecorator"), ("pypi_base", ".pypi.pypi_decorator.PyPIFlowDecorator"), ("conda_base", ".pypi.conda_decorator.CondaFlowDecorator"), + ("run_on_finish", ".argo.hook_decorator.RunOnFinishDecorator"), ] # Add environments here diff --git a/metaflow/plugins/argo/argo_workflows.py b/metaflow/plugins/argo/argo_workflows.py index af4a510cb5b..512e4a94ca4 100644 --- a/metaflow/plugins/argo/argo_workflows.py +++ b/metaflow/plugins/argo/argo_workflows.py @@ -65,6 +65,7 @@ ) from .argo_client import ArgoClient +from .exit_hooks import ExitHookHack, HttpExitHook from metaflow.util import resolve_identity @@ -795,6 +796,7 @@ def _compile_workflow_template(self): dag_annotation = {"metaflow/dag": json.dumps(graph_info)} + lifecycle_hooks = self._lifecycle_hooks() return ( WorkflowTemplate() .metadata( @@ -903,97 +905,20 @@ def _compile_workflow_template(self): if self.enable_error_msg_capture else None ) - # Set exit hook handlers if notifications are enabled + # Set lifecycle hooks if notifications are enabled .hooks( { - **( - { - # workflow status maps to Completed - "notify-slack-on-success": LifecycleHook() - .expression("workflow.status == 'Succeeded'") - .template("notify-slack-on-success"), - } - if self.notify_on_success and self.notify_slack_webhook_url - else {} - ), - **( - { - # workflow status maps to Completed - "notify-pager-duty-on-success": LifecycleHook() - .expression("workflow.status == 'Succeeded'") - .template("notify-pager-duty-on-success"), - } - if self.notify_on_success - and self.notify_pager_duty_integration_key - else {} - ), - **( - { - # workflow status maps to Completed - "notify-incident-io-on-success": LifecycleHook() - .expression("workflow.status == 'Succeeded'") - .template("notify-incident-io-on-success"), - } - if self.notify_on_success - and self.notify_incident_io_api_key - else {} - ), - **( - { - # workflow status maps to Failed or Error - "notify-slack-on-failure": LifecycleHook() - .expression("workflow.status == 'Failed'") - .template("notify-slack-on-error"), - "notify-slack-on-error": LifecycleHook() - .expression("workflow.status == 'Error'") - .template("notify-slack-on-error"), - } - if self.notify_on_error and self.notify_slack_webhook_url - else {} - ), - **( - { - # workflow status maps to Failed or Error - "notify-pager-duty-on-failure": LifecycleHook() - .expression("workflow.status == 'Failed'") - .template("notify-pager-duty-on-error"), - "notify-pager-duty-on-error": LifecycleHook() - .expression("workflow.status == 'Error'") - .template("notify-pager-duty-on-error"), - } - if self.notify_on_error - and self.notify_pager_duty_integration_key - else {} - ), - **( - { - # workflow status maps to Failed or Error - "notify-incident-io-on-failure": LifecycleHook() - .expression("workflow.status == 'Failed'") - .template("notify-incident-io-on-error"), - "notify-incident-io-on-error": LifecycleHook() - .expression("workflow.status == 'Error'") - .template("notify-incident-io-on-error"), - } - if self.notify_on_error and self.notify_incident_io_api_key - else {} - ), - # Warning: terrible hack to workaround a bug in Argo Workflow - # where the hooks listed above do not execute unless - # there is an explicit exit hook. 
as and when this - # bug is patched, we should remove this effectively - # no-op hook. - **( - {"exit": LifecycleHook().template("exit-hook-hack")} - if self.notify_on_error or self.notify_on_success - else {} - ), + lifecycle.name: lifecycle + for hook in lifecycle_hooks + for lifecycle in hook.lifecycle_hooks } ) # Top-level DAG template(s) .templates(self._dag_templates()) # Container templates .templates(self._container_templates()) + # Lifecycle hook template(s) + .templates([hook.template for hook in lifecycle_hooks]) # Exit hook template(s) .templates(self._exit_hook_templates()) # Sidecar templates (Daemon Containers) @@ -2333,40 +2258,42 @@ def _daemon_templates(self): templates.append(self._heartbeat_daemon_template()) return templates - # Return exit hook templates for workflow execution notifications. - def _exit_hook_templates(self): - templates = [] + # Return lifecycle hooks for workflow execution notifications. + def _lifecycle_hooks(self): + hooks = [] if self.notify_on_error: - templates.append(self._slack_error_template()) - templates.append(self._pager_duty_alert_template()) - templates.append(self._incident_io_alert_template()) + hooks.append(self._slack_error_template()) + hooks.append(self._pager_duty_alert_template()) + hooks.append(self._incident_io_alert_template()) if self.notify_on_success: - templates.append(self._slack_success_template()) - templates.append(self._pager_duty_change_template()) - templates.append(self._incident_io_change_template()) + hooks.append(self._slack_success_template()) + hooks.append(self._pager_duty_change_template()) + hooks.append(self._incident_io_change_template()) + + run_on_finish_decos = self.flow._flow_decorators.get("run_on_finish", []) + + for deco in run_on_finish_decos: + hooks.extend(deco.hooks) # Clean up None values from templates. - templates = list(filter(None, templates)) - - if self.notify_on_error or self.notify_on_success: - # Warning: terrible hack to workaround a bug in Argo Workflow where the - # templates listed above do not execute unless there is an - # explicit exit hook. as and when this bug is patched, we should - # remove this effectively no-op template. - # Note: We use the Http template because changing this to an actual no-op container had the side-effect of - # leaving LifecycleHooks in a pending state even when they have finished execution. - templates.append( - Template("exit-hook-hack").http( - Http("GET") - .url( + hooks = list(filter(None, hooks)) + + if hooks: + hooks.append( + ExitHookHack( + url=( self.notify_slack_webhook_url or "https://events.pagerduty.com/v2/enqueue" ) - .success_condition("true == true") ) ) + return hooks + + def _exit_hook_templates(self): + templates = [] if self.enable_error_msg_capture: templates.extend(self._error_msg_capture_hook_templates()) + return templates def _error_msg_capture_hook_templates(self): @@ -2515,30 +2442,30 @@ def _pager_duty_alert_template(self): # https://developer.pagerduty.com/docs/ZG9jOjExMDI5NTgx-send-an-alert-event if self.notify_pager_duty_integration_key is None: return None - return Template("notify-pager-duty-on-error").http( - Http("POST") - .url("https://events.pagerduty.com/v2/enqueue") - .header("Content-Type", "application/json") - .body( - json.dumps( - { - "event_action": "trigger", - "routing_key": self.notify_pager_duty_integration_key, - # "dedup_key": self.flow.name, # TODO: Do we need deduplication? 
- "payload": { - "source": "{{workflow.name}}", - "severity": "info", - "summary": "Metaflow run %s/argo-{{workflow.name}} failed!" - % self.flow.name, - "custom_details": { - "Flow": self.flow.name, - "Run ID": "argo-{{workflow.name}}", - }, + return HttpExitHook( + name="notify-pager-duty-on-error", + method="POST", + url="https://events.pagerduty.com/v2/enqueue", + headers={"Content-Type": "application/json"}, + body=json.dumps( + { + "event_action": "trigger", + "routing_key": self.notify_pager_duty_integration_key, + # "dedup_key": self.flow.name, # TODO: Do we need deduplication? + "payload": { + "source": "{{workflow.name}}", + "severity": "info", + "summary": "Metaflow run %s/argo-{{workflow.name}} failed!" + % self.flow.name, + "custom_details": { + "Flow": self.flow.name, + "Run ID": "argo-{{workflow.name}}", }, - "links": self._pager_duty_notification_links(), - } - ) - ) + }, + "links": self._pager_duty_notification_links(), + } + ), + on_error=True, ) def _incident_io_alert_template(self): @@ -2549,50 +2476,52 @@ def _incident_io_alert_template(self): "Creating alerts for errors requires a alert source config ID." ) ui_links = self._incident_io_ui_urls_for_run() - return Template("notify-incident-io-on-error").http( - Http("POST") - .url( + return HttpExitHook( + name="notify-incident-io-on-error", + method="POST", + url=( "https://api.incident.io/v2/alert_events/http/%s" % self.incident_io_alert_source_config_id - ) - .header("Content-Type", "application/json") - .header("Authorization", "Bearer %s" % self.notify_incident_io_api_key) - .body( - json.dumps( - { - "idempotency_key": "argo-{{workflow.name}}", # use run id to deduplicate alerts. - "status": "firing", - "title": "Flow %s has failed." % self.flow.name, - "description": "Metaflow run {run_pathspec} failed!{urls}".format( - run_pathspec="%s/argo-{{workflow.name}}" % self.flow.name, - urls=( - "\n\nSee details for the run at:\n\n" - + "\n\n".join(ui_links) - if ui_links - else "" - ), - ), - "source_url": ( - "%s/%s/%s" - % ( - UI_URL.rstrip("/"), - self.flow.name, - "argo-{{workflow.name}}", - ) - if UI_URL - else None + ), + headers={ + "Content-Type": "application/json", + "Authorization": "Bearer %s" % self.notify_incident_io_api_key, + }, + body=json.dumps( + { + "idempotency_key": "argo-{{workflow.name}}", # use run id to deduplicate alerts. + "status": "firing", + "title": "Flow %s has failed." % self.flow.name, + "description": "Metaflow run {run_pathspec} failed!{urls}".format( + run_pathspec="%s/argo-{{workflow.name}}" % self.flow.name, + urls=( + "\n\nSee details for the run at:\n\n" + + "\n\n".join(ui_links) + if ui_links + else "" ), - "metadata": { - **(self.incident_io_metadata or {}), - **{ - "run_status": "failed", - "flow_name": self.flow.name, - "run_id": "argo-{{workflow.name}}", - }, + ), + "source_url": ( + "%s/%s/%s" + % ( + UI_URL.rstrip("/"), + self.flow.name, + "argo-{{workflow.name}}", + ) + if UI_URL + else None + ), + "metadata": { + **(self.incident_io_metadata or {}), + **{ + "run_status": "failed", + "flow_name": self.flow.name, + "run_id": "argo-{{workflow.name}}", }, - } - ) - ) + }, + } + ), + on_error=True, ) def _incident_io_change_template(self): @@ -2603,50 +2532,52 @@ def _incident_io_change_template(self): "Creating alerts for successes requires an alert source config ID." 
) ui_links = self._incident_io_ui_urls_for_run() - return Template("notify-incident-io-on-success").http( - Http("POST") - .url( + return HttpExitHook( + name="notify-incident-io-on-success", + method="POST", + url=( "https://api.incident.io/v2/alert_events/http/%s" % self.incident_io_alert_source_config_id - ) - .header("Content-Type", "application/json") - .header("Authorization", "Bearer %s" % self.notify_incident_io_api_key) - .body( - json.dumps( - { - "idempotency_key": "argo-{{workflow.name}}", # use run id to deduplicate alerts. - "status": "firing", - "title": "Flow %s has succeeded." % self.flow.name, - "description": "Metaflow run {run_pathspec} succeeded!{urls}".format( - run_pathspec="%s/argo-{{workflow.name}}" % self.flow.name, - urls=( - "\n\nSee details for the run at:\n\n" - + "\n\n".join(ui_links) - if ui_links - else "" - ), - ), - "source_url": ( - "%s/%s/%s" - % ( - UI_URL.rstrip("/"), - self.flow.name, - "argo-{{workflow.name}}", - ) - if UI_URL - else None + ), + headers={ + "Content-Type": "application/json", + "Authorization": "Bearer %s" % self.notify_incident_io_api_key, + }, + body=json.dumps( + { + "idempotency_key": "argo-{{workflow.name}}", # use run id to deduplicate alerts. + "status": "firing", + "title": "Flow %s has succeeded." % self.flow.name, + "description": "Metaflow run {run_pathspec} succeeded!{urls}".format( + run_pathspec="%s/argo-{{workflow.name}}" % self.flow.name, + urls=( + "\n\nSee details for the run at:\n\n" + + "\n\n".join(ui_links) + if ui_links + else "" ), - "metadata": { - **(self.incident_io_metadata or {}), - **{ - "run_status": "succeeded", - "flow_name": self.flow.name, - "run_id": "argo-{{workflow.name}}", - }, + ), + "source_url": ( + "%s/%s/%s" + % ( + UI_URL.rstrip("/"), + self.flow.name, + "argo-{{workflow.name}}", + ) + if UI_URL + else None + ), + "metadata": { + **(self.incident_io_metadata or {}), + **{ + "run_status": "succeeded", + "flow_name": self.flow.name, + "run_id": "argo-{{workflow.name}}", }, - } - ) - ) + }, + } + ), + on_success=True, ) def _incident_io_ui_urls_for_run(self): @@ -2671,27 +2602,27 @@ def _pager_duty_change_template(self): # https://developer.pagerduty.com/docs/ZG9jOjExMDI5NTgy-send-a-change-event if self.notify_pager_duty_integration_key is None: return None - return Template("notify-pager-duty-on-success").http( - Http("POST") - .url("https://events.pagerduty.com/v2/change/enqueue") - .header("Content-Type", "application/json") - .body( - json.dumps( - { - "routing_key": self.notify_pager_duty_integration_key, - "payload": { - "summary": "Metaflow run %s/argo-{{workflow.name}} Succeeded" - % self.flow.name, - "source": "{{workflow.name}}", - "custom_details": { - "Flow": self.flow.name, - "Run ID": "argo-{{workflow.name}}", - }, + return HttpExitHook( + name="notify-pager-duty-on-success", + method="POST", + url="https://events.pagerduty.com/v2/change/enqueue", + headers={"Content-Type": "application/json"}, + body=json.dumps( + { + "routing_key": self.notify_pager_duty_integration_key, + "payload": { + "summary": "Metaflow run %s/argo-{{workflow.name}} Succeeded" + % self.flow.name, + "source": "{{workflow.name}}", + "custom_details": { + "Flow": self.flow.name, + "Run ID": "argo-{{workflow.name}}", }, - "links": self._pager_duty_notification_links(), - } - ) - ) + }, + "links": self._pager_duty_notification_links(), + } + ), + on_success=True, ) def _pager_duty_notification_links(self): @@ -2795,8 +2726,12 @@ def _slack_error_template(self): blocks = self._get_slack_blocks(message) payload 
= {"text": message, "blocks": blocks} - return Template("notify-slack-on-error").http( - Http("POST").url(self.notify_slack_webhook_url).body(json.dumps(payload)) + return HttpExitHook( + name="notify-slack-on-error", + method="POST", + url=self.notify_slack_webhook_url, + body=json.dumps(payload), + on_error=True, ) def _slack_success_template(self): @@ -2811,8 +2746,12 @@ def _slack_success_template(self): blocks = self._get_slack_blocks(message) payload = {"text": message, "blocks": blocks} - return Template("notify-slack-on-success").http( - Http("POST").url(self.notify_slack_webhook_url).body(json.dumps(payload)) + return HttpExitHook( + name="notify-slack-on-success", + method="POST", + url=self.notify_slack_webhook_url, + body=json.dumps(payload), + on_success=True, ) def _heartbeat_daemon_template(self): @@ -4179,57 +4118,3 @@ def to_json(self): def __str__(self): return json.dumps(self.payload, indent=4) - - -class Http(object): - # https://argoproj.github.io/argo-workflows/fields/#http - - def __init__(self, method): - tree = lambda: defaultdict(tree) - self.payload = tree() - self.payload["method"] = method - self.payload["headers"] = [] - - def header(self, header, value): - self.payload["headers"].append({"name": header, "value": value}) - return self - - def body(self, body): - self.payload["body"] = str(body) - return self - - def url(self, url): - self.payload["url"] = url - return self - - def success_condition(self, success_condition): - self.payload["successCondition"] = success_condition - return self - - def to_json(self): - return self.payload - - def __str__(self): - return json.dumps(self.payload, indent=4) - - -class LifecycleHook(object): - # https://argoproj.github.io/argo-workflows/fields/#lifecyclehook - - def __init__(self): - tree = lambda: defaultdict(tree) - self.payload = tree() - - def expression(self, expression): - self.payload["expression"] = str(expression) - return self - - def template(self, template): - self.payload["template"] = template - return self - - def to_json(self): - return self.payload - - def __str__(self): - return json.dumps(self.payload, indent=4) diff --git a/metaflow/plugins/argo/exit_hooks.py b/metaflow/plugins/argo/exit_hooks.py new file mode 100644 index 00000000000..849e7bbd9ee --- /dev/null +++ b/metaflow/plugins/argo/exit_hooks.py @@ -0,0 +1,243 @@ +from collections import defaultdict +import json +from typing import Callable, List + + +class JsonSerializable(object): + def to_json(self): + return self.payload + + def __str__(self): + return json.dumps(self.payload, indent=4) + + +class _LifecycleHook(JsonSerializable): + # https://argoproj.github.io/argo-workflows/fields/#lifecyclehook + + def __init__(self, name): + tree = lambda: defaultdict(tree) + self.name = name + self.payload = tree() + + def expression(self, expression): + self.payload["expression"] = str(expression) + return self + + def template(self, template): + self.payload["template"] = template + return self + + +class _Template(JsonSerializable): + # https://argoproj.github.io/argo-workflows/fields/#template + + def __init__(self, name): + tree = lambda: defaultdict(tree) + self.name = name + self.payload = tree() + self.payload["name"] = name + + def http(self, http): + self.payload["http"] = http.to_json() + return self + + def script(self, script): + self.payload["script"] = script.to_json() + return self + + def service_account_name(self, service_account_name): + self.payload["serviceAccountName"] = service_account_name + return self + + +class 
Hook(object): + """ + Abstraction for Argo Workflows exit hooks. + A hook consists of a Template, and one or more LifecycleHooks that trigger the template + """ + + template: "_Template" + lifecycle_hooks: List["_LifecycleHook"] + + +class _ScriptSpec(JsonSerializable): + # https://argo-workflows.readthedocs.io/en/latest/walk-through/scripts-and-results/ + + def __init__(self): + tree = lambda: defaultdict(tree) + self.payload = tree() + + def image(self, image): + self.payload["image"] = image + return self + + def command(self, command: List[str]): + self.payload["command"] = command + return self + + def source(self, source: str): + # encode the source as a oneliner due to json limitations + self.payload["source"] = json.dumps(source) + return self + + +class _HttpSpec(JsonSerializable): + # https://argoproj.github.io/argo-workflows/fields/#http + + def __init__(self, method): + tree = lambda: defaultdict(tree) + self.payload = tree() + self.payload["method"] = method + self.payload["headers"] = [] + + def header(self, header, value): + self.payload["headers"].append({"name": header, "value": value}) + return self + + def body(self, body): + self.payload["body"] = str(body) + return self + + def url(self, url): + self.payload["url"] = url + return self + + def success_condition(self, success_condition): + self.payload["successCondition"] = success_condition + return self + + def to_json(self): + return self.payload + + def __str__(self): + return json.dumps(self.payload, indent=4) + + +# HTTP hook +class HttpExitHook(Hook): + def __init__( + self, + name, + url, + method="GET", + headers=None, + body=None, + on_success=False, + on_error=False, + ): + self.template = _Template(name) + http = _HttpSpec(method).url(url) + if headers is not None: + for header, value in headers.items(): + http.header(header, value) + + if body is not None: + http.body(json.dumps(body)) + + self.template.http(http) + + self.lifecycle_hooks = [] + + if on_success and on_error: + raise Exception("Set only one of the on_success/on_error at a time.") + + if on_success: + self.lifecycle_hooks.append( + _LifecycleHook(name) + .expression("workflow.status == 'Succeeded'") + .template(self.template.name) + ) + + if on_error: + self.lifecycle_hooks.append( + _LifecycleHook(name) + .expression("workflow.status == 'Error'") + .template(self.template.name) + ) + self.lifecycle_hooks.append( + _LifecycleHook(f"{name}-failure") + .expression("workflow.status == 'Failure'") + .template(self.template.name) + ) + + if not on_success and not on_error: + # add an expressionless lifecycle hook + self.lifecycle_hooks.append(_LifecycleHook(name).template(name)) + + +class ExitHookHack(Hook): + # Warning: terrible hack to workaround a bug in Argo Workflow where the + # templates listed above do not execute unless there is an + # explicit exit hook. as and when this bug is patched, we should + # remove this effectively no-op template. + # Note: We use the Http template because changing this to an actual no-op container had the side-effect of + # leaving LifecycleHooks in a pending state even when they have finished execution. 
+ def __init__( + self, + url, + headers=None, + body=None, + ): + self.template = _Template("exit-hook-hack") + http = _HttpSpec("GET").url(url) + if headers is not None: + for header, value in headers.items(): + http.header(header, value) + + if body is not None: + http.body(json.dumps(body)) + + http.success_condition("true == true") + + self.template.http(http) + + self.lifecycle_hooks = [] + + # add an expressionless lifecycle hook + self.lifecycle_hooks.append(_LifecycleHook("exit").template("exit-hook-hack")) + + +class ScriptHook(Hook): + def __init__( + self, + name: str, + source: str, + image: str = None, + language: str = "python", + on_success=False, + on_error=False, + ): + self.template = _Template(name) + script = _ScriptSpec().command([language]).source(source) + if image is not None: + script.image(image) + + self.template.script(script) + + self.lifecycle_hooks = [] + + if on_success and on_error: + raise Exception("Set only one of the on_success/on_error at a time.") + + if on_success: + self.lifecycle_hooks.append( + _LifecycleHook(name) + .expression("workflow.status == 'Succeeded'") + .template(self.template.name) + ) + + if on_error: + self.lifecycle_hooks.append( + _LifecycleHook(name) + .expression("workflow.status == 'Error'") + .template(self.template.name) + ) + self.lifecycle_hooks.append( + _LifecycleHook(f"{name}-failure") + .expression("workflow.status == 'Failure'") + .template(self.template.name) + ) + + if not on_success and not on_error: + # add an expressionless lifecycle hook + self.lifecycle_hooks.append(_LifecycleHook(name).template(name)) diff --git a/metaflow/plugins/argo/hook_decorator.py b/metaflow/plugins/argo/hook_decorator.py new file mode 100644 index 00000000000..77e742550f8 --- /dev/null +++ b/metaflow/plugins/argo/hook_decorator.py @@ -0,0 +1,46 @@ +from metaflow.decorators import FlowDecorator +import inspect + +from metaflow.exception import MetaflowException +from metaflow.plugins.argo.exit_hooks import ScriptHook + + +class RunOnFinishDecorator(FlowDecorator): + name = "run_on_finish" + allow_multiple = True + + defaults = { + "functions": [], + "image": None, + "language": "python", + "on_success": None, + "on_error": None, + } + + def flow_init( + self, flow, graph, environment, flow_datastore, metadata, logger, echo, options + ): + on_success = self.attributes["on_success"] + on_error = self.attributes["on_error"] + + if not on_success and not on_error: + raise MetaflowException("Choose one of the options on_success/on_error") + + prefix = "" + if on_success: + prefix = "success" + elif on_error: + prefix = "error" + + self.hooks = [] + for fn in self.attributes["functions"]: + self.hooks.append( + ScriptHook( + name=f"{prefix}-{fn.__name__}", + language=self.attributes["language"], + source=inspect.getsource(fn), + image=self.attributes["image"], + on_success=self.attributes["on_success"], + on_error=self.attributes["on_error"], + ) + )
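
Illustrative usage: a minimal sketch of how the new @run_on_finish flow decorator introduced in this patch might be attached to a flow. The top-level import path is an assumption (flow-level decorators registered in FLOW_DECORATORS_DESC are normally surfaced on the metaflow package, like @trigger_on_finish), and the webhook URL is hypothetical. Each callable passed via `functions` is captured with inspect.getsource() and embedded as the source of an Argo "script" template that is wired to exit-handler lifecycle hooks keyed on the workflow's final status, so the function should be self-contained and perform its own imports rather than rely on the flow module's globals.

from metaflow import FlowSpec, step, run_on_finish  # import path assumed


def notify_on_error():
    # Captured via inspect.getsource() and shipped as an Argo script template,
    # so keep all imports inside the function body.
    import json
    import urllib.request

    payload = json.dumps({"text": "HelloFlow failed on Argo Workflows"}).encode()
    req = urllib.request.Request(
        "https://hooks.example.com/notify",  # hypothetical endpoint
        data=payload,
        headers={"Content-Type": "application/json"},
    )
    urllib.request.urlopen(req)


@run_on_finish(functions=[notify_on_error], on_error=True, image="python:3.11")
class HelloFlow(FlowSpec):
    @step
    def start(self):
        self.next(self.end)

    @step
    def end(self):
        pass


if __name__ == "__main__":
    HelloFlow()

The hooks only take effect when the flow is deployed to Argo Workflows (python helloflow.py argo-workflows create); exactly one of on_success/on_error must be set per decorator, and because allow_multiple is True a flow can stack one decorator for each outcome.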