diff --git a/metaflow/cli.py b/metaflow/cli.py
index d24829e6db6..61b76aa4830 100644
--- a/metaflow/cli.py
+++ b/metaflow/cli.py
@@ -15,6 +15,7 @@
 from .exception import CommandException, MetaflowException
 from .flowspec import _FlowState
 from .graph import FlowGraph
+from .meta_files import read_included_dist_info
 from .metaflow_config import (
     DEFAULT_DATASTORE,
     DEFAULT_DECOSPECS,
@@ -27,6 +28,7 @@
 from .metaflow_current import current
 from metaflow.system import _system_monitor, _system_logger
 from .metaflow_environment import MetaflowEnvironment
+from .package.mfenv import PackagedDistributionFinder
 from .plugins import (
     DATASTORES,
     ENVIRONMENTS,
@@ -326,6 +328,11 @@ def start(
     echo(" executing *%s*" % ctx.obj.flow.name, fg="magenta", nl=False)
     echo(" for *%s*" % resolve_identity(), fg="magenta")

+    # Check if we need to set up the distribution finder (when running packaged code)
+    dist_info = read_included_dist_info()
+    if dist_info:
+        sys.meta_path.append(PackagedDistributionFinder(dist_info))
+
     # Setup the context
     cli_args._set_top_kwargs(ctx.params)
     ctx.obj.echo = echo
diff --git a/metaflow/client/core.py b/metaflow/client/core.py
index 4edbcdac00c..8f52eb746de 100644
--- a/metaflow/client/core.py
+++ b/metaflow/client/core.py
@@ -32,11 +32,13 @@
 from metaflow.includefile import IncludedFile
 from metaflow.metaflow_config import DEFAULT_METADATA, MAX_ATTEMPTS
 from metaflow.metaflow_environment import MetaflowEnvironment
+from metaflow.meta_files import MFCONF_DIR, MFENV_DIR
+from metaflow.package.mfenv import MFEnv
 from metaflow.plugins import ENVIRONMENTS, METADATA_PROVIDERS
+from metaflow.meta_files import MetaFile
 from metaflow.unbounded_foreach import CONTROL_TASK_TAG
 from metaflow.util import cached_property, is_stringish, resolve_identity, to_unicode

-from ..info_file import INFO_FILE
 from .filecache import FileCache

 if TYPE_CHECKING:
@@ -824,9 +826,8 @@ def __init__(self, flow_name: str, code_package: str):
         )
         code_obj = BytesIO(blobdata)
         self._tar = tarfile.open(fileobj=code_obj, mode="r:gz")
-        # The JSON module in Python3 deals with Unicode. Tar gives bytes.
-        info_str = (
-            self._tar.extractfile(os.path.basename(INFO_FILE)).read().decode("utf-8")
+        info_str = MFEnv.get_archive_content(self._tar, MetaFile.INFO_FILE).decode(
+            encoding="utf-8"
         )
         self._info = json.loads(info_str)
         self._flowspec = self._tar.extractfile(self._info["script"]).read()
@@ -917,6 +918,9 @@ def extract(self) -> TemporaryDirectory:
             # This file is created when using the conda/pypi features available in
             # nflx-metaflow-extensions: https://github.com/Netflix/metaflow-nflx-extensions
             "condav2-1.cnd",
+            # Going forward, we only need to exclude MFENV_DIR and MFCONF_DIR
+            MFENV_DIR,
+            MFCONF_DIR,
         ]
         members = [
             m
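[Note] The client/core.py change above delegates the INFO lookup to MFEnv.get_archive_content, which stays compatible with both archive layouts. A minimal standalone sketch of that lookup (assuming the v1 marker schema introduced by meta_files.py/mfenv.py below, with the .mfenv_install marker at the archive root, which is where _extract_archive_mfenv_info looks for it):

    import json
    import tarfile
    from typing import Optional

    def read_info_from_package(tar_path: str) -> Optional[dict]:
        # Pre-MFEnv packages keep INFO at the archive root; v1 packages keep it
        # under the conf_dir recorded in the .mfenv_install marker.
        with tarfile.open(tar_path, "r:gz") as tar:
            names = set(tar.getnames())
            candidate = "INFO"
            if ".mfenv_install" in names:
                marker = json.loads(tar.extractfile(".mfenv_install").read())
                candidate = "%s/INFO" % marker["conf_dir"]
            if candidate not in names:
                return None
            return json.loads(tar.extractfile(candidate).read().decode("utf-8"))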
diff --git a/metaflow/decorators.py b/metaflow/decorators.py
index 7c05bdcb312..22ac406bfbe 100644
--- a/metaflow/decorators.py
+++ b/metaflow/decorators.py
@@ -14,6 +14,7 @@
 from .parameters import current_flow
 from .user_configs.config_decorators import CustomStepDecorator
 from .user_configs.config_parameters import (
+    ConfigValue,
     UNPACK_KEY,
     resolve_delayed_evaluator,
     unpack_delayed_evaluator,
@@ -21,11 +22,6 @@
 from metaflow._vendor import click

-try:
-    unicode
-except NameError:
-    unicode = str
-    basestring = str

 # Contains the decorators on which _init was called. We want to ensure it is called
 # only once on each decorator and, as the _init() function below can be called in
@@ -189,7 +185,7 @@ def make_decorator_spec(self):
         # escaping but for more complex types (typically dictionaries or lists),
         # we dump using JSON.
         for k, v in attrs.items():
-            if isinstance(v, (int, float, unicode, basestring)):
+            if isinstance(v, (int, float, str)):
                 attr_list.append("%s=%s" % (k, str(v)))
             else:
                 attr_list.append("%s=%s" % (k, json.dumps(v).replace('"', '\\"')))
@@ -315,15 +311,26 @@ def package_init(self, flow, step_name, environment):

     def add_to_package(self):
         """
-        Called to add custom packages needed for a decorator. This hook will be
+        Called to add custom files needed for a decorator. This hook will be
         called in the `MetaflowPackage` class where metaflow compiles the code package
-        tarball. This hook is invoked in the `MetaflowPackage`'s `path_tuples`
-        function. The `path_tuples` function is a generator that yields a tuple of
-        `(file_path, arcname)`.`file_path` is the path of the file in the local file system;
-        the `arcname` is the path of the file in the constructed tarball or the path of the file
-        after decompressing the tarball.
-
-        Returns a list of tuples where each tuple represents (file_path, arcname)
+        tarball. This hook can return one of two things:
+        - a generator yielding a tuple of `(file_path, arcname)` to add files to
+          the code package. `file_path` is the path to the file on the local filesystem
+          and `arcname` is the path relative to the packaged code.
+        - a generator yielding a tuple of `(content, arcname, type)` where:
+          - type is an AddToPackageType
+          - for CODE_FILE:
+            - content: path to the file to include
+            - arcname: path relative to the code directory in the package
+          - for CODE_MODULE:
+            - content: name of the module
+            - arcname: None (ignored)
+          - for CONFIG_FILE:
+            - content: path to the file to include
+            - arcname: path relative to the config directory in the package
+          - for CONFIG_CONTENT:
+            - content: bytes to include
+            - arcname: path relative to the config directory in the package
         """
         return []
@@ -685,12 +692,8 @@ def foo(self):
     f.is_step = True
     f.decorators = []
     f.config_decorators = []
-    try:
-        # python 3
-        f.name = f.__name__
-    except:
-        # python 2
-        f.name = f.__func__.func_name
+    f.wrappers = []
+    f.name = f.__name__

     return f
diff --git a/metaflow/extension_support/__init__.py b/metaflow/extension_support/__init__.py
index 1173a6af71e..a45161ef15f 100644
--- a/metaflow/extension_support/__init__.py
+++ b/metaflow/extension_support/__init__.py
@@ -12,7 +12,7 @@
 from itertools import chain
 from pathlib import Path

-from metaflow.info_file import read_info_file
+from metaflow.meta_files import read_info_file

 #
diff --git a/metaflow/extension_support/_empty_file.py b/metaflow/extension_support/_empty_file.py
index d59e1556ddb..dbdcba34c17 100644
--- a/metaflow/extension_support/_empty_file.py
+++ b/metaflow/extension_support/_empty_file.py
@@ -1,2 +1,2 @@
-# This file serves as a __init__.py for metaflow_extensions when it is packaged
-# and needs to remain empty.
+# This file serves as a __init__.py for metaflow_extensions or metaflow
+# packages when they are packaged and needs to remain empty.
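[Note] The reworked add_to_package contract in decorators.py above accepts both tuple shapes. As an illustration, a hypothetical decorator could yield the following (sketch only; helper.py and the JSON payload are made up, and AddToPackageType is defined in package/mfenv.py later in this diff):

    from metaflow.decorators import StepDecorator
    from metaflow.package.mfenv import AddToPackageType

    class ExampleDecorator(StepDecorator):
        # Hypothetical decorator showing both shapes add_to_package may yield.
        name = "example"

        def add_to_package(self):
            # Legacy 2-tuple: a file copied into the code directory of the package
            yield ("./helper.py", "helper.py")
            # New 3-tuple: raw bytes written under the config directory
            yield (b'{"flag": true}', "example.json", AddToPackageType.CONFIG_CONTENT)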
diff --git a/metaflow/info_file.py b/metaflow/info_file.py
deleted file mode 100644
index 6d56a6152ba..00000000000
--- a/metaflow/info_file.py
+++ /dev/null
@@ -1,25 +0,0 @@
-import json
-
-from os import path
-
-CURRENT_DIRECTORY = path.dirname(path.abspath(__file__))
-INFO_FILE = path.join(path.dirname(CURRENT_DIRECTORY), "INFO")
-
-_info_file_content = None
-_info_file_present = None
-
-
-def read_info_file():
-    global _info_file_content
-    global _info_file_present
-    if _info_file_present is None:
-        _info_file_present = path.exists(INFO_FILE)
-        if _info_file_present:
-            try:
-                with open(INFO_FILE, "r", encoding="utf-8") as contents:
-                    _info_file_content = json.load(contents)
-            except IOError:
-                pass
-    if _info_file_present:
-        return _info_file_content
-    return None
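[Note] The module deleted above hard-coded a single INFO path next to the metaflow root; its replacement below dispatches on a versioned marker file instead. The dispatch amounts to roughly this (standalone sketch, assuming the v1 marker schema):

    import json
    import os
    from typing import Optional

    def find_meta_file(root: str, name: str) -> Optional[str]:
        # Prefer the v1 marker layout; fall back to the legacy flat layout
        # (file next to the metaflow root).
        marker_path = os.path.join(root, ".mfenv_install")
        if os.path.exists(marker_path):
            with open(marker_path, "r", encoding="utf-8") as f:
                info = json.load(f)
            if info.get("version") != 1:
                raise ValueError(
                    "Unsupported packaging version '%s'" % info.get("version")
                )
            return os.path.join(info["conf_dir"], name)
        legacy = os.path.join(root, name)
        return legacy if os.path.isfile(legacy) else None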
diff --git a/metaflow/meta_files.py b/metaflow/meta_files.py
new file mode 100644
index 00000000000..fd5c3c0aa16
--- /dev/null
+++ b/metaflow/meta_files.py
@@ -0,0 +1,132 @@
+import json
+import os
+
+from enum import Enum
+from typing import Any, Dict, Optional, Union
+
+from .util import get_metaflow_root
+
+_info_file_content = None
+_info_file_present = None
+_included_dist_info = None
+_included_dist_present = None
+
+# Ideally these would be in package/mfenv.py but that would cause imports to fail, so
+# they live here. They are needed to read extension information, which has to happen
+# before mfenv gets packaged.
+
+MFENV_DIR = (
+    ".mfenv"  # Directory containing "system" code (metaflow and user dependencies)
+)
+MFCONF_DIR = ".mfconf"  # Directory containing Metaflow's configuration files
+MFENV_MARKER = (
+    ".mfenv_install"  # Special file containing metadata about how Metaflow is packaged
+)
+
+
+class MetaFile(Enum):
+    INFO_FILE = "INFO"
+    CONFIG_FILE = "CONFIG_PARAMETERS"
+    INCLUDED_DIST_INFO = "INCLUDED_DIST_INFO"
+
+
+def meta_file_name(name: Union[MetaFile, str]) -> str:
+    if isinstance(name, MetaFile):
+        return name.value
+    return name
+
+
+def generic_get_filename(
+    name: Union[MetaFile, str], is_meta: Optional[bool] = None
+) -> Optional[str]:
+    # We are not in an MFEnv package (so this is an old-style package). Everything
+    # is at the metaflow root. There is no distinction between code and config.
+    real_name = meta_file_name(name)
+
+    path_to_file = os.path.join(get_metaflow_root(), real_name)
+    if os.path.isfile(path_to_file):
+        return path_to_file
+    return None
+
+
+def v1_get_filename(
+    name: Union[MetaFile, str],
+    meta_info: Dict[str, Any],
+    is_meta: Optional[bool] = None,
+) -> Optional[str]:
+    if is_meta is None:
+        is_meta = isinstance(name, MetaFile)
+    if is_meta:
+        conf_dir = meta_info.get("conf_dir")
+        if conf_dir is None:
+            raise ValueError(
+                "Invalid package -- package info does not contain conf_dir key"
+            )
+        return os.path.join(conf_dir, meta_file_name(name))
+    # Not meta -- so code
+    code_dir = meta_info.get("code_dir")
+    if code_dir is None:
+        raise ValueError(
+            "Invalid package -- package info does not contain code_dir key"
+        )
+    return os.path.join(code_dir, meta_file_name(name))
+
+
+get_filename_map = {1: v1_get_filename}
+
+
+def get_filename(
+    name: Union[MetaFile, str], is_meta: Optional[bool] = None
+) -> Optional[str]:
+    if os.path.exists(os.path.join(get_metaflow_root(), MFENV_MARKER)):
+        with open(
+            os.path.join(get_metaflow_root(), MFENV_MARKER), "r", encoding="utf-8"
+        ) as f:
+            meta_info = json.load(f)
+        version = meta_info.get("version")
+        if version not in get_filename_map:
+            raise ValueError(
+                "Unsupported packaging version '%s'. Please update Metaflow" % version
+            )
+        return get_filename_map[version](name, meta_info, is_meta)
+    return generic_get_filename(name, is_meta)
+
+
+def read_info_file():
+    # The info file is a bit special because it needs to be read to determine what
+    # extensions to load; we therefore must not load anything else yet. This explains
+    # the slightly weird code structure where a bit of the file-loading logic lives
+    # here and is then used in MFEnv (it logically belongs in MFEnv but that file
+    # can't be loaded just yet).
+    global _info_file_content
+    global _info_file_present
+
+    if _info_file_present is None:
+        info_filename = get_filename(MetaFile.INFO_FILE)
+        if info_filename is not None:
+            with open(info_filename, "r", encoding="utf-8") as f:
+                _info_file_content = json.load(f)
+            _info_file_present = True
+        else:
+            _info_file_present = False
+    if _info_file_present:
+        return _info_file_content
+    return None
+
+
+def read_included_dist_info():
+    global _included_dist_info
+    global _included_dist_present
+
+    from metaflow.package.mfenv import MFEnv
+
+    if _included_dist_present is None:
+        c = MFEnv.get_content(MetaFile.INCLUDED_DIST_INFO)
+        if c is not None:
+            _included_dist_info = json.loads(c.decode("utf-8"))
+            _included_dist_present = True
+        else:
+            _included_dist_present = False
+    if _included_dist_present:
+        return _included_dist_info
+    return None
diff --git a/metaflow/metaflow_config.py b/metaflow/metaflow_config.py
index 2a1a84eeb25..9e87f8a279e 100644
--- a/metaflow/metaflow_config.py
+++ b/metaflow/metaflow_config.py
@@ -460,6 +460,7 @@
     "stubgen",
     "userconf",
     "conda",
+    "package",
 ]

 for typ in DEBUG_OPTIONS:
diff --git a/metaflow/metaflow_environment.py b/metaflow/metaflow_environment.py
index 354d21a8011..f71c83fe799 100644
--- a/metaflow/metaflow_environment.py
+++ b/metaflow/metaflow_environment.py
@@ -8,6 +8,8 @@
 from metaflow.exception import MetaflowException
 from metaflow.extension_support import dump_module_info
 from metaflow.mflog import BASH_MFLOG, BASH_FLUSH_LOGS
+
+from .meta_files import MFENV_DIR
 from . import R

@@ -49,8 +51,26 @@ def bootstrap_commands(self, step_name, datastore_type):

     def add_to_package(self):
         """
-        A list of tuples (file, arcname) to add to the job package.
-        `arcname` is an alternative name for the file in the job package.
+        Called to add custom files needed for this environment. This hook will be
+        called in the `MetaflowPackage` class where metaflow compiles the code package
+        tarball. This hook can return one of two things:
+        - a generator yielding a tuple of `(file_path, arcname)` to add files to
+          the code package. `file_path` is the path to the file on the local filesystem
+          and `arcname` is the path relative to the packaged code.
+        - a generator yielding a tuple of `(content, arcname, type)` where:
+          - type is an AddToPackageType
+          - for CODE_FILE:
+            - content: path to the file to include
+            - arcname: path relative to the code directory in the package
+          - for CODE_MODULE:
+            - content: name of the module
+            - arcname: None (ignored)
+          - for CONFIG_FILE:
+            - content: path to the file to include
+            - arcname: path relative to the config directory in the package
+          - for CONFIG_CONTENT:
+            - content: bytes to include
+            - arcname: path relative to the config directory in the package
         """
         return []

@@ -177,6 +197,7 @@ def get_package_commands(self, code_package_url, datastore_type):
                "after 6 tries. Exiting...'
&& exit 1; " "fi" % code_package_url, "TAR_OPTIONS='--warning=no-timestamp' tar xf job.tar", + "export PYTHONPATH=`pwd`/%s:$PYTHONPATH" % MFENV_DIR, "mflog 'Task is starting.'", "flush_mflogs", ] diff --git a/metaflow/metaflow_version.py b/metaflow/metaflow_version.py index 9f47444de24..90badf33657 100644 --- a/metaflow/metaflow_version.py +++ b/metaflow/metaflow_version.py @@ -11,7 +11,7 @@ from os import path, name, environ, listdir from metaflow.extension_support import update_package_info -from metaflow.info_file import CURRENT_DIRECTORY, read_info_file +from metaflow.meta_files import read_info_file # True/False correspond to the value `public`` in get_version diff --git a/metaflow/package.py b/metaflow/package.py deleted file mode 100644 index 1385883d5a7..00000000000 --- a/metaflow/package.py +++ /dev/null @@ -1,203 +0,0 @@ -import importlib -import os -import sys -import tarfile -import time -import json -from io import BytesIO - -from .user_configs.config_parameters import CONFIG_FILE, dump_config_values -from .extension_support import EXT_PKG, package_mfext_all -from .metaflow_config import DEFAULT_PACKAGE_SUFFIXES -from .exception import MetaflowException -from .util import to_unicode -from . import R -from .info_file import INFO_FILE - -DEFAULT_SUFFIXES_LIST = DEFAULT_PACKAGE_SUFFIXES.split(",") -METAFLOW_SUFFIXES_LIST = [".py", ".html", ".css", ".js"] - - -class NonUniqueFileNameToFilePathMappingException(MetaflowException): - headline = "Non Unique file path for a file name included in code package" - - def __init__(self, filename, file_paths, lineno=None): - msg = ( - "Filename %s included in the code package includes multiple different paths for the same name : %s.\n" - "The `filename` in the `add_to_package` decorator hook requires a unique `file_path` to `file_name` mapping" - % (filename, ", ".join(file_paths)) - ) - super().__init__(msg=msg, lineno=lineno) - - -# this is os.walk(follow_symlinks=True) with cycle detection -def walk_without_cycles(top_root): - seen = set() - - def _recurse(root): - for parent, dirs, files in os.walk(root): - for d in dirs: - path = os.path.join(parent, d) - if os.path.islink(path): - # Breaking loops: never follow the same symlink twice - # - # NOTE: this also means that links to sibling links are - # not followed. In this case: - # - # x -> y - # y -> oo - # oo/real_file - # - # real_file is only included twice, not three times - reallink = os.path.realpath(path) - if reallink not in seen: - seen.add(reallink) - for x in _recurse(path): - yield x - yield parent, files - - for x in _recurse(top_root): - yield x - - -class MetaflowPackage(object): - def __init__(self, flow, environment, echo, suffixes=DEFAULT_SUFFIXES_LIST): - self.suffixes = list(set().union(suffixes, DEFAULT_SUFFIXES_LIST)) - self.environment = environment - self.metaflow_root = os.path.dirname(__file__) - - self.flow_name = flow.name - self._flow = flow - self.create_time = time.time() - environment.init_environment(echo) - for step in flow: - for deco in step.decorators: - deco.package_init(flow, step.__name__, environment) - self.blob = self._make() - - def _walk(self, root, exclude_hidden=True, suffixes=None): - if suffixes is None: - suffixes = [] - root = to_unicode(root) # handle files/folder with non ascii chars - prefixlen = len("%s/" % os.path.dirname(root)) - for ( - path, - files, - ) in walk_without_cycles(root): - if exclude_hidden and "/." in path: - continue - # path = path[2:] # strip the ./ prefix - # if path and (path[0] == '.' 
or './' in path): - # continue - for fname in files: - if (fname[0] == "." and fname in suffixes) or ( - fname[0] != "." - and any(fname.endswith(suffix) for suffix in suffixes) - ): - p = os.path.join(path, fname) - yield p, p[prefixlen:] - - def path_tuples(self): - """ - Returns list of (path, arcname) to be added to the job package, where - `arcname` is the alternative name for the file in the package. - """ - # We want the following contents in the tarball - # Metaflow package itself - for path_tuple in self._walk( - self.metaflow_root, exclude_hidden=False, suffixes=METAFLOW_SUFFIXES_LIST - ): - yield path_tuple - - # Metaflow extensions; for now, we package *all* extensions but this may change - # at a later date; it is possible to call `package_mfext_package` instead of - # `package_mfext_all` but in that case, make sure to also add a - # metaflow_extensions/__init__.py file to properly "close" the metaflow_extensions - # package and prevent other extensions from being loaded that may be - # present in the rest of the system - for path_tuple in package_mfext_all(): - yield path_tuple - - # Any custom packages exposed via decorators - deco_module_paths = {} - for step in self._flow: - for deco in step.decorators: - for path_tuple in deco.add_to_package(): - file_path, file_name = path_tuple - # Check if the path is not duplicated as - # many steps can have the same packages being imported - if file_name not in deco_module_paths: - deco_module_paths[file_name] = file_path - yield path_tuple - elif deco_module_paths[file_name] != file_path: - raise NonUniqueFileNameToFilePathMappingException( - file_name, [deco_module_paths[file_name], file_path] - ) - - # the package folders for environment - for path_tuple in self.environment.add_to_package(): - yield path_tuple - if R.use_r(): - # the R working directory - for path_tuple in self._walk( - "%s/" % R.working_dir(), suffixes=self.suffixes - ): - yield path_tuple - # the R package - for path_tuple in R.package_paths(): - yield path_tuple - else: - # the user's working directory - flowdir = os.path.dirname(os.path.abspath(sys.argv[0])) + "/" - for path_tuple in self._walk(flowdir, suffixes=self.suffixes): - yield path_tuple - - def _add_configs(self, tar): - buf = BytesIO() - buf.write(json.dumps(dump_config_values(self._flow)).encode("utf-8")) - self._add_file(tar, os.path.basename(CONFIG_FILE), buf) - - def _add_info(self, tar): - buf = BytesIO() - buf.write( - json.dumps( - self.environment.get_environment_info(include_ext_info=True) - ).encode("utf-8") - ) - self._add_file(tar, os.path.basename(INFO_FILE), buf) - - @staticmethod - def _add_file(tar, filename, buf): - info = tarfile.TarInfo(filename) - buf.seek(0) - info.size = len(buf.getvalue()) - # Setting this default to Dec 3, 2019 - info.mtime = 1575360000 - tar.addfile(info, buf) - - def _make(self): - def no_mtime(tarinfo): - # a modification time change should not change the hash of - # the package. Only content modifications will. 
-            # Setting this default to Dec 3, 2019
-            tarinfo.mtime = 1575360000
-            return tarinfo
-
-        buf = BytesIO()
-        with tarfile.open(
-            fileobj=buf, mode="w:gz", compresslevel=3, dereference=True
-        ) as tar:
-            self._add_info(tar)
-            self._add_configs(tar)
-            for path, arcname in self.path_tuples():
-                tar.add(path, arcname=arcname, recursive=False, filter=no_mtime)
-
-        blob = bytearray(buf.getvalue())
-        blob[4:8] = [0] * 4  # Reset 4 bytes from offset 4 to account for ts
-        return blob
-
-    def __str__(self):
-        return "<code package for flow %s (created @ %s)>" % (
-            self.flow_name,
-            time.strftime("%a, %d %b %Y %H:%M:%S", self.create_time),
-        )
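[Note] The replacement below splits the old monolithic MetaflowPackage into a code environment (MFEnv), user-code walking, and a pluggable archive backend. A hypothetical caller would wire the new constructor up roughly like this (sketch; flow, environment, and echo are supplied by the Metaflow runtime):

    from metaflow.package import MetaflowPackage
    from metaflow.package.tar_backend import TarPackagingBackend

    def package_flow(flow, environment, echo) -> bytes:
        pkg = MetaflowPackage(
            flow,
            environment,
            echo,
            user_dir=None,                # defaults to the flow script's directory
            exclude_tl_dirs=None,         # extra top-level directories to skip
            backend=TarPackagingBackend,  # archive format is now pluggable
        )
        return bytes(pkg.blob)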
diff --git a/metaflow/package/__init__.py b/metaflow/package/__init__.py
new file mode 100644
index 00000000000..df64935078f
--- /dev/null
+++ b/metaflow/package/__init__.py
@@ -0,0 +1,234 @@
+import os
+import sys
+import time
+import json
+
+from hashlib import sha1
+from io import BytesIO
+
+
+from .mfenv import AddToPackageType, MFEnv, MFEnvV1
+from .tar_backend import TarPackagingBackend
+from .utils import walk
+from ..metaflow_config import DEFAULT_PACKAGE_SUFFIXES
+from ..exception import MetaflowException
+from ..meta_files import MFENV_DIR, MetaFile, get_metaflow_root, meta_file_name
+from ..user_configs.config_parameters import dump_config_values
+from .. import R
+
+DEFAULT_SUFFIXES_LIST = DEFAULT_PACKAGE_SUFFIXES.split(",")
+
+
+class NonUniqueFileNameToFilePathMappingException(MetaflowException):
+    headline = "Non-unique file path for a file name included in code package"
+
+    def __init__(self, filename, file_paths, lineno=None):
+        msg = (
+            "Filename %s included in the code package includes multiple different "
+            "paths for the same name : %s.\n"
+            "The `filename` in the `add_to_package` decorator hook requires a unique "
+            "`file_path` to `file_name` mapping" % (filename, ", ".join(file_paths))
+        )
+        super().__init__(msg=msg, lineno=lineno)
+
+
+class MetaflowPackage(object):
+    def __init__(
+        self,
+        flow,
+        environment,
+        echo,
+        suffixes=DEFAULT_SUFFIXES_LIST,
+        code_env=None,
+        user_dir=None,
+        exclude_tl_dirs=None,
+        package_code_env_path=MFENV_DIR,
+        package_user_path=None,
+        backend=TarPackagingBackend,
+    ):
+        self.suffixes = list(set().union(suffixes, DEFAULT_SUFFIXES_LIST))
+
+        # We exclude the environment when packaging as this will be packaged
+        # separately. This comes into play primarily if packaging from a node
+        # already running packaged code.
+        # These directories are only excluded at the top-level (ie: not further down
+        # in sub-directories)
+        # "_escape_trampolines" is a special directory where trampoline escape hatch
+        # files are stored (used by Netflix Extension's Conda implementation).
+        self.exclude_tl_dirs = [MFENV_DIR, "_escape_trampolines"] + (
+            exclude_tl_dirs or []
+        )
+
+        self.package_user_path = package_user_path
+        self.user_dir = user_dir
+
+        self.environment = environment
+        self.environment.init_environment(echo)
+
+        self.metaflow_root = get_metaflow_root()
+
+        self._flow = flow
+        self._backend = backend
+        self.create_time = time.time()
+
+        # Can be called without a flow to package other things like functions.
+        if self._flow:
+            for step in self._flow:
+                for deco in step.decorators:
+                    deco.package_init(flow, step.__name__, environment)
+            self.name = f"flow {self._flow.name}"
+        else:
+            self.name = "generic code"
+
+        self._code_env = code_env or MFEnvV1(
+            lambda x: hasattr(x, "METAFLOW_PACKAGE"),
+            package_path=package_code_env_path,
+        )
+
+        # Add metacontent
+        self._code_env.add_meta_content(
+            json.dumps(
+                self.environment.get_environment_info(include_ext_info=True)
+            ).encode("utf-8"),
+            MetaFile.INFO_FILE,
+        )
+
+        if self._flow:
+            self._code_env.add_meta_content(
+                json.dumps(dump_config_values(self._flow)).encode("utf-8"),
+                MetaFile.CONFIG_FILE,
+            )
+
+        # Add user files (from decorators and environment)
+        self._add_addl_files()
+
+        self.blob = self._make()
+
+    def path_tuples(self):
+        # Files included in the environment
+        for path, arcname in self._code_env.content_names():
+            yield path, arcname
+
+        # Files included in the user code
+        for path, arcname in self._user_code_tuples():
+            yield path, arcname
+
+    def _add_addl_files(self):
+        # Look at all decorators that provide additional files
+        deco_module_paths = {}
+        addl_modules = set()
+
+        def _check_tuple(path_tuple):
+            if len(path_tuple) == 2:
+                path_tuple = (
+                    path_tuple[0],
+                    path_tuple[1],
+                    AddToPackageType.CODE_FILE,
+                )
+            file_path, file_name, file_type = path_tuple
+            if file_type == AddToPackageType.CODE_MODULE:
+                if file_path in addl_modules:
+                    return None  # Module was already added -- we don't add twice
+                addl_modules.add(file_path)
+            elif file_type == AddToPackageType.CONFIG_CONTENT:
+                # file_path is a content here (bytes)
+                file_name = meta_file_name(file_name)
+                if file_name not in deco_module_paths:
+                    deco_module_paths[file_name] = sha1(file_path).hexdigest()
+                elif deco_module_paths[file_name] != sha1(file_path).hexdigest():
+                    raise NonUniqueFileNameToFilePathMappingException(
+                        file_name,
+                        [
+                            deco_module_paths[file_name],
+                            sha1(file_path).hexdigest(),
+                        ],
+                    )
+            else:
+                # These are files
+                # Check if the path is not duplicated as
+                # many steps can have the same packages being imported
+                if file_name not in deco_module_paths:
+                    deco_module_paths[file_name] = file_path
+                elif deco_module_paths[file_name] != file_path:
+                    raise NonUniqueFileNameToFilePathMappingException(
+                        file_name, [deco_module_paths[file_name], file_path]
+                    )
+            return path_tuple
+
+        def _add_tuple(path_tuple):
+            file_path, file_name, file_type = path_tuple
+            if file_type == AddToPackageType.CODE_MODULE:
+                self._code_env.add_module(file_path)
+            elif file_type == AddToPackageType.CONFIG_CONTENT:
+                # file_path is a content here (bytes)
+                self._code_env.add_meta_content(file_path, file_name)
+            elif file_type == AddToPackageType.CONFIG_FILE:
+                self._code_env.add_meta_file(file_path, file_name)
+            else:
+                self._code_env.add_code_file(file_path, file_name)
+
+        if self._flow:  # there are no step decorators without a flow
+            for step in self._flow:
+                for deco in step.decorators:
+                    for path_tuple in deco.add_to_package():
+                        path_tuple = _check_tuple(path_tuple)
+                        if path_tuple is None:
+                            continue
+                        _add_tuple(path_tuple)
+
+        # the package folders for environment
+        for path_tuple in self.environment.add_to_package():
+            path_tuple = _check_tuple(path_tuple)
+            if path_tuple is None:
+                continue
+            _add_tuple(path_tuple)
+
+    def _user_code_tuples(self):
+        if R.use_r():
+            # the R working directory
+            for path_tuple in walk("%s/" % R.working_dir(), suffixes=self.suffixes):
+                yield path_tuple
+            # the R package
+            for path_tuple in R.package_paths():
+                yield path_tuple
+        else:
+            # the user's working directory
+            if self.user_dir:
+                flowdir = os.path.abspath(self.user_dir)
+            else:
+                flowdir = os.path.dirname(os.path.abspath(sys.argv[0])) + "/"
+
+            for path_tuple in walk(
+                flowdir, suffixes=self.suffixes, exclude_tl_dirs=self.exclude_tl_dirs
+            ):
+                # TODO: This is where we will check if the file is already included
+                # in the mfenv portion using path_in_archive. If it is, we just need
+                # to include a symlink.
+                if self.package_user_path:
+                    yield (
+                        path_tuple[0],
+                        os.path.join(self.package_user_path, path_tuple[1]),
+                    )
+                else:
+                    yield path_tuple
+
+    def _make(self):
+        backend = self._backend()
+        with backend.create() as archive:
+            # Package the environment
+            for path, arcname in self._code_env.content(AddToPackageType.FILES_ONLY):
+                archive.add_file(path, arcname=arcname)
+
+            for content, arcname in self._code_env.content(
+                AddToPackageType.CONFIG_CONTENT
+            ):
+                if isinstance(content, str):
+                    content = content.encode()
+                archive.add_data(BytesIO(content), arcname=arcname)
+
+            # Package the user code
+            for path, arcname in self._user_code_tuples():
+                archive.add_file(path, arcname=arcname)
+        return backend.get_blob()
+
+    def __str__(self):
+        return f"<code package for {self.name} (created @ {self.create_time})>"
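[Note] The backend parameter above is typed against the abstract PackagingBackend that follows. To show what the abstraction buys, here is a hypothetical zip-based backend (not part of this change) implementing the same surface:

    import zipfile
    from io import BytesIO
    from typing import Any, List, Optional, Union

    from metaflow.package.backend import PackagingBackend

    class ZipPackagingBackend(PackagingBackend):
        def __init__(self):
            super().__init__()
            self._buf = None

        def create(self):
            self._buf = BytesIO()
            self._archive = zipfile.ZipFile(
                self._buf, mode="w", compression=zipfile.ZIP_DEFLATED
            )
            return self

        def add_file(self, filename: str, arcname: Optional[str] = None):
            self._archive.write(filename, arcname=arcname)

        def add_data(self, data: BytesIO, arcname: str):
            data.seek(0)
            self._archive.writestr(arcname, data.read())

        def close(self):
            if self._archive:
                self._archive.close()

        def get_blob(self) -> Optional[Union[bytes, bytearray]]:
            return self._buf.getvalue() if self._buf else None

        @classmethod
        def cls_has_member(cls, archive: Any, name: str) -> bool:
            return name in archive.namelist()

        @classmethod
        def cls_extract_member(cls, archive: Any, name: str) -> Optional[bytes]:
            try:
                return archive.read(name)
            except KeyError:
                return None

        @classmethod
        def cls_list_members(cls, archive: Any) -> Optional[List[str]]:
            return archive.namelist() or None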
diff --git a/metaflow/package/backend.py b/metaflow/package/backend.py
new file mode 100644
index 00000000000..e4d19e080e1
--- /dev/null
+++ b/metaflow/package/backend.py
@@ -0,0 +1,66 @@
+from abc import ABC, abstractmethod
+from io import BytesIO
+from typing import Any, List, Optional, Union
+
+
+class PackagingBackend(ABC):
+
+    def __init__(self):
+        self._archive = None
+
+    @abstractmethod
+    def create(self) -> "PackagingBackend":
+        pass
+
+    @abstractmethod
+    def add_file(self, filename: str, arcname: Optional[str] = None):
+        pass
+
+    @abstractmethod
+    def add_data(self, data: BytesIO, arcname: str):
+        pass
+
+    @abstractmethod
+    def close(self):
+        pass
+
+    @abstractmethod
+    def get_blob(self) -> Optional[Union[bytes, bytearray]]:
+        pass
+
+    @classmethod
+    @abstractmethod
+    def cls_has_member(cls, archive: Any, name: str) -> bool:
+        pass
+
+    @classmethod
+    @abstractmethod
+    def cls_extract_member(cls, archive: Any, name: str) -> Optional[bytes]:
+        pass
+
+    @classmethod
+    @abstractmethod
+    def cls_list_members(cls, archive: Any) -> Optional[List[str]]:
+        pass
+
+    def has_member(self, name: str) -> bool:
+        if self._archive:
+            return self.cls_has_member(self._archive, name)
+        return False
+
+    def extract_member(self, name: str) -> Optional[bytes]:
+        if self._archive:
+            return self.cls_extract_member(self._archive, name)
+        return None
+
+    def list_members(self) -> Optional[List[str]]:
+        if self._archive:
+            return self.cls_list_members(self._archive)
+        return None
+
+    def __enter__(self):
+        self.create()
+        return self
+
+    def __exit__(self, exc_type, exc_value, traceback):
+        self.close()
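[Note] mfenv.py below also provides the PackagedDistributionFinder that cli.py registers on sys.meta_path; its purpose is to keep importlib.metadata working for distributions that ship inside the code package instead of being installed. Usage sketch (the dist_info payload is hypothetical and mirrors what read_included_dist_info() returns):

    import sys
    from importlib import metadata

    from metaflow.package.mfenv import PackagedDistributionFinder

    dist_info = {
        "my-packaged-lib": {  # hypothetical distribution name
            "METADATA": "Metadata-Version: 2.1\nName: my-packaged-lib\nVersion: 1.0.0",
            "RECORD": "",
        }
    }

    sys.meta_path.append(PackagedDistributionFinder(dist_info))
    print(metadata.version("my-packaged-lib"))  # -> 1.0.0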
diff --git a/metaflow/package/mfenv.py b/metaflow/package/mfenv.py
new file mode 100644
index 00000000000..2fad6979be9
--- /dev/null
+++ b/metaflow/package/mfenv.py
@@ -0,0 +1,867 @@
+import inspect
+import json
+import os
+import re
+import sys
+import tarfile
+
+from collections import defaultdict
+from enum import IntEnum
+from pathlib import Path
+from typing import (
+    Any,
+    Callable,
+    Dict,
+    Generator,
+    List,
+    Mapping,
+    NamedTuple,
+    Optional,
+    Set,
+    Tuple,
+    TYPE_CHECKING,
+    Union,
+)
+
+from types import ModuleType
+
+from .tar_backend import TarPackagingBackend
+from .utils import walk
+
+from ..debug import debug
+from ..exception import MetaflowException
+from ..extension_support import EXT_EXCLUDE_SUFFIXES, metadata, package_mfext_all
+
+from ..meta_files import (
+    MFCONF_DIR,
+    MFENV_DIR,
+    MFENV_MARKER,
+    MetaFile,
+    generic_get_filename,
+    v1_get_filename,
+    meta_file_name,
+)
+from ..util import get_metaflow_root
+
+packages_distributions = None
+
+if sys.version_info[:2] >= (3, 10):
+    packages_distributions = metadata.packages_distributions
+else:
+    # This is the code present in 3.10+ -- we replicate it here for other versions
+    def _packages_distributions() -> Mapping[str, List[str]]:
+        """
+        Return a mapping of top-level packages to their
+        distributions.
+        """
+        pkg_to_dist = defaultdict(list)
+        for dist in metadata.distributions():
+            for pkg in _top_level_declared(dist) or _top_level_inferred(dist):
+                pkg_to_dist[pkg].append(dist.metadata["Name"])
+        return dict(pkg_to_dist)
+
+    def _top_level_declared(dist: metadata.Distribution) -> List[str]:
+        return (dist.read_text("top_level.txt") or "").split()
+
+    def _topmost(name: "pathlib.PurePosixPath") -> Optional[str]:
+        """
+        Return the top-most parent as long as there is a parent.
+        """
+        top, *rest = name.parts
+        return top if rest else None
+
+    def _get_toplevel_name(name: "pathlib.PurePosixPath") -> str:
+        return _topmost(name) or (
+            # python/typeshed#10328
+            inspect.getmodulename(name)  # type: ignore
+            or str(name)
+        )
+
+    def _top_level_inferred(dist: "metadata.Distribution"):
+        opt_names = set(map(_get_toplevel_name, dist.files or []))
+
+        def importable_name(name):
+            return "." not in name
+
+        return filter(importable_name, opt_names)
+
+    packages_distributions = _packages_distributions
+
+
+if TYPE_CHECKING:
+    import pathlib
+
+
+_cached_distributions = None
+
+name_normalizer = re.compile(r"[-_.]+")
+
+
+class AddToPackageType(IntEnum):
+    CODE_ONLY = 0x100
+    CONFIG_ONLY = 0x200
+    FILES_ONLY = 0x400
+    ALL = 0xFFF
+    CODE_FILE = 0x501
+    CODE_MODULE = 0x502
+    CODE_METAFLOW = 0x504
+    CONFIG_FILE = 0x601
+    CONFIG_CONTENT = 0x202
+
+
+def modules_to_distributions() -> Dict[str, List[metadata.Distribution]]:
+    """
+    Return a mapping of top-level modules to their distributions.
+
+    Returns
+    -------
+    Dict[str, List[metadata.Distribution]]
+        A mapping of top-level modules to their distributions.
+    """
+    global _cached_distributions
+    if _cached_distributions is None:
+        _cached_distributions = {
+            k: [metadata.distribution(d) for d in v]
+            for k, v in packages_distributions().items()
+        }
+    return _cached_distributions
+
+
+_ModuleInfo = NamedTuple(
+    "_ModuleInfo", [("name", str), ("root_paths", Set[str]), ("module", ModuleType)]
+)
+
+
+class PackagedDistribution(metadata.Distribution):
+    """
+    A Python package packaged within a MFEnv. This allows users to use importlib
+    as they would regularly, and the packaged Python package is considered a
+    distribution even if it really isn't one (since it is just included in the
+    PYTHONPATH).
+    """
+
+    def __init__(self, root: str, content: Dict[str, str]):
+        self._root = Path(root)
+        self._content = content
+
+    # Strongly inspired from PathDistribution in metadata.py
+    def read_text(self, filename: Union[str, os.PathLike]) -> Optional[str]:
+        if str(filename) in self._content:
+            return self._content[str(filename)]
+        return None
+
+    read_text.__doc__ = metadata.Distribution.read_text.__doc__
+
+    # Returns a metadata.SimplePath but that is not always present in
+    # importlib.metadata libs so skipping the return type.
+    def locate_file(self, path: Union[str, os.PathLike]):
+        return self._root / path
+
+
+class PackagedDistributionFinder(metadata.DistributionFinder):
+
+    def __init__(self, dist_info: Dict[str, Dict[str, str]]):
+        self._dist_info = dist_info
+
+    def find_distributions(self, context=metadata.DistributionFinder.Context()):
+        if context.name is None:
+            # Yields all known distributions
+            for name, info in self._dist_info.items():
+                yield PackagedDistribution(
+                    os.path.join(get_metaflow_root(), name), info
+                )
+            return
+        name = name_normalizer.sub("-", context.name).lower()
+        if name in self._dist_info:
+            yield PackagedDistribution(
+                os.path.join(get_metaflow_root(), context.name),
+                self._dist_info[name],
+            )
+        return None
+
+
+class MFEnv:
+    _cached_mfenv_info = {}
+
+    _mappings = {}
+
+    @classmethod
+    def get_filename(
+        cls, name: Union[MetaFile, str], is_meta: Optional[bool] = None
+    ) -> Optional[str]:
+        """
+        Get the filename of a file in the expanded package. The filename will point
+        to a path on the local filesystem.
+
+        Parameters
+        ----------
+        name : Union[MetaFile, str]
+            Filename to look for. If it is a MetaFile, it is assumed to be relative
+            to the configuration directory. If it is a string, it is assumed to be
+            relative to the code directory.
+        is_meta : bool, optional, default None
+            If None, the default behavior above is assumed. If True, the file will be
+            searched for in the configuration directory and if False, it will be
+            searched for in the code directory.
+
+        Returns
+        -------
+        Optional[str]
+            The file path of the file if it exists -- None if there is no such file.
+        """
+        # Get the filename of the expanded file.
+        # Two cases:
+        #  1. The file was encoded prior to MFEnv packaging -- it will be next to
+        #     Metaflow (sibling)
+        #  2. The file was encoded as part of the MFEnv packaging -- we redirect
+        #     it to the proper version of MFEnv to do the extraction
+        mfenv_info = cls._extract_mfenv_info()
+        handling_cls = cls._get_mfenv_class(mfenv_info)
+        if handling_cls:
+            return handling_cls._get_filename(name, mfenv_info, is_meta)
+
+        return generic_get_filename(name, is_meta)
+
+    @classmethod
+    def get_content(
+        cls, name: Union[MetaFile, str], is_meta: Optional[bool] = None
+    ) -> Optional[bytes]:
+        """
+        Get the content of a file in the expanded package. The content is returned
+        as bytes.
+
+        Parameters
+        ----------
+        name : Union[MetaFile, str]
+            Filename to look for. If it is a MetaFile, it is assumed to be relative
+            to the configuration directory. If it is a string, it is assumed to be
+            relative to the code directory.
+        is_meta : bool, optional, default None
+            If None, the default behavior above is assumed. If True, the file will be
+            searched for in the configuration directory and if False, it will be
+            searched for in the code directory.
+
+        Returns
+        -------
+        Optional[bytes]
+            The binary content of the file -- None if there is no such file.
+        """
+        file_to_read = cls.get_filename(name, is_meta)
+        if file_to_read:
+            with open(file_to_read, "rb") as f:
+                return f.read()
+        return None
+
+    @classmethod
+    def get_archive_filename(
+        cls,
+        archive: Any,
+        name: Union[MetaFile, str],
+        is_meta: Optional[bool] = None,
+        packaging_backend=TarPackagingBackend,
+    ) -> Optional[str]:
+        """
+        Get the filename of a file in the archive. The filename will point to
+        a path in the archive.
+
+        Parameters
+        ----------
+        name : Union[MetaFile, str]
+            Filename to look for. If it is a MetaFile, it is assumed to be relative
+            to the configuration directory. If it is a string, it is assumed to be
+            relative to the code directory.
+        is_meta : bool, optional, default None
+            If None, the default behavior above is assumed. If True, the file will be
+            searched for in the configuration directory and if False, it will be
+            searched for in the code directory.
+
+        Returns
+        -------
+        Optional[str]
+            The file path of the file if it exists -- None if there is no such file.
+        """
+        mfenv_info = cls._extract_archive_mfenv_info(archive, packaging_backend)
+        handling_cls = cls._get_mfenv_class(mfenv_info)
+        if handling_cls:
+            return handling_cls._get_archive_filename(
+                archive, name, mfenv_info, is_meta, packaging_backend
+            )
+        # Backward compatible way of accessing all special files. Prior to MFEnv,
+        # they were stored at the top level of the archive. There is no distinction
+        # between code and config.
+        real_name = meta_file_name(name)
+        if packaging_backend.cls_has_member(archive, real_name):
+            return real_name
+        return None
+
+    @classmethod
+    def get_archive_content(
+        cls,
+        archive: tarfile.TarFile,
+        name: Union[MetaFile, str],
+        is_meta: Optional[bool] = None,
+        packaging_backend=TarPackagingBackend,
+    ) -> Optional[bytes]:
+        """
+        Get the content of a file in an archive. The content is returned as
+        bytes.
+
+        Parameters
+        ----------
+        name : Union[MetaFile, str]
+            Filename to look for. If it is a MetaFile, it is assumed to be relative
+            to the configuration directory. If it is a string, it is assumed to be
+            relative to the code directory.
+        is_meta : bool, optional, default None
+            If None, the default behavior above is assumed. If True, the file will be
+            searched for in the configuration directory and if False, it will be
+            searched for in the code directory.
+
+        Returns
+        -------
+        Optional[bytes]
+            The binary content of the file -- None if there is no such file.
+        """
+        file_to_extract = cls.get_archive_filename(
+            archive, name, is_meta, packaging_backend
+        )
+        if file_to_extract:
+            return packaging_backend.cls_extract_member(archive, file_to_extract)
+        return None
+
+    def __init_subclass__(cls, version_id, **kwargs) -> None:
+        super().__init_subclass__(**kwargs)
+        if version_id in MFEnv._mappings:
+            raise ValueError(
+                "Version ID %s already exists in MFEnv mappings "
+                "-- this is a bug in Metaflow."
% str(version_id) + ) + MFEnv._mappings[version_id] = cls + + @classmethod + def _get_mfenv_class(cls, info: Optional[Dict[str, Any]]): + if info is None: + return None + if "version" not in info: + raise MetaflowException( + "Invalid package -- missing version in info: %s" % info + ) + version = info["version"] + if version not in cls._mappings: + raise MetaflowException( + "Invalid package -- unknown version %s in info: %s" % (version, info) + ) + + return cls._mappings[version] + + @classmethod + def _extract_archive_mfenv_info( + cls, archive: Any, packaging_backend=TarPackagingBackend + ): + if id(archive) in cls._cached_mfenv_info: + return cls._cached_mfenv_info[id(archive)] + + mfenv_info = None + # Here we need to extract the information from the archive + if packaging_backend.cls_has_member(archive, MFENV_MARKER): + # The MFENV_MARKER file is present in the archive + # We can extract the information from it + mfenv_info = packaging_backend.cls_extract_member(archive, MFENV_MARKER) + if mfenv_info: + mfenv_info = json.loads(mfenv_info) + cls._cached_mfenv_info[id(archive)] = mfenv_info + return mfenv_info + + @classmethod + def _extract_mfenv_info(cls): + if "_local" in cls._cached_mfenv_info: + return cls._cached_mfenv_info["_local"] + + mfenv_info = None + if os.path.exists(os.path.join(get_metaflow_root(), MFENV_MARKER)): + with open( + os.path.join(get_metaflow_root(), MFENV_MARKER), "r", encoding="utf-8" + ) as f: + mfenv_info = json.load(f) + cls._cached_mfenv_info["_local"] = mfenv_info + return mfenv_info + + +class MFEnvV1(MFEnv, version_id=1): + + METAFLOW_SUFFIXES_LIST = [".py", ".html", ".css", ".js"] + + @classmethod + def _get_filename( + cls, + name: Union[MetaFile, str], + meta_info: Dict[str, Any], + is_meta: Optional[bool] = None, + ) -> Optional[str]: + return v1_get_filename(name, meta_info, is_meta) + + @classmethod + def _get_archive_filename( + cls, + archive: Any, + name: Union[MetaFile, str], + meta_info: Dict[str, Any], + is_meta: Optional[bool] = None, + packaging_backend=TarPackagingBackend, + ) -> Optional[str]: + if is_meta is None: + is_meta = isinstance(name, MetaFile) + if is_meta: + conf_dir = meta_info.get("conf_dir") + if conf_dir is None: + raise MetaflowException( + "Invalid package -- package info does not contain conf_dir key" + ) + path_to_search = os.path.join(conf_dir, meta_file_name(name)) + else: + code_dir = meta_info.get("code_dir") + if code_dir is None: + raise MetaflowException( + "Invalid package -- package info does not contain code_dir key" + ) + path_to_search = os.path.join(code_dir, meta_file_name(name)) + if packaging_backend.cls_has_member(archive, path_to_search): + return path_to_search + return None + + def __init__( + self, + criteria: Callable[[ModuleType], bool], + package_path=MFENV_DIR, + config_path=MFCONF_DIR, + ) -> None: + # package_path is the directory within the archive where the files will be + # stored (by default MFENV_DIR). This is used in internal Netflix code. + + # Look at top-level modules that are present when MFEnv is initialized + modules = filter(lambda x: "." 
not in x[0], sys.modules.items()) + + # Determine the version of Metaflow that we are part of + self._metaflow_root = get_metaflow_root() + + self._package_path = package_path + self._config_path = config_path + + self._modules = { + name: _ModuleInfo( + name, + set(Path(p).resolve().as_posix() for p in getattr(mod, "__path__", [])), + mod, + ) + for name, mod in dict(modules).items() + } # type: Dict[str, Set[str]] + + # Filter the modules + self._modules = { + name: info for name, info in self._modules.items() if criteria(info.module) + } + + # Contain metadata information regarding the distributions packaged. + # This allows Metaflow to "fake" distribution information when packaged + self._distmetainfo = {} # type: Dict[str, Dict[str, str]] + + # Maps an absolute path on the filesystem to the path of the file in the + # archive. + self._files = {} # type: Dict[str, str] + self._files_from_modules = {} # type: Dict[str, str] + + self._metacontent = {} # type: Dict[Union[MetaFile, str], bytes] + self._metafiles = {} # type: Dict[Union[MetaFile, str], str] + + debug.package_exec(f"Used system modules found: {str(self._modules)}") + + # Populate with files from the third party modules + for k, v in self._modules.items(): + self._files_from_modules.update(self._module_files(k, v.root_paths)) + + def add_meta_content(self, content: bytes, name: Union[MetaFile, str]) -> None: + """ + Adds metadata to code package. This content will be included under `config_path` + which defaults to `MFCONF_DIR`. + + Parameters + ---------- + content : bytes + The content of the metadata file. + name : Union[MetaFile, str] + A known metadata file to add or the name of another metadata file. + """ + debug.package_exec( + f"Adding meta content {meta_file_name(name)} to the MF environment" + ) + if name in self._metacontent: + # TODO: We could check a hash but this seems like a really corner case. + raise MetaflowException( + f"Metadata {meta_file_name(name)} already present in the code package" + ) + self._metacontent[name] = content + + def add_meta_file(self, file_path: str, file_name: Union[MetaFile, str]) -> None: + """ + Adds metadata to code package. This file will be included under `config_path` + which defaults to `MFCONF_DIR`. + + Parameters + ---------- + file_path : str + Path to the file to add in the filesystem + file_name : Union[MetaFile, str] + Name of the file to add (ie: name in the archive). This is always relative + to self._config_path + """ + arcname = meta_file_name(file_name) + file_path = os.path.realpath(file_path) + debug.package_exec( + f"Adding meta file {file_path} as {file_name} to the MF environment" + ) + if file_path in self._metafiles and self._metafiles[file_path] != os.path.join( + self._config_path, arcname.lstrip("/") + ): + raise MetaflowException( + "File %s is already present in the MF environment with a different name: %s" + % (file_path, self._metafiles[file_path]) + ) + self._metafiles[file_path] = os.path.join( + self._config_path, arcname.lstrip("/") + ) + + def add_module(self, module: ModuleType) -> None: + """ + Add a module to the MF environment. + + This module will be included in the resulting code package in tar_dir + (defaults to `MFENV_DIR`). + + Parameters + ---------- + module : ModuleType + The module to include in the MF environment + """ + name = module.__name__ + debug.package_exec(f"Adding module {name} to the MF environment") + # If the module is a single file, we handle this here by looking at __file__ + # which will point to the single file. 
If it is an actual module, __path__ will contain the path(s) to the module.
+        self._modules[name] = _ModuleInfo(
+            name,
+            set(
+                Path(p).resolve().as_posix()
+                # Fall back to the single file for non-package modules
+                for p in getattr(module, "__path__", [module.__file__])
+            ),
+            module,
+        )
+        self._files_from_modules.update(
+            self._module_files(name, self._modules[name].root_paths)
+        )
+
+    def add_code_directory(
+        self,
+        directory: str,
+        criteria: Callable[[str], bool],
+    ) -> None:
+        """
+        Add a directory to the MF environment.
+
+        This directory will be included in the resulting code package in tar_dir
+        (defaults to `MFENV_DIR`).
+        You can optionally specify a criteria function that takes a file path and
+        returns a boolean indicating whether or not the file should be included in
+        the code package.
+
+        At runtime, the content of the directory will be accessible through the
+        usual PYTHONPATH mechanism but also through `current.envdir`.
+
+        Parameters
+        ----------
+        directory : str
+            The directory to include in the MF environment
+        criteria : Callable[[str], bool]
+            A function that takes a file path and returns a boolean indicating
+            whether or not the file should be included in the code package
+        """
+        directory = os.path.realpath(directory)
+        name = os.path.basename(directory)
+        debug.package_exec(f"Adding directory {directory} to the MF environment")
+        for root, _, files in os.walk(directory):
+            for file in files:
+                if any(file.endswith(x) for x in EXT_EXCLUDE_SUFFIXES):
+                    continue
+                path = os.path.join(root, file)
+                relpath = os.path.relpath(path, directory)
+                if criteria(path):
+                    self._files[path] = os.path.join(
+                        self._package_path, name, relpath
+                    )
+
+    def add_code_file(self, file_path: str, file_name: str) -> None:
+        """
+        Add a file to the MF environment.
+
+        These files will be included in the resulting code directory (defaults to
+        `MFENV_DIR`).
+
+        Parameters
+        ----------
+        file_path : str
+            The path to the file to include in the MF environment
+        file_name : str
+            The name of the file to include in the MF environment. This is the name
+            that will be used in the archive.
+        """
+        file_path = os.path.realpath(file_path)
+        debug.package_exec(
+            f"Adding file {file_path} as {file_name} to the MF environment"
+        )
+
+        if file_path in self._files and self._files[file_path] != os.path.join(
+            self._package_path, file_name.lstrip("/")
+        ):
+            raise MetaflowException(
+                "File %s is already present in the MF environment with a different "
+                "name: %s" % (file_path, self._files[file_path])
+            )
+        self._files[file_path] = os.path.join(
+            self._package_path, file_name.lstrip("/")
+        )
+
+    def path_in_code(self, path: str) -> Optional[str]:
+        """
+        Return the path of the file in the code package if it is included through
+        add_directory or add_code_file.
+
+        Parameters
+        ----------
+        path : str
+            The path of the file on the filesystem
+
+        Returns
+        -------
+        Optional[str]
+            The path of the file in the code package or None if the file is not
+            included
+        """
+        return self._files.get(os.path.realpath(path))
+
+    def content_names(
+        self,
+        content_type: AddToPackageType = AddToPackageType.ALL,
+    ) -> Generator[Tuple[str, str], None, None]:
+        """
+        Return a generator of all files that will be included (restricted to the
+        type requested)
+
+        Returns
+        -------
+        Generator[Tuple[str, str], None, None]
+            A generator of all files included in the MF environment. The first
+            element of the tuple is the path to the file in the filesystem; the
+            second element is the path in the archive.
+ """ + yield from self._content(content_type=content_type, generate_value=False) + + def content( + self, + content_type: AddToPackageType = AddToPackageType.ALL, + ) -> Generator[Tuple[Union[str, bytes], str], None, None]: + """ + Return a generator of all files that will be included (restricted to the type + requested) + + Returns + ------- + Generator[Tuple[str, str], None, None] + A generator of all files included in the MF environment. The first element of + the tuple is the path to the file in the filesystem (or bytes); + the second element is the path in the archive. + """ + yield from self._content(content_type=content_type, generate_value=True) + + def _content( + self, + content_type: AddToPackageType = AddToPackageType.ALL, + generate_value: bool = False, + ) -> Generator[Tuple[str, str], None, None]: + """ + Return a generator of all files that will be included (restricted to the type + requested) + + Returns + ------- + Generator[Tuple[str, str], None, None] + A generator of all files included in the MF environment. The first element of + the tuple is the path to the file in the filesystem; the second element is the + path in the archive. + """ + content_type = content_type.value + if AddToPackageType.CODE_FILE & content_type: + yield from self._files.items() + if AddToPackageType.CODE_MODULE & content_type: + yield from self._files_from_modules.items() + if AddToPackageType.CODE_METAFLOW & content_type: + debug.package_exec("Packaging metaflow code...") + yield from self._metaflow_distribution_files() + yield from self._metaflow_extension_files() + if AddToPackageType.CONFIG_FILE & content_type: + yield from self._metafiles.items() + if AddToPackageType.CONFIG_CONTENT & content_type: + if generate_value: + yield from self._meta_non_file_content(generate_value=generate_value) + else: + for _, name in self._meta_non_file_content(False): + yield "generated-%s" % name, name + + def _meta_non_file_content( + self, generate_value: bool = False + ) -> Generator[Tuple[Optional[bytes], str], None, None]: + """ + Return a generator of all non file meta data. This will only generate the + meta data if generate_value is True + + Parameters + ---------- + generate_value : bool, default False + If True, the generator will generate the meta data. If False, it will not + generate but just return the name of the file in the archive + + Returns + ------- + Generator[Tuple[Optional[bytes], str], None, None] + A generator of all metafiles included in the MF environment. The first + element of the tuple is the content to add; the second element is path in the + archive. 
+ """ + # Generate the marker + if generate_value: + yield ( + json.dumps( + { + "version": 1, + "code_dir": self._package_path, + "conf_dir": self._config_path, + } + ).encode("utf-8"), + os.path.join(self._package_path, MFENV_MARKER), + ) + else: + yield None, os.path.join(self._package_path, MFENV_MARKER) + + # All other meta files + for name in self._metacontent: + yield None, os.path.join(self._config_path, name.value) + # Include distribution information if present + if self._distmetainfo: + if generate_value: + yield ( + json.dumps(self._distmetainfo).encode("utf-8"), + os.path.join(self._config_path, MetaFile.INCLUDED_DIST_INFO.value), + ) + else: + yield None, os.path.join( + self._config_path, MetaFile.INCLUDED_DIST_INFO.value + ) + + def _module_files( + self, name: str, paths: Set[str] + ) -> Generator[Tuple[str, str], None, None]: + debug.package_exec( + " Looking for distributions for module %s in %s" % (name, paths) + ) + paths = set(paths) # Do not modify external paths + has_init = False + distributions = modules_to_distributions().get(name) + prefix = "%s/" % name + init_file = "%s__init__.py" % prefix + + seen_distributions = set() + if distributions: + for dist in distributions: + dist_name = dist.metadata["Name"] # dist.name not always present + if dist_name in seen_distributions: + continue + # For some reason, sometimes the same distribution appears twice. We + # don't need to process twice. + seen_distributions.add(dist_name) + debug.package_exec( + " Including distribution '%s' for module '%s'" + % (dist_name, name) + ) + dist_root = str(dist.locate_file(name)) + if dist_root not in paths: + # This is an error because it means that this distribution is + # not contributing to the module. + raise RuntimeError( + "Distribution '%s' is not contributing to module '%s' as " + "expected (got '%s' when expected one of %s)" + % (dist.metadata["Name"], name, dist_root, paths) + ) + paths.discard(dist_root) + if dist_name not in self._distmetainfo: + # Possible that a distribution contributes to multiple modules + self._distmetainfo[dist_name] = { + # We can add more if needed but these are likely the most + # useful (captures, name, version, etc and files which can + # be used to find non-python files in the distribution). + "METADATA": dist.read_text("METADATA"), + "RECORD": dist.read_text("RECORD"), + } + for file in dist.files or []: + # Skip files that do not belong to this module (distribution may + # provide multiple modules) + if file.parts[0] != name: + continue + if file == init_file: + has_init = True + yield str( + dist.locate_file(file).resolve().as_posix() + ), os.path.join(self._package_path, str(file)) + + # Now if there are more paths left in paths, it means there is a non-distribution + # component to this package which we also include. 
+ debug.package_exec( + " Looking for non-distribution files for module '%s' in %s" + % (name, paths) + ) + for path in paths: + if not Path(path).is_dir(): + # Single file for the module -- this will be something like .py + yield path, os.path.join(self._package_path, os.path.basename(path)) + else: + for root, _, files in os.walk(path): + for file in files: + if any(file.endswith(x) for x in EXT_EXCLUDE_SUFFIXES): + continue + rel_path = os.path.relpath(os.path.join(root, file), path) + if rel_path == "__init__.py": + has_init = True + yield os.path.join(root, file), os.path.join( + self._package_path, + name, + rel_path, + ) + # We now include an empty __init__.py file to close the module and prevent + # leaks from possible namespace packages + if not has_init: + yield os.path.join( + self._metaflow_root, "metaflow", "extension_support", "_empty_file.py" + ), os.path.join(self._package_path, name, "__init__.py") + + def _metaflow_distribution_files(self) -> Generator[Tuple[str, str], None, None]: + debug.package_exec(" Including Metaflow from '%s'" % self._metaflow_root) + for path_tuple in walk( + os.path.join(self._metaflow_root, "metaflow"), + exclude_hidden=False, + suffixes=self.METAFLOW_SUFFIXES_LIST, + ): + yield path_tuple[0], os.path.join(self._package_path, path_tuple[1]) + + def _metaflow_extension_files(self) -> Generator[Tuple[str, str], None, None]: + # Metaflow extensions; for now, we package *all* extensions but this may change + # at a later date; it is possible to call `package_mfext_package` instead of + # `package_mfext_all` but in that case, make sure to also add a + # metaflow_extensions/__init__.py file to properly "close" the metaflow_extensions + # package and prevent other extensions from being loaded that may be + # present in the rest of the system + for path_tuple in package_mfext_all(): + debug.package_exec( + " Including Metaflow extension file '%s'" % path_tuple[0] + ) + yield path_tuple[0], os.path.join(self._package_path, path_tuple[1]) diff --git a/metaflow/package/tar_backend.py b/metaflow/package/tar_backend.py new file mode 100644 index 00000000000..7ba347cbbc4 --- /dev/null +++ b/metaflow/package/tar_backend.py @@ -0,0 +1,66 @@ +import tarfile + +from io import BytesIO +from typing import List, Optional, Union + +from .backend import PackagingBackend + + +class TarPackagingBackend(PackagingBackend): + + def __init__(self): + super().__init__() + self._buf = None + + def create(self): + self._buf = BytesIO() + self._archive = tarfile.open( + fileobj=self._buf, mode="w:gz", compresslevel=3, dereference=True + ) + return self + + def add_file(self, filename: str, arcname: Optional[str] = None): + info = self._archive.gettarinfo(filename, arcname) + # Setting this default to Dec 3, 2019 + info.mtime = 1575360000 + with open(filename, mode="rb") as f: + self._archive.addfile(info, f) + + def add_data(self, data: BytesIO, arcname: str): + info = tarfile.TarInfo(arcname) + data.seek(0) + info.size = len(data.getvalue()) + # Setting this default to Dec 3, 2019 + info.mtime = 1575360000 + self._archive.addfile(info, data) + + def close(self): + if self._archive: + self._archive.close() + + def get_blob(self) -> Optional[Union[bytes, bytearray]]: + if self._buf: + blob = bytearray(self._buf.getvalue()) + blob[4:8] = [0] * 4 # Reset 4 bytes from offset 4 to account for ts + return blob + return None + + @classmethod + def cls_has_member(cls, archive: tarfile.TarFile, name: str) -> bool: + try: + archive.getmember(name) + return True + except KeyError: + 
diff --git a/metaflow/package/tar_backend.py b/metaflow/package/tar_backend.py
new file mode 100644
index 00000000000..7ba347cbbc4
--- /dev/null
+++ b/metaflow/package/tar_backend.py
@@ -0,0 +1,66 @@
+import tarfile
+
+from io import BytesIO
+from typing import List, Optional, Union
+
+from .backend import PackagingBackend
+
+
+class TarPackagingBackend(PackagingBackend):
+
+    def __init__(self):
+        super().__init__()
+        self._buf = None
+
+    def create(self):
+        self._buf = BytesIO()
+        self._archive = tarfile.open(
+            fileobj=self._buf, mode="w:gz", compresslevel=3, dereference=True
+        )
+        return self
+
+    def add_file(self, filename: str, arcname: Optional[str] = None):
+        info = self._archive.gettarinfo(filename, arcname)
+        # Pin mtime to a fixed date (Dec 3, 2019) so archives are reproducible
+        info.mtime = 1575360000
+        with open(filename, mode="rb") as f:
+            self._archive.addfile(info, f)
+
+    def add_data(self, data: BytesIO, arcname: str):
+        info = tarfile.TarInfo(arcname)
+        data.seek(0)
+        info.size = len(data.getvalue())
+        # Pin mtime to a fixed date (Dec 3, 2019) so archives are reproducible
+        info.mtime = 1575360000
+        self._archive.addfile(info, data)
+
+    def close(self):
+        if self._archive:
+            self._archive.close()
+
+    def get_blob(self) -> Optional[Union[bytes, bytearray]]:
+        if self._buf:
+            blob = bytearray(self._buf.getvalue())
+            blob[4:8] = [0] * 4  # zero the gzip header MTIME field (bytes 4-7)
+            return blob
+        return None
+
+    @classmethod
+    def cls_has_member(cls, archive: tarfile.TarFile, name: str) -> bool:
+        try:
+            archive.getmember(name)
+            return True
+        except KeyError:
+            return False
+
+    @classmethod
+    def cls_extract_member(cls, archive: tarfile.TarFile, name: str) -> Optional[bytes]:
+        try:
+            member = archive.getmember(name)
+            return archive.extractfile(member).read()
+        except KeyError:
+            return None
+
+    @classmethod
+    def cls_list_members(cls, archive: tarfile.TarFile) -> Optional[List[str]]:
+        return archive.getnames() or None
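Because both the per-member mtime and the gzip header MTIME field are pinned, packaging identical content twice yields byte-identical blobs. A sketch of that property (usage assumed from the class above; the file name is invented):

    from io import BytesIO

    from metaflow.package.tar_backend import TarPackagingBackend

    def make_blob():
        backend = TarPackagingBackend().create()
        backend.add_data(BytesIO(b'{"version": 1}'), "marker.json")
        backend.close()
        return backend.get_blob()

    # Same inputs, same bytes: this lets content-addressed stores deduplicate
    # unchanged packages instead of re-uploading them.
    assert make_blob() == make_blob()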
diff --git a/metaflow/package/utils.py b/metaflow/package/utils.py
new file mode 100644
index 00000000000..261d031f350
--- /dev/null
+++ b/metaflow/package/utils.py
@@ -0,0 +1,68 @@
+import os
+from typing import Generator, List, Optional, Tuple
+
+from ..util import to_unicode
+
+
+# This is os.walk(follow_symlinks=True) with cycle detection
+def walk_without_cycles(
+    top_root: str,
+    exclude_dirs: Optional[List[str]] = None,
+) -> Generator[Tuple[str, List[str]], None, None]:
+    seen = set()
+
+    default_skip_dirs = ["__pycache__"]
+
+    def _recurse(root, skip_dirs):
+        for parent, dirs, files in os.walk(root):
+            dirs[:] = [d for d in dirs if d not in skip_dirs]
+            for d in dirs:
+                path = os.path.join(parent, d)
+                if os.path.islink(path):
+                    # Breaking loops: never follow the same symlink twice
+                    #
+                    # NOTE: this also means that links to sibling links are
+                    # not followed. In this case:
+                    #
+                    #   x -> y
+                    #   y -> oo
+                    #   oo/real_file
+                    #
+                    # real_file is only included twice, not three times
+                    reallink = os.path.realpath(path)
+                    if reallink not in seen:
+                        seen.add(reallink)
+                        for x in _recurse(path, default_skip_dirs):
+                            yield x
+            yield parent, files
+
+    skip_dirs = set(default_skip_dirs + (exclude_dirs or []))
+    for x in _recurse(top_root, skip_dirs):
+        yield x
+
+
+def walk(
+    root: str,
+    exclude_hidden: bool = True,
+    suffixes: Optional[List[str]] = None,
+    exclude_tl_dirs: Optional[List[str]] = None,
+) -> Generator[Tuple[str, str], None, None]:
+    root = to_unicode(root)  # handle files/folders with non-ASCII characters
+    prefixlen = len("%s/" % os.path.dirname(root))
+    for (
+        path,
+        files,
+    ) in walk_without_cycles(root, exclude_tl_dirs):
+        if exclude_hidden and "/." in path:
+            continue
+        for fname in files:
+            if (
+                suffixes is None
+                or (fname[0] == "." and fname in suffixes)
+                or (
+                    fname[0] != "."
+                    and any(fname.endswith(suffix) for suffix in suffixes)
+                )
+            ):
+                p = os.path.join(path, fname)
+                yield p, p[prefixlen:]
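A small check of the cycle guard in `walk_without_cycles` (paths hypothetical; symlink creation assumes a POSIX system): a symlink pointing back at an ancestor is followed at most once, so the walk terminates.

    import os
    import tempfile

    from metaflow.package.utils import walk

    root = tempfile.mkdtemp()
    os.makedirs(os.path.join(root, "pkg"))
    open(os.path.join(root, "pkg", "a.py"), "w").close()
    # A symlink back to an ancestor; a plain os.walk(followlinks=True) could
    # recurse here indefinitely.
    os.symlink(root, os.path.join(root, "pkg", "loop"))

    for abs_path, rel_path in walk(root, suffixes=[".py"]):
        print(abs_path, "->", rel_path)  # finite: the loop link is followed once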
diff --git a/metaflow/plugins/pypi/conda_decorator.py b/metaflow/plugins/pypi/conda_decorator.py
index c4d5bd5fb3a..8cc1f25b84c 100644
--- a/metaflow/plugins/pypi/conda_decorator.py
+++ b/metaflow/plugins/pypi/conda_decorator.py
@@ -10,10 +10,10 @@
 from metaflow.extension_support import EXT_PKG
 from metaflow.metadata_provider import MetaDatum
 from metaflow.metaflow_environment import InvalidEnvironmentException
+from metaflow.package.mfenv import MFEnv
+from metaflow.meta_files import MetaFile
 from metaflow.util import get_metaflow_root
 
-from ...info_file import INFO_FILE
-
 
 class CondaStepDecorator(StepDecorator):
     """
@@ -159,11 +159,11 @@ def runtime_init(self, flow, graph, package, run_id):
             os.path.join(self.metaflow_dir.name, "metaflow"),
         )
 
-        info = os.path.join(get_metaflow_root(), os.path.basename(INFO_FILE))
+        info = MFEnv.get_filename(MetaFile.INFO_FILE)
         # Symlink the INFO file as well to properly propagate down the Metaflow version
-        if os.path.isfile(info):
+        if info:
             os.symlink(
-                info, os.path.join(self.metaflow_dir.name, os.path.basename(INFO_FILE))
+                info, os.path.join(self.metaflow_dir.name, os.path.basename(info))
             )
         else:
             # If there is no info file, we will actually create one in this new
@@ -173,7 +173,10 @@ def runtime_init(self, flow, graph, package, run_id):
             # EXT_PKG extensions are PYTHONPATH extensions. Instead of re-resolving,
             # we use the resolved information that is written out to the INFO file.
             with open(
-                os.path.join(self.metaflow_dir.name, os.path.basename(INFO_FILE)),
+                os.path.join(
+                    self.metaflow_dir.name,
+                    os.path.basename(MetaFile.INFO_FILE.value),
+                ),
                 mode="wt",
                 encoding="utf-8",
             ) as f:
diff --git a/metaflow/plugins/pypi/conda_environment.py b/metaflow/plugins/pypi/conda_environment.py
index 75a954a3023..bb3924ed7c8 100644
--- a/metaflow/plugins/pypi/conda_environment.py
+++ b/metaflow/plugins/pypi/conda_environment.py
@@ -17,6 +17,7 @@
 from metaflow.exception import MetaflowException
 from metaflow.metaflow_config import get_pinned_conda_libs
 from metaflow.metaflow_environment import MetaflowEnvironment
+from metaflow.package.mfenv import AddToPackageType
 
 from . import MAGIC_FILE, _datastore_packageroot
 from .utils import conda_platform
@@ -470,7 +471,9 @@ def add_to_package(self):
         files = []
         manifest = self.get_environment_manifest_path()
         if os.path.exists(manifest):
-            files.append((manifest, os.path.basename(manifest)))
+            files.append(
+                (manifest, os.path.basename(manifest), AddToPackageType.CONFIG_FILE)
+            )
         return files
 
     def bootstrap_commands(self, step_name, datastore_type):
diff --git a/metaflow/plugins/uv/bootstrap.py b/metaflow/plugins/uv/bootstrap.py
index 619b85496c1..b645888d7d4 100644
--- a/metaflow/plugins/uv/bootstrap.py
+++ b/metaflow/plugins/uv/bootstrap.py
@@ -93,6 +93,7 @@ def skip_metaflow_dependencies():
     return skip_pkgs
 
 
 def sync_uv_project(datastore_type):
+    # TODO: NEED TO POINT UV TO FILES IN THE CONFIG DIR
     print("Syncing uv project...")
     dependencies = " ".join(get_dependencies(datastore_type))
     skip_pkgs = " ".join(
diff --git a/metaflow/plugins/uv/uv_environment.py b/metaflow/plugins/uv/uv_environment.py
index cc361650802..f396a9cc054 100644
--- a/metaflow/plugins/uv/uv_environment.py
+++ b/metaflow/plugins/uv/uv_environment.py
@@ -2,6 +2,7 @@
 
 from metaflow.exception import MetaflowException
 from metaflow.metaflow_environment import MetaflowEnvironment
+from metaflow.package.mfenv import AddToPackageType
 
 
 class UVException(MetaflowException):
@@ -43,8 +44,8 @@ def _find(filename):
 
         pyproject_path = _find("pyproject.toml")
         uv_lock_path = _find("uv.lock")
         files = [
-            (uv_lock_path, "uv.lock"),
-            (pyproject_path, "pyproject.toml"),
+            (uv_lock_path, "uv.lock", AddToPackageType.CONFIG_FILE),
+            (pyproject_path, "pyproject.toml", AddToPackageType.CONFIG_FILE),
         ]
         return files
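Both environments now return 3-tuples tagged with `AddToPackageType.CONFIG_FILE`, so their manifests land in the package's config directory rather than next to user code. A hypothetical `add_to_package` hook (file names invented) combining the two file kinds the hook accepts:

    from metaflow.package.mfenv import AddToPackageType

    def add_to_package(self):
        # Hypothetical: ship a manifest into the package's config directory
        # and a helper module into its code directory.
        return [
            ("/tmp/env_manifest.json", "env_manifest.json", AddToPackageType.CONFIG_FILE),
            ("/tmp/helper.py", "tools/helper.py", AddToPackageType.CODE_FILE),
        ]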
diff --git a/metaflow/user_configs/config_options.py b/metaflow/user_configs/config_options.py
index 9770cc80df4..d94fa3574ae 100644
--- a/metaflow/user_configs/config_options.py
+++ b/metaflow/user_configs/config_options.py
@@ -7,9 +7,11 @@
 from metaflow._vendor import click
 from metaflow.debug import debug
 
-from .config_parameters import CONFIG_FILE, ConfigValue
+from .config_parameters import ConfigValue
 from ..exception import MetaflowException, MetaflowInternalError
+from ..package.mfenv import MFEnv
 from ..parameters import DeployTimeField, ParameterContext, current_flow
+from ..meta_files import MetaFile
 from ..util import get_username
 
 
@@ -24,7 +26,7 @@ def _load_config_values(info_file: Optional[str] = None) -> Optional[Dict[Any,
     if info_file is None:
-        info_file = os.path.basename(CONFIG_FILE)
+        info_file = MFEnv.get_filename(MetaFile.CONFIG_FILE)
     try:
         with open(info_file, encoding="utf-8") as contents:
             return json.load(contents).get("user_configs", {})
@@ -433,7 +435,7 @@ class LocalFileInput(click.Path):
     # Small wrapper around click.Path to set the value from which to read configuration
     # values. This is set immediately upon processing the --local-config-file
     # option and will therefore then be available when processing any of the other
-    # --config options (which will call ConfigInput.process_configs
+    # --config options (which will call ConfigInput.process_configs)
 
     name = "LocalFileInput"
 
     def convert(self, value, param, ctx):
diff --git a/metaflow/util.py b/metaflow/util.py
index 8636b8253bc..c0383766b5d 100644
--- a/metaflow/util.py
+++ b/metaflow/util.py
@@ -9,7 +9,6 @@
 from itertools import takewhile
 import re
 
-from metaflow.exception import MetaflowUnknownUser, MetaflowInternalError
 
 try:
     # python2
@@ -162,6 +161,8 @@ def get_username():
 
 
 def resolve_identity_as_tuple():
+    from metaflow.exception import MetaflowUnknownUser
+
     prod_token = os.environ.get("METAFLOW_PRODUCTION_TOKEN")
     if prod_token:
         return "production", prod_token
@@ -236,6 +237,8 @@ class of the given object.
 
 
 def compress_list(lst, separator=",", rangedelim=":", zlibmarker="!", zlibmin=500):
+    from metaflow.exception import MetaflowInternalError
+
     bad_items = [x for x in lst if separator in x or rangedelim in x or zlibmarker in x]
     if bad_items:
         raise MetaflowInternalError(
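The `metaflow/util.py` hunks replace a module-level exception import with imports inside the functions that raise, the usual way to break an import cycle. A generic sketch of the pattern (module names hypothetical, not from this change):

    # a.py -- imports b at module load time
    import b

    def helper():
        return "ok"

    # b.py -- deferring its import of a breaks the load-time cycle a -> b -> a
    def work():
        from a import helper  # resolved at call time, once a has fully loaded
        return helper()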