From 780bd1887a57265fc35003ad02b858c312d9414c Mon Sep 17 00:00:00 2001 From: Sakari Ikonen Date: Wed, 4 Jun 2025 17:11:01 +0300 Subject: [PATCH 1/5] wip: initial working command for environment init --- metaflow/plugins/__init__.py | 1 + metaflow/plugins/pypi/environment_cli.py | 34 ++++++++++++++++++++++++ 2 files changed, 35 insertions(+) create mode 100644 metaflow/plugins/pypi/environment_cli.py diff --git a/metaflow/plugins/__init__.py b/metaflow/plugins/__init__.py index f74a70726ba..001d03e54a1 100644 --- a/metaflow/plugins/__init__.py +++ b/metaflow/plugins/__init__.py @@ -18,6 +18,7 @@ ("tag", ".tag_cli.cli"), ("spot-metadata", ".kubernetes.spot_metadata_cli.cli"), ("logs", ".logs_cli.cli"), + ("environment", ".pypi.environment_cli.cli"), ] # Add additional commands to the runner here diff --git a/metaflow/plugins/pypi/environment_cli.py b/metaflow/plugins/pypi/environment_cli.py new file mode 100644 index 00000000000..2703931b543 --- /dev/null +++ b/metaflow/plugins/pypi/environment_cli.py @@ -0,0 +1,34 @@ +from metaflow._vendor import click +from metaflow.cli import LOGGER_TIMESTAMP + + +@click.group() +def cli(): + pass + + +@cli.group(help="Commands related to logs") +@click.pass_context +def environment(ctx): + # the logger is configured in cli.py + global echo + echo = ctx.obj.echo + + +@environment.command(help="Rebuild the environment") +@click.option( + "--step", + "steps", + multiple=True, + default=[], + help="Steps to rebuild the environment", +) +@click.pass_obj +def rebuild(obj, steps): + + steps = list(steps) + print(steps, type(steps)) + + print(obj.flow) + print(obj.environment) + obj.environment.init_environment(echo, steps) From 8adcea2e7b340247b1d341289d55af91b488b589 Mon Sep 17 00:00:00 2001 From: Sakari Ikonen Date: Wed, 4 Jun 2025 17:48:32 +0300 Subject: [PATCH 2/5] add force_rebuild flag to conda environment --- metaflow/plugins/pypi/conda_environment.py | 13 +++++++++---- metaflow/plugins/pypi/environment_cli.py | 2 +- 2 files 
changed, 10 insertions(+), 5 deletions(-) diff --git a/metaflow/plugins/pypi/conda_environment.py b/metaflow/plugins/pypi/conda_environment.py index 75a954a3023..6298a02e5fe 100644 --- a/metaflow/plugins/pypi/conda_environment.py +++ b/metaflow/plugins/pypi/conda_environment.py @@ -32,6 +32,7 @@ def __init__(self, msg): class CondaEnvironment(MetaflowEnvironment): TYPE = "conda" _filecache = None + _force_rebuild = False def __init__(self, flow): self.flow = flow @@ -74,7 +75,8 @@ def wrapper(*args, **kwargs): micromamba = Micromamba(self.logger) self.solvers = {"conda": micromamba, "pypi": Pip(micromamba, self.logger)} - def init_environment(self, echo, only_steps=None): + def init_environment(self, echo, only_steps=None, force_rebuild=False): + self._force_rebuild = force_rebuild # The implementation optimizes for latency to ensure as many operations can # be turned into cheap no-ops as feasible. Otherwise, we focus on maintaining # a balance between latency and maintainability of code without re-implementing @@ -107,7 +109,10 @@ def solve(id_, environment, type_): return ( id_, ( - self.read_from_environment_manifest([id_, platform, type_]) + ( + not self._force_rebuild + and self.read_from_environment_manifest([id_, platform, type_]) + ) or self.write_to_environment_manifest( [id_, platform, type_], self.solvers[type_].solve(id_, **environment), @@ -153,7 +158,7 @@ def _path(url, local_path): _meta = copy.deepcopy(local_packages) for id_, packages, _, _ in results: for package in packages: - if package.get("path"): + if package.get("path") and not self._force_rebuild: # Cache only those packages that manifest is unaware of local_packages.pop(package["url"], None) else: @@ -186,7 +191,7 @@ def _path(url, local_path): storage.save_bytes( list_of_path_and_filehandle, len_hint=len(list_of_path_and_filehandle), - # overwrite=True, + overwrite=self._force_rebuild, ) for id_, packages, _, platform in results: if id_ in dirty: diff --git 
a/metaflow/plugins/pypi/environment_cli.py b/metaflow/plugins/pypi/environment_cli.py index 2703931b543..9a0b6dba668 100644 --- a/metaflow/plugins/pypi/environment_cli.py +++ b/metaflow/plugins/pypi/environment_cli.py @@ -31,4 +31,4 @@ def rebuild(obj, steps): print(obj.flow) print(obj.environment) - obj.environment.init_environment(echo, steps) + obj.environment.init_environment(echo, only_steps=steps, force_rebuild=True) From d019e146fe97f35ab0d1d32c30086df94b4b75ea Mon Sep 17 00:00:00 2001 From: Sakari Ikonen Date: Wed, 4 Jun 2025 17:51:42 +0300 Subject: [PATCH 3/5] cleanup --- metaflow/plugins/pypi/environment_cli.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/metaflow/plugins/pypi/environment_cli.py b/metaflow/plugins/pypi/environment_cli.py index 9a0b6dba668..296dde74501 100644 --- a/metaflow/plugins/pypi/environment_cli.py +++ b/metaflow/plugins/pypi/environment_cli.py @@ -25,10 +25,7 @@ def environment(ctx): ) @click.pass_obj def rebuild(obj, steps): - + # possibly limiting steps to rebuild steps = list(steps) - print(steps, type(steps)) - print(obj.flow) - print(obj.environment) obj.environment.init_environment(echo, only_steps=steps, force_rebuild=True) From b1ade4e414270e9eb447a50123edc06dee030c12 Mon Sep 17 00:00:00 2001 From: Sakari Ikonen Date: Thu, 5 Jun 2025 00:19:23 +0300 Subject: [PATCH 4/5] add deleting of existing environments --- metaflow/plugins/pypi/conda_environment.py | 30 +++++++++++++++++----- metaflow/plugins/pypi/environment_cli.py | 21 +++++++++++---- 2 files changed, 40 insertions(+), 11 deletions(-) diff --git a/metaflow/plugins/pypi/conda_environment.py b/metaflow/plugins/pypi/conda_environment.py index 6298a02e5fe..962eddbbb90 100644 --- a/metaflow/plugins/pypi/conda_environment.py +++ b/metaflow/plugins/pypi/conda_environment.py @@ -5,6 +5,7 @@ import io import json import os +import shutil import tarfile import threading from concurrent.futures import ThreadPoolExecutor, as_completed @@ -32,7 +33,7 @@ 
def __init__(self, msg): class CondaEnvironment(MetaflowEnvironment): TYPE = "conda" _filecache = None - _force_rebuild = False + _disable_cache = False def __init__(self, flow): self.flow = flow @@ -75,8 +76,7 @@ def wrapper(*args, **kwargs): micromamba = Micromamba(self.logger) self.solvers = {"conda": micromamba, "pypi": Pip(micromamba, self.logger)} - def init_environment(self, echo, only_steps=None, force_rebuild=False): - self._force_rebuild = force_rebuild + def init_environment(self, echo, only_steps=None): # The implementation optimizes for latency to ensure as many operations can # be turned into cheap no-ops as feasible. Otherwise, we focus on maintaining # a balance between latency and maintainability of code without re-implementing @@ -110,7 +110,7 @@ def solve(id_, environment, type_): id_, ( ( - not self._force_rebuild + not self._disable_cache and self.read_from_environment_manifest([id_, platform, type_]) ) or self.write_to_environment_manifest( @@ -158,7 +158,7 @@ def _path(url, local_path): _meta = copy.deepcopy(local_packages) for id_, packages, _, _ in results: for package in packages: - if package.get("path") and not self._force_rebuild: + if package.get("path") and not self._disable_cache: # Cache only those packages that manifest is unaware of local_packages.pop(package["url"], None) else: @@ -191,7 +191,7 @@ def _path(url, local_path): storage.save_bytes( list_of_path_and_filehandle, len_hint=len(list_of_path_and_filehandle), - overwrite=self._force_rebuild, + overwrite=self._disable_cache, ) for id_, packages, _, platform in results: if id_ in dirty: @@ -295,6 +295,9 @@ def pypi_solve(env): self.logger("Virtual environment(s) bootstrapped!") + def disable_cache(self): + self._disable_cache = True + def executable(self, step_name, default=None): step = next((step for step in self.flow if step.name == step_name), None) if step is None: @@ -324,6 +327,21 @@ def is_disabled(self, step): return str(disabled).lower() == "true" return False + def 
delete_environment(self, step): + env = self.get_environment(step) + paths = [] + for solver in self.solvers.keys(): + if solver not in env: + continue + for platform in env[solver].get("platforms", [None]): + paths.append( + self.solvers[solver].path_to_environment(env["id_"], platform) + ) + + # delete collected paths + for path in paths: + shutil.rmtree(path, ignore_errors=True) + @functools.lru_cache(maxsize=None) def get_environment(self, step): environment = {} diff --git a/metaflow/plugins/pypi/environment_cli.py b/metaflow/plugins/pypi/environment_cli.py index 296dde74501..87247833b03 100644 --- a/metaflow/plugins/pypi/environment_cli.py +++ b/metaflow/plugins/pypi/environment_cli.py @@ -1,5 +1,5 @@ from metaflow._vendor import click -from metaflow.cli import LOGGER_TIMESTAMP +from metaflow.exception import MetaflowException @click.group() @@ -7,7 +7,7 @@ def cli(): pass -@cli.group(help="Commands related to logs") +@cli.group(help="Commands related to managing the conda/pypi environments") @click.pass_context def environment(ctx): # the logger is configured in cli.py @@ -25,7 +25,18 @@ def environment(ctx): ) @click.pass_obj def rebuild(obj, steps): - # possibly limiting steps to rebuild - steps = list(steps) + # possibly limiting steps to rebuild. 
make sure its a list and not a tuple + step_names = list(steps) - obj.environment.init_environment(echo, only_steps=steps, force_rebuild=True) + steps = [step for step in obj.flow if (step.name in step_names) or not step_names] + + # Delete existing environments + for step in steps: + obj.environment.delete_environment(step) + + if not hasattr(obj.environment, "disable_cache"): + raise MetaflowException("The environment does not support disabling the cache.") + + # Disable the cache before initializing + obj.environment.disable_cache() + obj.environment.init_environment(echo, only_steps=step_names) From 1022cdbe5a8c38c475a3be913bcbe22511f4efb0 Mon Sep 17 00:00:00 2001 From: Sakari Ikonen Date: Thu, 5 Jun 2025 13:00:24 +0300 Subject: [PATCH 5/5] rename cmd to resolve and add --force flag --- metaflow/plugins/pypi/environment_cli.py | 26 ++++++++++++++++-------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/metaflow/plugins/pypi/environment_cli.py b/metaflow/plugins/pypi/environment_cli.py index 87247833b03..0a08629f298 100644 --- a/metaflow/plugins/pypi/environment_cli.py +++ b/metaflow/plugins/pypi/environment_cli.py @@ -15,28 +15,36 @@ def environment(ctx): echo = ctx.obj.echo -@environment.command(help="Rebuild the environment") +@environment.command(help="Resolve the environment(s)") @click.option( "--step", "steps", multiple=True, default=[], - help="Steps to rebuild the environment", + help="Steps to resolve the environment for", +) +@click.option( + "--force/--no-force", + default=False, + is_flag=True, + help="Force re-resolving the environment(s)", ) @click.pass_obj -def rebuild(obj, steps): - # possibly limiting steps to rebuild. make sure its a list and not a tuple +def resolve(obj, steps, force=False): + # possibly limiting steps to resolve. 
make sure it's a list and not a tuple step_names = list(steps) steps = [step for step in obj.flow if (step.name in step_names) or not step_names] - # Delete existing environments - for step in steps: - obj.environment.delete_environment(step) + # Delete existing environments if we are rebuilding. + if force: + for step in steps: + obj.environment.delete_environment(step) if not hasattr(obj.environment, "disable_cache"): raise MetaflowException("The environment does not support disabling the cache.") - # Disable the cache before initializing - obj.environment.disable_cache() + # Disable the cache before initializing if we are rebuilding. + if force: + obj.environment.disable_cache() obj.environment.init_environment(echo, only_steps=step_names)