From 3f1c9c318dce7e9413a507cbc19e401f8365854f Mon Sep 17 00:00:00 2001 From: Pablo Pardo Garcia Date: Mon, 13 Jan 2025 11:42:50 +0100 Subject: [PATCH 1/4] add script to push pipelines to glassflow --- setup.py | 8 + src/glassflow/utils/yaml_models.py | 127 +++++++++++++ src/glassflow/utils/yaml_operations.py | 241 +++++++++++++++++++++++++ 3 files changed, 376 insertions(+) create mode 100644 src/glassflow/utils/yaml_models.py create mode 100644 src/glassflow/utils/yaml_operations.py diff --git a/setup.py b/setup.py index 799548a..e9e7d1c 100644 --- a/setup.py +++ b/setup.py @@ -34,6 +34,9 @@ "typing-inspect>=0.9.0", "typing_extensions>=4.7.1", "python-dotenv==1.0.1", + "pyyaml>=6.0.2", + "pydantic>=2.10.5", + "eval_type_backport>=0.2.0", ], extras_require={ "dev": [ @@ -49,4 +52,9 @@ package_dir={"": "src"}, python_requires=">=3.8", package_data={"glassflow": ["py.typed"]}, + entry_points={ + "console_scripts": [ + "glassflow-push-pipelines=glassflow.utils.yaml_operations:main", + ] + }, ) diff --git a/src/glassflow/utils/yaml_models.py b/src/glassflow/utils/yaml_models.py new file mode 100644 index 0000000..84ec4b8 --- /dev/null +++ b/src/glassflow/utils/yaml_models.py @@ -0,0 +1,127 @@ +""" +Keep here until we transition to full support of YAML +""" + +from __future__ import annotations + +import uuid +from typing import Annotated, Literal, Union + +from pydantic import BaseModel, Field, ValidationError, model_validator + + +class GlassFlowConfig(BaseModel): + organization_id: uuid.UUID + + +class Pipeline(BaseModel): + name: str + pipeline_id: uuid.UUID | None = Field(None) + space_id: uuid.UUID + blocks: list[Block] + + @model_validator(mode="after") + def check_blocks(self): + """Validate pipeline has source, transformer and sink""" + + assert len(self.blocks) == 3 + + source = [b for b in self.blocks if b.type == "source"] + transformer = [b for b in self.blocks if b.type == "transformer"] + sink = [b for b in self.blocks if b.type == "sink"] + + assert len(source) == 1 + assert len(transformer) == 1 + assert len(sink) == 1 + + assert source[0].next_block_id == transformer[0].id + assert transformer[0].next_block_id == sink[0].id + return self + + +class EnvironmentVariable(BaseModel): + name: str + value: str | None = Field(None) + value_secret_ref: str | None = Field(None) + + @model_validator(mode="after") + def check_filled(self): + if self.value_secret_ref is None and self.value is None: + raise ValidationError("value or value_secret_ref must be filled") + return self + + +class Requirements(BaseModel): + path: str | None = Field(None) + value: str | None = Field(None) + + @model_validator(mode="after") + def check_filled(self): + if self.path is None and self.value is None: + raise ValidationError("Path or value must be filled") + return self + + +class Transformation(BaseModel): + path: str | None = Field(None) + value: str | None = Field(None) + + @model_validator(mode="after") + def check_filled(self): + if self.path is None and self.value is None: + raise ValidationError("Path or value must be filled") + return self + + +class BaseBlock(BaseModel): + id: str + name: str + type: str + + +class TransformerBlock(BaseBlock): + type: Literal["transformer"] + requirements: Requirements + transformation: Transformation + next_block_id: str + env_vars: list[EnvironmentVariable] + + +class SourceBlock(BaseBlock): + type: Literal["source"] + next_block_id: str + kind: str | None = Field(None) + config: dict | None = Field(None) + config_secret_ref: str | None = Field(None) + + @model_validator(mode="after") + def check_filled(self): + if ( + self.kind is not None + and self.config is None + and self.config_secret_ref is None + ): + raise ValidationError("config or config_secret_ref must be filled") + return self + + +class SinkBlock(BaseBlock): + type: Literal["sink"] + kind: str | None = Field(None) + config: dict | None = Field(None) + config_secret_ref: str | None = Field(None) + + @model_validator(mode="after") + def check_filled(self): + if ( + self.kind is not None + and self.config is None + and self.config_secret_ref is None + ): + raise ValidationError("config or config_secret_ref must be filled") + return self + + +Block = Annotated[ + Union[TransformerBlock, SourceBlock, SinkBlock], Field(discriminator="type") +] diff --git a/src/glassflow/utils/yaml_operations.py b/src/glassflow/utils/yaml_operations.py new file mode 100644 index 0000000..83c7c16 --- /dev/null +++ b/src/glassflow/utils/yaml_operations.py @@ -0,0 +1,241 @@ +import argparse +import itertools +import logging +import sys +from pathlib import Path + +from yaml import safe_load + +from glassflow import GlassFlowClient +from glassflow import Pipeline as GlassFlowPipeline +from glassflow.utils.yaml_models import Pipeline + +logging.basicConfig(stream=sys.stdout, level=logging.INFO) +log = logging.getLogger(__name__) + +# TODO: handle deleted pipelines + + +def load_yaml_file(file): + """Loads Pipeline YAML file""" + # Load YAML + with open(file) as f: + yaml_data = safe_load(f) + + return Pipeline(**yaml_data) + + +def yaml_file_to_pipeline( + yaml_path: Path, personal_access_token: str +) -> GlassFlowPipeline: + """ + Converts a Pipeline YAML file into GlassFlow SDK Pipeline + """ + yaml_file_dir = yaml_path.parent + p = load_yaml_file(yaml_path) + + # We have one source, transformer and sink blocks + source = [b for b in p.blocks if b.type == "source"][0] + transformer = [b for b in p.blocks if b.type == "transformer"][0] + sink = [b for b in p.blocks if b.type == "sink"][0] + + if transformer.requirements is not None: + if transformer.requirements.value is not None: + requirements = transformer.requirements.value + else: + with open(yaml_file_dir / transformer.requirements.path) as f: + requirements = f.read() + else: + requirements = None + + if transformer.transformation.path is not None: + transform = str(yaml_file_dir / transformer.transformation.path) + else: + transform = str(yaml_file_dir / "handler.py") + with open(transform, "w") as f: + f.write(transformer.transformation.value) + + pipeline_id = str(p.pipeline_id) if p.pipeline_id is not None else None + env_vars = [e.model_dump(exclude_none=True) for e in transformer.env_vars] + + # TODO: Handle source and sink config_secret_ref + # TODO: Handle env_var value_secret_ref + return GlassFlowPipeline( + personal_access_token=personal_access_token, + id=pipeline_id, + name=p.name, + space_id=p.space_id.__str__(), + env_vars=env_vars, + transformation_file=transform, + requirements=requirements, + sink_kind=sink.kind, + sink_config=sink.config, + source_kind=source.kind, + source_config=source.config, + ) + + +def get_yaml_files_with_changes(filter_dir: Path, files: list[Path]) -> set[Path]: + """ + Given a list of pipeline file (`.yaml`, `.yml`, `.py` or + `requirements.txt`) it returns the pipeline YAML files that + the files belong to. + """ + pipeline_2_files = map_yaml_to_files(filter_dir) + + pipelines_changed = set() + for file in files: + if file.suffix in [".yaml", ".yml"]: + if filter_dir in file.parents: + pipelines_changed.add(file) + else: + # Ignore YAML files outside path + continue + elif file.suffix == ".py" or file.name == "requirements.txt": + for k in pipeline_2_files: + if file in pipeline_2_files[k]: + pipelines_changed.add(k) + else: + continue + + return pipelines_changed + + +def map_yaml_to_files(path: Path) -> dict[Path, list[Path]]: + """Maps Pipeline YAML files to .py and requirements.txt files""" + yml_files = itertools.chain(path.rglob("*.yaml"), path.rglob("*.yml")) + mapping = {} + for file in yml_files: + mapping[file] = [] + for b in load_yaml_file(file).blocks: + if b.type == "transformer": + transformer = b + break + else: + continue + + if transformer.requirements.path is not None: + path = file.parent / transformer.requirements.path + mapping[file].append(path) + + if transformer.transformation.path is not None: + path = file.parent / transformer.transformation.path + mapping[file].append(path) + return mapping + + +def add_pipeline_id_to_yaml(yaml_path: Path, pipeline_id: str): + """Prepend the pipeline id to the yaml file""" + with open(yaml_path, "r+") as f: + content = f.read() + f.seek(0, 0) + f.write(f"pipeline_id: {pipeline_id}" + "\n" + content) + + +def query_yes_no(question: str, default="yes") -> bool: + valid = {"yes": True, "y": True, "ye": True, "no": False, "n": False} + if default is None: + prompt = " [y/n] " + elif default == "yes": + prompt = " [Y/n] " + elif default == "no": + prompt = " [y/N] " + else: + raise ValueError(f"invalid default answer: '{default}'") + + while True: + sys.stdout.write(question + prompt) + choice = input().lower() + if default is not None and choice == "": + return valid[default] + elif choice in valid: + return valid[choice] + else: + log.info("Please respond with 'yes' or 'no' " "(or 'y' or 'n').\n") + + +def get_changes_summary(pipelines: list[GlassFlowPipeline]): + """Returns a dictionary of changes that will be applied""" + to_create = len([p for p in pipelines if p.id is None]) + to_update = len(pipelines) - to_create + to_update_ids = [p.id for p in pipelines if p.id is not None] + + log.info( + f""" +Expected changes on your GlassFlow pipelines: +\t‣ Create {to_create} pipelines +\t‣ Update {to_update} pipelines {"" if to_update == 0 else f'(IDs: {to_update_ids})'} + """ + ) + return { + "to_create": {"quantity": to_create}, + "to_update": {"quantity": to_update, "ids": to_update_ids}, + } + + +def push_to_cloud( + files_changed: list[Path], pipeline_id: Path, client: GlassFlowClient +): + yaml_files_to_update = get_yaml_files_with_changes( + filter_dir=pipeline_id, files=files_changed + ) + pipelines = [ + yaml_file_to_pipeline(yaml_file, client.personal_access_token) + for yaml_file in yaml_files_to_update + ] + + get_changes_summary(pipelines) + update = query_yes_no("Do you want to proceed?", default="no") + if not update: + sys.stdout.write("Pipelines update cancelled!\n") + exit(0) + + for pipeline, yaml_file in zip(pipelines, yaml_files_to_update): + if pipeline.id is None: + # Create pipeline + new_pipeline = pipeline.create() + add_pipeline_id_to_yaml(yaml_file, new_pipeline.id) + else: + # Update pipeline + existing_pipeline = client.get_pipeline(pipeline.id) + existing_pipeline.update( + name=pipeline.name, + transformation_file=pipeline.transformation_file, + requirements=pipeline.requirements, + sink_kind=pipeline.sink_kind, + sink_config=pipeline.sink_config, + source_kind=pipeline.source_kind, + source_config=pipeline.source_config, + env_vars=pipeline.env_vars, + ) + + +def main(): + parser = argparse.ArgumentParser("Push pipelines configuration to GlassFlow cloud") + parser.add_argument( + "-f", + "--files", + help="List of files (`.yaml`, `.yml`, `.py` or `requirements.txt`) " + "to sync to GlassFlow.", + type=Path, + nargs="+", + ) + parser.add_argument( + "--dir", + help="Directory from where to sync pipelines to GlassFlow.", + type=Path, + ) + parser.add_argument( + "-t", + "--personal-access-token", + help="GlassFlow Personal Access Token.", + type=str, + ) + args = parser.parse_args() + + client = GlassFlowClient(personal_access_token=args.personal_access_token) + push_to_cloud(args.files, args.dir, client) + + +if __name__ == "__main__": + main() From ccf1f99d72a4e08ce84cf2632075467e817c7b5a Mon Sep 17 00:00:00 2001 From: Pablo Pardo Garcia Date: Mon, 13 Jan 2025 13:27:24 +0100 Subject: [PATCH 2/4] add --aut-approve argument --- src/glassflow/utils/yaml_operations.py | 26 +++++++++++++++++++------- 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/src/glassflow/utils/yaml_operations.py b/src/glassflow/utils/yaml_operations.py index 83c7c16..4ad5214 100644 --- a/src/glassflow/utils/yaml_operations.py +++ b/src/glassflow/utils/yaml_operations.py @@ -10,7 +10,7 @@ from glassflow import Pipeline as GlassFlowPipeline from glassflow.utils.yaml_models import Pipeline -logging.basicConfig(stream=sys.stdout, level=logging.INFO) +logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s") log = logging.getLogger(__name__) # TODO: handle deleted pipelines @@ -174,7 +174,10 @@ def get_changes_summary(pipelines: list[GlassFlowPipeline]): def push_to_cloud( - files_changed: list[Path], pipeline_id: Path, client: GlassFlowClient + files_changed: list[Path], + pipeline_id: Path, + client: GlassFlowClient, + auto_approve: bool = False, ): yaml_files_to_update = get_yaml_files_with_changes( filter_dir=pipeline_id, files=files_changed @@ -185,10 +188,11 @@ def push_to_cloud( ] get_changes_summary(pipelines) - update = query_yes_no("Do you want to proceed?", default="no") - if not update: - sys.stdout.write("Pipelines update cancelled!\n") - exit(0) + if not auto_approve: + update = query_yes_no("Do you want to proceed?", default="no") + if not update: + log.info("Pipelines update cancelled!\n") + exit(0) for pipeline, yaml_file in zip(pipelines, yaml_files_to_update): if pipeline.id is None: @@ -231,10 +235,18 @@ def main(): help="GlassFlow Personal Access Token.", type=str, ) + parser.add_argument( + "-y", + "--auto-approve", + action="store_true", + default=False, + required=False, + help="Automatically approve pipelines without prompting for input.", + ) args = parser.parse_args() client = GlassFlowClient(personal_access_token=args.personal_access_token) - push_to_cloud(args.files, args.dir, client) + push_to_cloud(args.files, args.dir, client, args.auto_approve) if __name__ == "__main__": From c9af368876b1ee02d280f6173fcb4897f8ade42a Mon Sep 17 00:00:00 2001 From: Pablo Pardo Garcia Date: Mon, 13 Jan 2025 15:57:32 +0100 Subject: [PATCH 3/4] support delete files and update script args --- src/glassflow/utils/yaml_operations.py | 102 +++++++++++++++++-------- 1 file changed, 71 insertions(+), 31 deletions(-) diff --git a/src/glassflow/utils/yaml_operations.py b/src/glassflow/utils/yaml_operations.py index 4ad5214..341880d 100644 --- a/src/glassflow/utils/yaml_operations.py +++ b/src/glassflow/utils/yaml_operations.py @@ -13,8 +13,6 @@ logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s") log = logging.getLogger(__name__) -# TODO: handle deleted pipelines - def load_yaml_file(file): """Loads Pipeline YAML file""" @@ -75,22 +73,18 @@ def yaml_file_to_pipeline( ) -def get_yaml_files_with_changes(filter_dir: Path, files: list[Path]) -> set[Path]: +def get_yaml_files_with_changes(pipelines_dir: Path, files: list[Path]) -> set[Path]: """ Given a list of pipeline file (`.yaml`, `.yml`, `.py` or `requirements.txt`) it returns the pipeline YAML files that the files belong to. """ - pipeline_2_files = map_yaml_to_files(filter_dir) + pipeline_2_files = map_yaml_to_files(pipelines_dir) pipelines_changed = set() for file in files: if file.suffix in [".yaml", ".yml"]: - if filter_dir in file.parents: - pipelines_changed.add(file) - else: - # Ignore YAML files outside path - continue + pipelines_changed.add(file) elif file.suffix == ".py" or file.name == "requirements.txt": for k in pipeline_2_files: if file in pipeline_2_files[k]: @@ -154,47 +148,78 @@ def query_yes_no(question: str, default="yes") -> bool: log.info("Please respond with 'yes' or 'no' " "(or 'y' or 'n').\n") -def get_changes_summary(pipelines: list[GlassFlowPipeline]): +def get_changes_summary( + changed_pipelines: list[GlassFlowPipeline], + deleted_pipelines: list[GlassFlowPipeline], +): """Returns a dictionary of changes that will be applied""" - to_create = len([p for p in pipelines if p.id is None]) - to_update = len(pipelines) - to_create - to_update_ids = [p.id for p in pipelines if p.id is not None] + to_create = len([p for p in changed_pipelines if p.id is None]) + to_update = len(changed_pipelines) - to_create + to_update_ids = [p.id for p in changed_pipelines if p.id is not None] + to_delete = len(deleted_pipelines) + to_delete_ids = [p.id for p in deleted_pipelines] log.info( f""" Expected changes on your GlassFlow pipelines: \t‣ Create {to_create} pipelines \t‣ Update {to_update} pipelines {"" if to_update == 0 else f'(IDs: {to_update_ids})'} +\t‣ Delete {to_delete} pipelines {"" if to_update == 0 else f'(IDs: {to_delete_ids})'} """ ) return { "to_create": {"quantity": to_create}, "to_update": {"quantity": to_update, "ids": to_update_ids}, + "to_delete": {"quantity": to_delete, "ids": to_delete_ids}, } +def delete_pipelines(files_deleted: list[Path], client: GlassFlowClient): + for file in files_deleted: + if file.suffix in [".yaml", ".yml"]: + p = yaml_file_to_pipeline( + yaml_path=file, personal_access_token=client.personal_access_token + ) + p.delete() + log.info(f"Deleted pipeline {p.id}") + + def push_to_cloud( files_changed: list[Path], - pipeline_id: Path, + files_deleted: list[Path], + pipelines_dir: Path, client: GlassFlowClient, auto_approve: bool = False, ): - yaml_files_to_update = get_yaml_files_with_changes( - filter_dir=pipeline_id, files=files_changed - ) - pipelines = [ - yaml_file_to_pipeline(yaml_file, client.personal_access_token) - for yaml_file in yaml_files_to_update - ] + if files_deleted is not None: + deleted_pipelines = [ + yaml_file_to_pipeline(f, client.personal_access_token) + for f in files_deleted + if f.suffix in [".yaml", ".yml"] + ] + else: + deleted_pipelines = [] + + if files_changed is not None: + yaml_files_to_update = get_yaml_files_with_changes( + pipelines_dir=pipelines_dir, files=files_changed + ) + changed_pipelines = [ + yaml_file_to_pipeline(yaml_file, client.personal_access_token) + for yaml_file in yaml_files_to_update + ] + else: + yaml_files_to_update = [] + changed_pipelines = [] - get_changes_summary(pipelines) + get_changes_summary(changed_pipelines, deleted_pipelines) if not auto_approve: update = query_yes_no("Do you want to proceed?", default="no") if not update: - log.info("Pipelines update cancelled!\n") + log.info("Pipelines push cancelled!") exit(0) - for pipeline, yaml_file in zip(pipelines, yaml_files_to_update): + for pipeline, yaml_file in zip(changed_pipelines, yaml_files_to_update): if pipeline.id is None: # Create pipeline new_pipeline = pipeline.create() @@ -217,17 +242,26 @@ def push_to_cloud( def main(): parser = argparse.ArgumentParser("Push pipelines configuration to GlassFlow cloud") parser.add_argument( - "-f", - "--files", - help="List of files (`.yaml`, `.yml`, `.py` or `requirements.txt`) " - "to sync to GlassFlow.", + "-d", + "--files-deleted", + help="List of files that were deleted (`.yaml`, `.yml`, `.py` or " + "`requirements.txt`) to sync to GlassFlow.", type=Path, nargs="+", ) parser.add_argument( - "--dir", - help="Directory from where to sync pipelines to GlassFlow.", + "-a", + "--files-changed", + help="List of files with changes (`.yaml`, `.yml`, `.py` or " + "`requirements.txt`) to sync to GlassFlow.", type=Path, + nargs="+", + ) + parser.add_argument( + "--pipelines-dir", + help="Path to directory with your GlassFlow pipelines.", + type=Path, + default="pipelines/", ) parser.add_argument( "-t", @@ -246,7 +280,13 @@ def main(): args = parser.parse_args() client = GlassFlowClient(personal_access_token=args.personal_access_token) - push_to_cloud(args.files, args.dir, client, args.auto_approve) + push_to_cloud( + files_deleted=args.files_deleted, + files_changed=args.files_changed, + pipelines_dir=args.pipelines_dir, + client=client, + auto_approve=args.auto_approve, + ) if __name__ == "__main__": From cb544ff573219474787c81471d825030eb089f9f Mon Sep 17 00:00:00 2001 From: Pablo Pardo Garcia Date: Tue, 14 Jan 2025 17:18:17 +0100 Subject: [PATCH 4/4] update YAML specification --- src/glassflow/utils/yaml_models.py | 35 ++++++++++++++------------ src/glassflow/utils/yaml_operations.py | 14 +++++------ 2 files changed, 26 insertions(+), 23 deletions(-) diff --git a/src/glassflow/utils/yaml_models.py b/src/glassflow/utils/yaml_models.py index 84ec4b8..c387acf 100644 --- a/src/glassflow/utils/yaml_models.py +++ b/src/glassflow/utils/yaml_models.py @@ -18,24 +18,25 @@ class Pipeline(BaseModel): name: str pipeline_id: uuid.UUID | None = Field(None) space_id: uuid.UUID - blocks: list[Block] + components: list[Component] @model_validator(mode="after") - def check_blocks(self): + def check_components(self): """Validate pipeline has source, transformer and sink""" - assert len(self.blocks) == 3 + assert len(self.components) == 3 - source = [b for b in self.blocks if b.type == "source"] - transformer = [b for b in self.blocks if b.type == "transformer"] - sink = [b for b in self.blocks if b.type == "sink"] + source = [c for c in self.components if c.type == "source"] + transformer = [c for c in self.components if c.type == "transformer"] + sink = [c for c in self.components if c.type == "sink"] assert len(source) == 1 assert len(transformer) == 1 assert len(sink) == 1 - assert source[0].next_block_id == transformer[0].id - assert transformer[0].next_block_id == sink[0].id + assert transformer[0].inputs[0] == source[0].id + assert sink[0].inputs[0] == transformer[0].id + return self @@ -73,23 +74,23 @@ def check_filled(self): return self -class BaseBlock(BaseModel): +class BaseComponent(BaseModel): id: str name: str type: str -class TransformerBlock(BaseBlock): +class TransformerComponent(BaseComponent): type: Literal["transformer"] requirements: Requirements transformation: Transformation - next_block_id: str + inputs: list[str] env_vars: list[EnvironmentVariable] -class SourceBlock(BaseBlock): +class SourceComponent(BaseComponent): type: Literal["source"] - next_block_id: str + inputs: list[str] kind: str | None = Field(None) config: dict | None = Field(None) config_secret_ref: str | None = Field(None) @@ -105,11 +106,12 @@ def check_filled(self): return self -class SinkBlock(BaseBlock): +class SinkComponent(BaseComponent): type: Literal["sink"] kind: str | None = Field(None) config: dict | None = Field(None) config_secret_ref: str | None = Field(None) + inputs: list[str] @model_validator(mode="after") def check_filled(self): @@ -122,6 +124,7 @@ def check_filled(self): return self -Block = Annotated[ - Union[TransformerBlock, SourceBlock, SinkBlock], Field(discriminator="type") +Component = Annotated[ + Union[TransformerComponent, SourceComponent, SinkComponent], + Field(discriminator="type"), ] diff --git a/src/glassflow/utils/yaml_operations.py b/src/glassflow/utils/yaml_operations.py index 341880d..faf7b37 100644 --- a/src/glassflow/utils/yaml_operations.py +++ b/src/glassflow/utils/yaml_operations.py @@ -32,10 +32,10 @@ def yaml_file_to_pipeline( yaml_file_dir = yaml_path.parent p = load_yaml_file(yaml_path) - # We have one source, transformer and sink blocks - source = [b for b in p.blocks if b.type == "source"][0] - transformer = [b for b in p.blocks if b.type == "transformer"][0] - sink = [b for b in p.blocks if b.type == "sink"][0] + # We have one source, transformer and sink components + source = [c for c in p.components if c.type == "source"][0] + transformer = [c for c in p.components if c.type == "transformer"][0] + sink = [c for c in p.components if c.type == "sink"][0] if transformer.requirements is not None: if transformer.requirements.value is not None: @@ -101,9 +101,9 @@ def map_yaml_to_files(path: Path) -> dict[Path, list[Path]]: mapping = {} for file in yml_files: mapping[file] = [] - for b in load_yaml_file(file).blocks: - if b.type == "transformer": - transformer = b + for c in load_yaml_file(file).components: + if c.type == "transformer": + transformer = c break else: continue