Skip to content

Feature: Support spinning a single metaflow step #2444

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 46 additions & 6 deletions metaflow/cli.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import os
import functools
import inspect
import sys
Expand All @@ -23,6 +24,8 @@
DEFAULT_METADATA,
DEFAULT_MONITOR,
DEFAULT_PACKAGE_SUFFIXES,
DATASTORE_SYSROOT_SPIN,
DATASTORE_LOCAL_DIR,
)
from .metaflow_current import current
from metaflow.system import _system_monitor, _system_logger
Expand Down Expand Up @@ -114,6 +117,8 @@ def logger(body="", system_msg=False, head="", bad=False, timestamp=True, nl=Tru
"step": "metaflow.cli_components.step_cmd.step",
"run": "metaflow.cli_components.run_cmds.run",
"resume": "metaflow.cli_components.run_cmds.resume",
"spin": "metaflow.cli_components.run_cmds.spin",
"spin-step": "metaflow.cli_components.step_cmd.spin_step",
},
)
def cli(ctx):
Expand Down Expand Up @@ -440,14 +445,10 @@ def start(
ctx.obj.event_logger = LOGGING_SIDECARS[event_logger](
flow=ctx.obj.flow, env=ctx.obj.environment
)
ctx.obj.event_logger.start()
_system_logger.init_system_logger(ctx.obj.flow.name, ctx.obj.event_logger)

ctx.obj.monitor = MONITOR_SIDECARS[monitor](
flow=ctx.obj.flow, env=ctx.obj.environment
)
ctx.obj.monitor.start()
_system_monitor.init_system_monitor(ctx.obj.flow.name, ctx.obj.monitor)

ctx.obj.metadata = [m for m in METADATA_PROVIDERS if m.TYPE == metadata][0](
ctx.obj.environment, ctx.obj.flow, ctx.obj.event_logger, ctx.obj.monitor
Expand All @@ -462,6 +463,45 @@ def start(
)

ctx.obj.config_options = config_options
ctx.obj.is_spin = False

# Override values for spin
if hasattr(ctx, "saved_args") and ctx.saved_args and "spin" in ctx.saved_args[0]:
# To minimize side-effects for spin, we will only use the following:
# - local metadata provider,
# - local datastore,
# - local environment,
# - null event logger,
# - null monitor
ctx.obj.is_spin = True
ctx.obj.event_logger = LOGGING_SIDECARS["nullSidecarLogger"](
flow=ctx.obj.flow, env=ctx.obj.environment
)
ctx.obj.monitor = MONITOR_SIDECARS["nullSidecarMonitor"](
flow=ctx.obj.flow, env=ctx.obj.environment
)
ctx.obj.metadata = [m for m in METADATA_PROVIDERS if m.TYPE == "local"][0](
ctx.obj.environment, ctx.obj.flow, ctx.obj.event_logger, ctx.obj.monitor
)
ctx.obj.datastore_impl = [d for d in DATASTORES if d.TYPE == "local"][0]
# Set datastore_root to be DATASTORE_SYSROOT_SPIN if not provided
datastore_root = os.path.join(DATASTORE_SYSROOT_SPIN, DATASTORE_LOCAL_DIR)
ctx.obj.datastore_impl.datastore_root = datastore_root
ctx.obj.flow_datastore = FlowDataStore(
ctx.obj.flow.name,
ctx.obj.environment, # Same environment as run/resume
ctx.obj.metadata, # local metadata
ctx.obj.event_logger, # null event logger
ctx.obj.monitor, # null monitor
storage_impl=ctx.obj.datastore_impl,
)

# Start event logger and monitor
ctx.obj.event_logger.start()
_system_logger.init_system_logger(ctx.obj.flow.name, ctx.obj.event_logger)

ctx.obj.monitor.start()
_system_monitor.init_system_monitor(ctx.obj.flow.name, ctx.obj.monitor)

decorators._init(ctx.obj.flow)

Expand All @@ -478,7 +518,7 @@ def start(
deco_options,
)

# In the case of run/resume, we will want to apply the TL decospecs
# In the case of run/resume/spin, we will want to apply the TL decospecs
# *after* the run decospecs so that they don't take precedence. In other
# words, for the same decorator, we want `myflow.py run --with foo` to
# take precedence over any other `foo` decospec
Expand Down Expand Up @@ -506,7 +546,7 @@ def start(
if (
hasattr(ctx, "saved_args")
and ctx.saved_args
and ctx.saved_args[0] not in ("run", "resume")
and ctx.saved_args[0] not in ("run", "resume", "spin")
):
# run/resume are special cases because they can add more decorators with --with,
# so they have to take care of themselves.
Expand Down
148 changes: 131 additions & 17 deletions metaflow/cli_components/run_cmds.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from ..metaflow_current import current
from ..metaflow_config import DEFAULT_DECOSPECS
from ..package import MetaflowPackage
from ..runtime import NativeRuntime
from ..runtime import NativeRuntime, SpinRuntime
from ..system import _system_logger

from ..tagging_util import validate_tags
Expand All @@ -20,7 +20,7 @@
def before_run(obj, tags, decospecs):
validate_tags(tags)

# There's a --with option both at the top-level and for the run
# There's a --with option both at the top-level and for the run/resume/spin
# subcommand. Why?
#
# "run --with shoes" looks so much better than "--with shoes run".
Expand All @@ -40,7 +40,7 @@ def before_run(obj, tags, decospecs):
+ list(obj.environment.decospecs() or [])
)
if all_decospecs:
# These decospecs are the ones from run/resume PLUS the ones from the
# These decospecs are the ones from run/resume/spin PLUS the ones from the
# environment (for example the @conda)
decorators._attach_decorators(obj.flow, all_decospecs)
decorators._init(obj.flow)
Expand All @@ -65,6 +65,29 @@ def before_run(obj, tags, decospecs):
)


def common_runner_options(func):
    """Decorate *func* with the CLI options shared by runner-style commands.

    Two string options are added, both defaulting to ``None``:

    * ``--run-id-file``: file to which the ID of the run is written.
    * ``--runner-attribute-file``: file to which the metadata and pathspec
      of the run are written (used internally for Metaflow's Runner API).

    Returns the wrapped callable with both options attached.
    """

    @wraps(func)
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)

    with_runner_attribute_file = click.option(
        "--runner-attribute-file",
        default=None,
        show_default=True,
        type=str,
        help="Write the metadata and pathspec of this run to the file specified. Used internally "
        "for Metaflow's Runner API.",
    )
    with_run_id_file = click.option(
        "--run-id-file",
        default=None,
        show_default=True,
        type=str,
        help="Write the ID of this run to the file specified.",
    )

    # Apply innermost-first so the options are registered in the same order
    # as a decorator stack with --run-id-file on top.
    return with_run_id_file(with_runner_attribute_file(wrapper))


def write_file(file_path, content):
if file_path is not None:
with open(file_path, "w", encoding="utf-8") as f:
Expand Down Expand Up @@ -129,20 +152,6 @@ def common_run_options(func):
"in steps.",
callback=config_callback,
)
@click.option(
"--run-id-file",
default=None,
show_default=True,
type=str,
help="Write the ID of this run to the file specified.",
)
@click.option(
"--runner-attribute-file",
default=None,
show_default=True,
type=str,
help="Write the metadata and pathspec of this run to the file specified. Used internally for Metaflow's Runner API.",
)
@wraps(func)
def wrapper(*args, **kwargs):
return func(*args, **kwargs)
Expand Down Expand Up @@ -187,6 +196,7 @@ def wrapper(*args, **kwargs):
@click.command(help="Resume execution of a previous run of this flow.")
@tracing.cli("cli/resume")
@common_run_options
@common_runner_options
@click.pass_obj
def resume(
obj,
Expand Down Expand Up @@ -305,6 +315,7 @@ def resume(
@click.command(help="Run the workflow locally.")
@tracing.cli("cli/run")
@common_run_options
@common_runner_options
@click.option(
"--namespace",
"user_namespace",
Expand Down Expand Up @@ -380,3 +391,106 @@ def run(
)
with runtime.run_heartbeat():
runtime.execute()


@click.command(help="Spins up a task for a given step from a previous run locally.")
@click.argument("step-name")
@click.option(
    "--spin-pathspec",
    default=None,
    type=str,
    help="Use specified task pathspec from a previous run to spin up the step.",
)
@click.option(
    "--skip-decorators/--no-skip-decorators",
    is_flag=True,
    default=False,
    show_default=True,
    help="Skip decorators attached to the step.",
)
@click.option(
    "--artifacts-module",
    default=None,
    show_default=True,
    help="Path to a module that contains artifacts to be used in the spun step. The artifacts should "
    "be defined as a dictionary called ARTIFACTS with keys as the artifact names and values as the "
    "artifact values. The artifact values will overwrite the default values of the artifacts used in "
    "the spun step.",
)
@click.option(
    "--persist/--no-persist",
    "persist",
    default=True,
    show_default=True,
    help="Whether to persist the artifacts in the spun step. If set to false, the artifacts will not"
    " be persisted and will not be available in the spun step's datastore.",
)
@click.option(
    "--max-log-size",
    default=10,
    show_default=True,
    help="Maximum size of stdout and stderr captured in "
    "megabytes. If a step outputs more than this to "
    "stdout/stderr, its output will be truncated.",
)
@common_runner_options
@click.pass_obj
def spin(
    obj,
    step_name,
    spin_pathspec=None,
    persist=True,
    artifacts_module=None,
    skip_decorators=False,
    max_log_size=None,
    run_id_file=None,
    runner_attribute_file=None,
    **kwargs
):
    """Run a single step of the flow locally through a ``SpinRuntime``.

    Parameters
    ----------
    obj
        The click context object carrying the flow, graph, datastore,
        metadata provider, environment and sidecars.
    step_name
        Name of the step to spin; looked up as an attribute on ``obj.flow``.
    spin_pathspec
        Optional task pathspec from a previous run to spin the step from.
    persist
        Whether the artifacts of the spun step are persisted.
    artifacts_module
        Optional path to a module defining an ``ARTIFACTS`` dictionary whose
        values override the artifacts used in the spun step.
    skip_decorators
        Whether decorators attached to the step are skipped.
    max_log_size
        Maximum captured stdout/stderr size, in megabytes. The CLI option
        always supplies a value (default 10).
    run_id_file
        If not None, the spin run ID is written to this file.
    runner_attribute_file
        If set, a JSON document describing the spun task is written here.
    **kwargs
        Remaining options, forwarded to ``obj.flow._set_constants``.
    """
    # Local import: only needed here for separator-aware path handling.
    import os

    before_run(obj, [], [])
    obj.echo(f"Spinning up step *{step_name}* locally for flow *{obj.flow.name}*")
    obj.flow._set_constants(obj.graph, kwargs, obj.config_options)
    step_func = getattr(obj.flow, step_name)

    spin_runtime = SpinRuntime(
        obj.flow,
        obj.graph,
        obj.flow_datastore,
        obj.metadata,
        obj.environment,
        obj.package,
        obj.logger,
        obj.entrypoint,
        obj.event_logger,
        obj.monitor,
        step_func,
        step_name,
        spin_pathspec,
        skip_decorators,
        artifacts_module,
        persist,
        # The option is expressed in megabytes; the runtime receives bytes.
        max_log_size * 1024 * 1024,
    )

    write_latest_run_id(obj, spin_runtime.run_id)
    write_file(run_id_file, spin_runtime.run_id)

    # datastore_root is os.path.join(DATASTORE_SYSROOT_SPIN, DATASTORE_LOCAL_DIR)
    # We only need the root for the metadata, i.e. the portion before DATASTORE_LOCAL_DIR.
    # The root is built with os.path.join, so strip the last component with
    # os.path.dirname rather than splitting on a hard-coded "/" (which is a
    # no-op where the separator is "\\"). Fall back to the full path when it
    # has no directory component, as the previous split did.
    datastore_root = spin_runtime._flow_datastore._storage_impl.datastore_root
    spin_metadata_root = os.path.dirname(datastore_root) or datastore_root
    spin_runtime.execute()

    if runner_attribute_file:
        # Explicit UTF-8 keeps this consistent with write_file().
        with open(runner_attribute_file, "w", encoding="utf-8") as f:
            json.dump(
                {
                    "task_id": spin_runtime.task.task_id,
                    "step_name": step_name,
                    "run_id": spin_runtime.run_id,
                    "flow_name": obj.flow.name,
                    # Store metadata in a format that can be used by the Runner API
                    "metadata": f"{obj.metadata.__class__.TYPE}@{spin_metadata_root}",
                },
                f,
            )
Loading