Skip to content

feature: allow rebuilding conda/pypi environments #2442

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions metaflow/plugins/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
("tag", ".tag_cli.cli"),
("spot-metadata", ".kubernetes.spot_metadata_cli.cli"),
("logs", ".logs_cli.cli"),
("environment", ".pypi.environment_cli.cli"),
]

# Add additional commands to the runner here
Expand Down
29 changes: 26 additions & 3 deletions metaflow/plugins/pypi/conda_environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import io
import json
import os
import shutil
import tarfile
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
Expand Down Expand Up @@ -32,6 +33,7 @@ def __init__(self, msg):
class CondaEnvironment(MetaflowEnvironment):
TYPE = "conda"
_filecache = None
_disable_cache = False

def __init__(self, flow):
self.flow = flow
Expand Down Expand Up @@ -107,7 +109,10 @@ def solve(id_, environment, type_):
return (
id_,
(
self.read_from_environment_manifest([id_, platform, type_])
(
not self._disable_cache
and self.read_from_environment_manifest([id_, platform, type_])
)
or self.write_to_environment_manifest(
[id_, platform, type_],
self.solvers[type_].solve(id_, **environment),
Expand Down Expand Up @@ -153,7 +158,7 @@ def _path(url, local_path):
_meta = copy.deepcopy(local_packages)
for id_, packages, _, _ in results:
for package in packages:
if package.get("path"):
if package.get("path") and not self._disable_cache:
# Cache only those packages that manifest is unaware of
local_packages.pop(package["url"], None)
else:
Expand Down Expand Up @@ -186,7 +191,7 @@ def _path(url, local_path):
storage.save_bytes(
list_of_path_and_filehandle,
len_hint=len(list_of_path_and_filehandle),
# overwrite=True,
overwrite=self._disable_cache,
)
for id_, packages, _, platform in results:
if id_ in dirty:
Expand Down Expand Up @@ -290,6 +295,9 @@ def pypi_solve(env):

self.logger("Virtual environment(s) bootstrapped!")

def disable_cache(self):
self._disable_cache = True

def executable(self, step_name, default=None):
step = next((step for step in self.flow if step.name == step_name), None)
if step is None:
Expand Down Expand Up @@ -319,6 +327,21 @@ def is_disabled(self, step):
return str(disabled).lower() == "true"
return False

def delete_environment(self, step):
env = self.get_environment(step)
paths = []
for solver in self.solvers.keys():
if solver not in env:
continue
for platform in env[solver].get("platforms", [None]):
paths.append(
self.solvers[solver].path_to_environment(env["id_"], platform)
)

# delete collected paths
for path in paths:
shutil.rmtree(path, ignore_errors=True)

@functools.lru_cache(maxsize=None)
def get_environment(self, step):
environment = {}
Expand Down
50 changes: 50 additions & 0 deletions metaflow/plugins/pypi/environment_cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
from metaflow._vendor import click
from metaflow.exception import MetaflowException


@click.group()
def cli():
pass


@cli.group(help="Commands related to managing the conda/pypi environments")
@click.pass_context
def environment(ctx):
# the logger is configured in cli.py
global echo
echo = ctx.obj.echo


@environment.command(help="Resolve the environment(s)")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: the other plugin uses those as argument resolve start end mid isntead of --step start --start end.

@click.option(
"--step",
"steps",
multiple=True,
default=[],
help="Steps to resolve the environment for",
)
@click.option(
"--force/--no-force",
default=False,
is_flag=True,
help="Force re-resolving the environment(s)",
)
@click.pass_obj
def resolve(obj, steps, force=False):
# possibly limiting steps to resolve. make sure its a list and not a tuple
step_names = list(steps)

steps = [step for step in obj.flow if (step.name in step_names) or not step_names]

# Delete existing environments if we are rebuilding.
if force:
for step in steps:
obj.environment.delete_environment(step)

if not hasattr(obj.environment, "disable_cache"):
raise MetaflowException("The environment does not support disabling the cache.")

# Disable the cache before initializing if we are rebuilding.
if force:
obj.environment.disable_cache()
obj.environment.init_environment(echo, only_steps=step_names)
Loading