From 6643b5fea79c4943a2b3a23143c0d37c9732ab44 Mon Sep 17 00:00:00 2001 From: Ashish Bagri Date: Tue, 11 Feb 2025 16:24:58 +0100 Subject: [PATCH 1/5] fix pipeline response --- src/glassflow/models/responses/pipeline.py | 2 +- transform.py | 0 2 files changed, 1 insertion(+), 1 deletion(-) create mode 100644 transform.py diff --git a/src/glassflow/models/responses/pipeline.py b/src/glassflow/models/responses/pipeline.py index 1ab8441..c794516 100644 --- a/src/glassflow/models/responses/pipeline.py +++ b/src/glassflow/models/responses/pipeline.py @@ -87,7 +87,7 @@ class ConsumeEventResponse(BaseModel): def event(self): if self.body: - return self.body["response"] + return self.body.response return None diff --git a/transform.py b/transform.py new file mode 100644 index 0000000..e69de29 From f1f2b8af3155845be68251545ac46857f806b0de Mon Sep 17 00:00:00 2001 From: Ashish Bagri Date: Wed, 12 Feb 2025 13:40:12 +0100 Subject: [PATCH 2/5] added a getting-started script --- setup.py | 6 ++ src/glassflow/cli.py | 122 +++++++++++++++++++++++++++++++++ src/glassflow/pipeline_data.py | 1 - transform.py | 0 4 files changed, 128 insertions(+), 1 deletion(-) create mode 100644 src/glassflow/cli.py delete mode 100644 transform.py diff --git a/setup.py b/setup.py index 8aefcb5..dca41b3 100644 --- a/setup.py +++ b/setup.py @@ -1,4 +1,5 @@ import setuptools +from importlib_metadata import entry_points try: with open("README.md") as fh: @@ -50,4 +51,9 @@ package_dir={"": "src"}, python_requires=">=3.8", package_data={"glassflow": ["py.typed"]}, + entry_points={ + "console_scripts": [ + "glassflow = glassflow.cli:glassflow", + ], + }, ) diff --git a/src/glassflow/cli.py b/src/glassflow/cli.py new file mode 100644 index 0000000..b76379f --- /dev/null +++ b/src/glassflow/cli.py @@ -0,0 +1,122 @@ +import os +import click +from dotenv import load_dotenv + +def create_transformation_function(filename = "transform_gettingstarted.py"): + file_content = """import json +import logging + +def handler(data: dict, log: logging.Logger): + log.info("Echo: " + json.dumps(data)) + data['transformed_by'] = "glassflow" + + return data +""" + with open(filename, "w") as f: + f.write(file_content) + click.echo(f"āœ… Transformation function created in {filename}") + click.echo("The transformation function is:\n") + click.echo(file_content) + click.echo("šŸ“ You can modify the transformation function in the file.") + return filename + +def create_space_pipeline(personal_access_token, transform_filename): + import glassflow + # create glassflow client to interact with GlassFlow + client = glassflow.GlassFlowClient( + personal_access_token=personal_access_token) + example_space = client.create_space(name="getting-started") + pipeline = client.create_pipeline( + name="getting-started-pipeline", + transformation_file=transform_filename, + space_id=example_space.id) + click.echo(f"āœ… Created a pipeline with pipeline_id {pipeline.id}") + return pipeline + +def send_consume_events(pipeline): + click.echo("šŸ”„ Sending some generated events to pipeline .....") + data_source = pipeline.get_source() + for i in range(10): + event = {"data": "hello GF {}".format(i)} + res = data_source.publish(event) + if res.status_code == 200: + click.echo("Sent event: {event}".format(event=event)) + + click.echo("šŸ“” Consuming transformed events from the pipeline") + data_sink = pipeline.get_sink() + for i in range(10): + resp = data_sink.consume() + if resp.status_code == 200: + click.echo("Consumed event: {event} ".format(event=resp.event())) + +@click.group() +def glassflow(): + """Glassflow CLI - Manage and control Glassflow SDK""" + pass + + +@click.command() +@click.option("--personal-access-token", "-pat", default=None, help="Personal access token.") +@click.option("--env-file", "-e", default=".env", help="Path to the .env file (default: .env in current directory).") +def get_started(personal_access_token, env_file): + """Displays a welcome message and setup instructions.""" + + # Load token from .env if not provided in CLI + if personal_access_token is None: + if os.path.exists(env_file): + load_dotenv(env_file) # Load environment variables + personal_access_token = os.getenv("PERSONAL_ACCESS_TOKEN") + else: + click.echo("āš ļø No token provided and .env file not found!", err=True) + return + + if not personal_access_token: + click.echo("āŒ Error: Personal access token is required.", err=True) + return + + click.echo("šŸš€ Welcome to Glassflow! \n") + click.echo(f"šŸ”‘ Using Personal Access Token: {personal_access_token[:4]}... (hidden for security)") + click.echo("\nšŸ“ In this getting started guide, we will do the following:") + click.echo("1. Define a data transformation function in Python.\n") + click.echo("2. Create a pipeline with the function.\n") + click.echo("3. Send events to the pipeline.\n") + click.echo("4. Consume transformed events in real-time from the pipeline\n") + click.echo("5. Monitor the pipeline and view logs.\n") + + filename = create_transformation_function() + pipeline = create_space_pipeline(personal_access_token, filename) + send_consume_events(pipeline) + + click.echo("\nšŸŽ‰ Congratulations! You have successfully created a pipeline and sent events to it.\n") + click.echo("šŸ’» View the logs and monitor the Pipeline in the " + "Glassflow Web App at https://app.glassflow.dev/pipelines/{pipeline_id}".format(pipeline_id=pipeline.id)) + + +@click.command() +@click.argument("command", required=False) +def help(command): + """Displays help information about Glassflow CLI and its commands.""" + + commands = { + "get-started": "Initialize Glassflow with an access token.\nUsage: glassflow get-started --token YOUR_TOKEN", + "help": "Shows help information.\nUsage: glassflow help [command]", + } + + if command: + if command in commands: + click.echo(f"ā„¹ļø Help for `{command}`:\n{commands[command]}") + else: + click.echo(f"āŒ Unknown command: `{command}`. Run `glassflow help` for a list of commands.") + else: + click.echo("šŸ“– Glassflow CLI Help:") + for cmd, desc in commands.items(): + click.echo(f" āžœ {cmd}: {desc.splitlines()[0]}") + click.echo("\nRun `glassflow help ` for more details.") + + +# Add commands to CLI group +glassflow.add_command(get_started) +glassflow.add_command(help) + +if __name__ == "__main__": + glassflow() diff --git a/src/glassflow/pipeline_data.py b/src/glassflow/pipeline_data.py index a0dfd1f..a4cfb16 100644 --- a/src/glassflow/pipeline_data.py +++ b/src/glassflow/pipeline_data.py @@ -83,7 +83,6 @@ def publish(self, request_body: dict) -> responses.PublishEventResponse: ClientError: If an error occurred while publishing the event """ endpoint = f"/pipelines/{self.pipeline_id}/topics/input/events" - print("request_body", request_body) http_res = self._request(method="POST", endpoint=endpoint, json=request_body) return responses.PublishEventResponse( status_code=http_res.status_code, diff --git a/transform.py b/transform.py deleted file mode 100644 index e69de29..0000000 From 93e416d2e15f66c9724b3672704a77bb27d47b91 Mon Sep 17 00:00:00 2001 From: Pablo Pardo Garcia Date: Wed, 12 Feb 2025 18:29:59 +0100 Subject: [PATCH 3/5] format code --- setup.py | 1 - src/glassflow/cli.py | 57 +++++++++++++++++++++++++++++++------------- 2 files changed, 41 insertions(+), 17 deletions(-) diff --git a/setup.py b/setup.py index dca41b3..278064b 100644 --- a/setup.py +++ b/setup.py @@ -1,5 +1,4 @@ import setuptools -from importlib_metadata import entry_points try: with open("README.md") as fh: diff --git a/src/glassflow/cli.py b/src/glassflow/cli.py index b76379f..db68dfb 100644 --- a/src/glassflow/cli.py +++ b/src/glassflow/cli.py @@ -1,8 +1,10 @@ import os + import click from dotenv import load_dotenv -def create_transformation_function(filename = "transform_gettingstarted.py"): + +def create_transformation_function(filename="transform_gettingstarted.py"): file_content = """import json import logging @@ -20,34 +22,38 @@ def handler(data: dict, log: logging.Logger): click.echo("šŸ“ You can modify the transformation function in the file.") return filename + def create_space_pipeline(personal_access_token, transform_filename): import glassflow + # create glassflow client to interact with GlassFlow - client = glassflow.GlassFlowClient( - personal_access_token=personal_access_token) + client = glassflow.GlassFlowClient(personal_access_token=personal_access_token) example_space = client.create_space(name="getting-started") pipeline = client.create_pipeline( name="getting-started-pipeline", transformation_file=transform_filename, - space_id=example_space.id) + space_id=example_space.id, + ) click.echo(f"āœ… Created a pipeline with pipeline_id {pipeline.id}") return pipeline + def send_consume_events(pipeline): click.echo("šŸ”„ Sending some generated events to pipeline .....") data_source = pipeline.get_source() for i in range(10): - event = {"data": "hello GF {}".format(i)} + event = {"data": f"hello GF {i}"} res = data_source.publish(event) if res.status_code == 200: - click.echo("Sent event: {event}".format(event=event)) + click.echo(f"Sent event: {event}") click.echo("šŸ“” Consuming transformed events from the pipeline") data_sink = pipeline.get_sink() - for i in range(10): + for _ in range(10): resp = data_sink.consume() if resp.status_code == 200: - click.echo("Consumed event: {event} ".format(event=resp.event())) + click.echo(f"Consumed event: {resp.event()} ") + @click.group() def glassflow(): @@ -56,8 +62,15 @@ def glassflow(): @click.command() -@click.option("--personal-access-token", "-pat", default=None, help="Personal access token.") -@click.option("--env-file", "-e", default=".env", help="Path to the .env file (default: .env in current directory).") +@click.option( + "--personal-access-token", "-pat", default=None, help="Personal access token." +) +@click.option( + "--env-file", + "-e", + default=".env", + help="Path to the .env file (default: .env in current directory).", +) def get_started(personal_access_token, env_file): """Displays a welcome message and setup instructions.""" @@ -75,7 +88,10 @@ def get_started(personal_access_token, env_file): return click.echo("šŸš€ Welcome to Glassflow! \n") - click.echo(f"šŸ”‘ Using Personal Access Token: {personal_access_token[:4]}... (hidden for security)") + click.echo( + f"šŸ”‘ Using Personal Access Token: {personal_access_token[:4]}... " + f"(hidden for security)" + ) click.echo("\nšŸ“ In this getting started guide, we will do the following:") click.echo("1. Define a data transformation function in Python.\n") click.echo("2. Create a pipeline with the function.\n") @@ -87,9 +103,14 @@ def get_started(personal_access_token, env_file): pipeline = create_space_pipeline(personal_access_token, filename) send_consume_events(pipeline) - click.echo("\nšŸŽ‰ Congratulations! You have successfully created a pipeline and sent events to it.\n") - click.echo("šŸ’» View the logs and monitor the Pipeline in the " - "Glassflow Web App at https://app.glassflow.dev/pipelines/{pipeline_id}".format(pipeline_id=pipeline.id)) + click.echo( + "\nšŸŽ‰ Congratulations! You have successfully created a pipeline and sent" + " events to it.\n" + ) + click.echo( + "šŸ’» View the logs and monitor the Pipeline in the " + f"Glassflow Web App at https://app.glassflow.dev/pipelines/{pipeline.id}" + ) @click.command() @@ -98,7 +119,8 @@ def help(command): """Displays help information about Glassflow CLI and its commands.""" commands = { - "get-started": "Initialize Glassflow with an access token.\nUsage: glassflow get-started --token YOUR_TOKEN", + "get-started": "Initialize Glassflow with an access token.\nUsage: " + "glassflow get-started --token YOUR_TOKEN", "help": "Shows help information.\nUsage: glassflow help [command]", } @@ -106,7 +128,10 @@ def help(command): if command in commands: click.echo(f"ā„¹ļø Help for `{command}`:\n{commands[command]}") else: - click.echo(f"āŒ Unknown command: `{command}`. Run `glassflow help` for a list of commands.") + click.echo( + f"āŒ Unknown command: `{command}`. Run `glassflow help` for a " + f"list of commands." + ) else: click.echo("šŸ“– Glassflow CLI Help:") for cmd, desc in commands.items(): From 2b842df2764ebb27759b7668ab63805d39b03490 Mon Sep 17 00:00:00 2001 From: Pablo Pardo Garcia Date: Wed, 12 Feb 2025 18:54:27 +0100 Subject: [PATCH 4/5] move cli code to separated module --- setup.py | 2 +- src/cli/__init__.py | 0 src/cli/cli.py | 44 ++++++ src/cli/commands/__init__.py | 1 + .../cli.py => cli/commands/get_started.py} | 138 +++++++----------- 5 files changed, 95 insertions(+), 90 deletions(-) create mode 100644 src/cli/__init__.py create mode 100644 src/cli/cli.py create mode 100644 src/cli/commands/__init__.py rename src/{glassflow/cli.py => cli/commands/get_started.py} (74%) diff --git a/setup.py b/setup.py index 278064b..2a7ec7b 100644 --- a/setup.py +++ b/setup.py @@ -52,7 +52,7 @@ package_data={"glassflow": ["py.typed"]}, entry_points={ "console_scripts": [ - "glassflow = glassflow.cli:glassflow", + "glassflow = cli.cli:glassflow", ], }, ) diff --git a/src/cli/__init__.py b/src/cli/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/cli/cli.py b/src/cli/cli.py new file mode 100644 index 0000000..2e9669e --- /dev/null +++ b/src/cli/cli.py @@ -0,0 +1,44 @@ +import click + +from .commands import get_started + + +@click.group() +def glassflow(): + """Glassflow CLI - Manage and control Glassflow SDK""" + pass + + + +@click.command() +@click.argument("command", required=False) +def help(command): + """Displays help information about Glassflow CLI and its commands.""" + + commands = { + "get-started": "Initialize Glassflow with an access token.\nUsage: " + "glassflow get-started --token YOUR_TOKEN", + "help": "Shows help information.\nUsage: glassflow help [command]", + } + + if command: + if command in commands: + click.echo(f"ā„¹ļø Help for `{command}`:\n{commands[command]}") + else: + click.echo( + f"āŒ Unknown command: `{command}`. Run `glassflow help` for a " + f"list of commands." + ) + else: + click.echo("šŸ“– Glassflow CLI Help:") + for cmd, desc in commands.items(): + click.echo(f" āžœ {cmd}: {desc.splitlines()[0]}") + click.echo("\nRun `glassflow help ` for more details.") + + +# Add commands to CLI group +glassflow.add_command(get_started) +glassflow.add_command(help) + +if __name__ == "__main__": + glassflow() diff --git a/src/cli/commands/__init__.py b/src/cli/commands/__init__.py new file mode 100644 index 0000000..9590477 --- /dev/null +++ b/src/cli/commands/__init__.py @@ -0,0 +1 @@ +from .get_started import get_started as get_started \ No newline at end of file diff --git a/src/glassflow/cli.py b/src/cli/commands/get_started.py similarity index 74% rename from src/glassflow/cli.py rename to src/cli/commands/get_started.py index db68dfb..b43e1de 100644 --- a/src/glassflow/cli.py +++ b/src/cli/commands/get_started.py @@ -4,63 +4,6 @@ from dotenv import load_dotenv -def create_transformation_function(filename="transform_gettingstarted.py"): - file_content = """import json -import logging - -def handler(data: dict, log: logging.Logger): - log.info("Echo: " + json.dumps(data)) - data['transformed_by'] = "glassflow" - - return data -""" - with open(filename, "w") as f: - f.write(file_content) - click.echo(f"āœ… Transformation function created in {filename}") - click.echo("The transformation function is:\n") - click.echo(file_content) - click.echo("šŸ“ You can modify the transformation function in the file.") - return filename - - -def create_space_pipeline(personal_access_token, transform_filename): - import glassflow - - # create glassflow client to interact with GlassFlow - client = glassflow.GlassFlowClient(personal_access_token=personal_access_token) - example_space = client.create_space(name="getting-started") - pipeline = client.create_pipeline( - name="getting-started-pipeline", - transformation_file=transform_filename, - space_id=example_space.id, - ) - click.echo(f"āœ… Created a pipeline with pipeline_id {pipeline.id}") - return pipeline - - -def send_consume_events(pipeline): - click.echo("šŸ”„ Sending some generated events to pipeline .....") - data_source = pipeline.get_source() - for i in range(10): - event = {"data": f"hello GF {i}"} - res = data_source.publish(event) - if res.status_code == 200: - click.echo(f"Sent event: {event}") - - click.echo("šŸ“” Consuming transformed events from the pipeline") - data_sink = pipeline.get_sink() - for _ in range(10): - resp = data_sink.consume() - if resp.status_code == 200: - click.echo(f"Consumed event: {resp.event()} ") - - -@click.group() -def glassflow(): - """Glassflow CLI - Manage and control Glassflow SDK""" - pass - - @click.command() @click.option( "--personal-access-token", "-pat", default=None, help="Personal access token." @@ -113,35 +56,52 @@ def get_started(personal_access_token, env_file): ) -@click.command() -@click.argument("command", required=False) -def help(command): - """Displays help information about Glassflow CLI and its commands.""" - - commands = { - "get-started": "Initialize Glassflow with an access token.\nUsage: " - "glassflow get-started --token YOUR_TOKEN", - "help": "Shows help information.\nUsage: glassflow help [command]", - } - - if command: - if command in commands: - click.echo(f"ā„¹ļø Help for `{command}`:\n{commands[command]}") - else: - click.echo( - f"āŒ Unknown command: `{command}`. Run `glassflow help` for a " - f"list of commands." - ) - else: - click.echo("šŸ“– Glassflow CLI Help:") - for cmd, desc in commands.items(): - click.echo(f" āžœ {cmd}: {desc.splitlines()[0]}") - click.echo("\nRun `glassflow help ` for more details.") - - -# Add commands to CLI group -glassflow.add_command(get_started) -glassflow.add_command(help) - -if __name__ == "__main__": - glassflow() +def create_transformation_function(filename="transform_getting_started.py"): + file_content = """import json +import logging + +def handler(data: dict, log: logging.Logger): + log.info("Echo: " + json.dumps(data)) + data['transformed_by'] = "glassflow" + + return data +""" + with open(filename, "w") as f: + f.write(file_content) + click.echo(f"āœ… Transformation function created in {filename}") + click.echo("The transformation function is:\n") + click.echo(file_content) + click.echo("šŸ“ You can modify the transformation function in the file.") + return filename + + +def create_space_pipeline(personal_access_token, transform_filename): + import glassflow + + # create glassflow client to interact with GlassFlow + client = glassflow.GlassFlowClient(personal_access_token=personal_access_token) + example_space = client.create_space(name="getting-started") + pipeline = client.create_pipeline( + name="getting-started-pipeline", + transformation_file=transform_filename, + space_id=example_space.id, + ) + click.echo(f"āœ… Created a pipeline with pipeline_id {pipeline.id}") + return pipeline + + +def send_consume_events(pipeline): + click.echo("šŸ”„ Sending some generated events to pipeline .....") + data_source = pipeline.get_source() + for i in range(10): + event = {"data": f"hello GF {i}"} + res = data_source.publish(event) + if res.status_code == 200: + click.echo(f"Sent event: {event}") + + click.echo("šŸ“” Consuming transformed events from the pipeline") + data_sink = pipeline.get_sink() + for _ in range(10): + resp = data_sink.consume() + if resp.status_code == 200: + click.echo(f"Consumed event: {resp.event()} ") From c16778cf52676a40196cb919a43c33e678da3eee Mon Sep 17 00:00:00 2001 From: Pablo Pardo Garcia Date: Wed, 12 Feb 2025 18:55:25 +0100 Subject: [PATCH 5/5] format code --- src/cli/cli.py | 1 - src/cli/commands/__init__.py | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/src/cli/cli.py b/src/cli/cli.py index 2e9669e..1dae9eb 100644 --- a/src/cli/cli.py +++ b/src/cli/cli.py @@ -9,7 +9,6 @@ def glassflow(): pass - @click.command() @click.argument("command", required=False) def help(command): diff --git a/src/cli/commands/__init__.py b/src/cli/commands/__init__.py index 9590477..09e9bb2 100644 --- a/src/cli/commands/__init__.py +++ b/src/cli/commands/__init__.py @@ -1 +1 @@ -from .get_started import get_started as get_started \ No newline at end of file +from .get_started import get_started as get_started