diff --git a/setup.py b/setup.py index 8aefcb5..2a7ec7b 100644 --- a/setup.py +++ b/setup.py @@ -50,4 +50,9 @@ package_dir={"": "src"}, python_requires=">=3.8", package_data={"glassflow": ["py.typed"]}, + entry_points={ + "console_scripts": [ + "glassflow = cli.cli:glassflow", + ], + }, ) diff --git a/src/cli/__init__.py b/src/cli/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/cli/cli.py b/src/cli/cli.py new file mode 100644 index 0000000..1dae9eb --- /dev/null +++ b/src/cli/cli.py @@ -0,0 +1,43 @@ +import click + +from .commands import get_started + + +@click.group() +def glassflow(): + """Glassflow CLI - Manage and control Glassflow SDK""" + pass + + +@click.command() +@click.argument("command", required=False) +def help(command): + """Displays help information about Glassflow CLI and its commands.""" + + commands = { + "get-started": "Initialize Glassflow with an access token.\nUsage: " + "glassflow get-started --token YOUR_TOKEN", + "help": "Shows help information.\nUsage: glassflow help [command]", + } + + if command: + if command in commands: + click.echo(f"ā„¹ļø Help for `{command}`:\n{commands[command]}") + else: + click.echo( + f"āŒ Unknown command: `{command}`. Run `glassflow help` for a " + f"list of commands." + ) + else: + click.echo("šŸ“– Glassflow CLI Help:") + for cmd, desc in commands.items(): + click.echo(f" āžœ {cmd}: {desc.splitlines()[0]}") + click.echo("\nRun `glassflow help ` for more details.") + + +# Add commands to CLI group +glassflow.add_command(get_started) +glassflow.add_command(help) + +if __name__ == "__main__": + glassflow() diff --git a/src/cli/commands/__init__.py b/src/cli/commands/__init__.py new file mode 100644 index 0000000..09e9bb2 --- /dev/null +++ b/src/cli/commands/__init__.py @@ -0,0 +1 @@ +from .get_started import get_started as get_started diff --git a/src/cli/commands/get_started.py b/src/cli/commands/get_started.py new file mode 100644 index 0000000..b43e1de --- /dev/null +++ b/src/cli/commands/get_started.py @@ -0,0 +1,107 @@ +import os + +import click +from dotenv import load_dotenv + + +@click.command() +@click.option( + "--personal-access-token", "-pat", default=None, help="Personal access token." +) +@click.option( + "--env-file", + "-e", + default=".env", + help="Path to the .env file (default: .env in current directory).", +) +def get_started(personal_access_token, env_file): + """Displays a welcome message and setup instructions.""" + + # Load token from .env if not provided in CLI + if personal_access_token is None: + if os.path.exists(env_file): + load_dotenv(env_file) # Load environment variables + personal_access_token = os.getenv("PERSONAL_ACCESS_TOKEN") + else: + click.echo("āš ļø No token provided and .env file not found!", err=True) + return + + if not personal_access_token: + click.echo("āŒ Error: Personal access token is required.", err=True) + return + + click.echo("šŸš€ Welcome to Glassflow! \n") + click.echo( + f"šŸ”‘ Using Personal Access Token: {personal_access_token[:4]}... " + f"(hidden for security)" + ) + click.echo("\nšŸ“ In this getting started guide, we will do the following:") + click.echo("1. Define a data transformation function in Python.\n") + click.echo("2. Create a pipeline with the function.\n") + click.echo("3. Send events to the pipeline.\n") + click.echo("4. Consume transformed events in real-time from the pipeline\n") + click.echo("5. Monitor the pipeline and view logs.\n") + + filename = create_transformation_function() + pipeline = create_space_pipeline(personal_access_token, filename) + send_consume_events(pipeline) + + click.echo( + "\nšŸŽ‰ Congratulations! You have successfully created a pipeline and sent" + " events to it.\n" + ) + click.echo( + "šŸ’» View the logs and monitor the Pipeline in the " + f"Glassflow Web App at https://app.glassflow.dev/pipelines/{pipeline.id}" + ) + + +def create_transformation_function(filename="transform_getting_started.py"): + file_content = """import json +import logging + +def handler(data: dict, log: logging.Logger): + log.info("Echo: " + json.dumps(data)) + data['transformed_by'] = "glassflow" + + return data +""" + with open(filename, "w") as f: + f.write(file_content) + click.echo(f"āœ… Transformation function created in {filename}") + click.echo("The transformation function is:\n") + click.echo(file_content) + click.echo("šŸ“ You can modify the transformation function in the file.") + return filename + + +def create_space_pipeline(personal_access_token, transform_filename): + import glassflow + + # create glassflow client to interact with GlassFlow + client = glassflow.GlassFlowClient(personal_access_token=personal_access_token) + example_space = client.create_space(name="getting-started") + pipeline = client.create_pipeline( + name="getting-started-pipeline", + transformation_file=transform_filename, + space_id=example_space.id, + ) + click.echo(f"āœ… Created a pipeline with pipeline_id {pipeline.id}") + return pipeline + + +def send_consume_events(pipeline): + click.echo("šŸ”„ Sending some generated events to pipeline .....") + data_source = pipeline.get_source() + for i in range(10): + event = {"data": f"hello GF {i}"} + res = data_source.publish(event) + if res.status_code == 200: + click.echo(f"Sent event: {event}") + + click.echo("šŸ“” Consuming transformed events from the pipeline") + data_sink = pipeline.get_sink() + for _ in range(10): + resp = data_sink.consume() + if resp.status_code == 200: + click.echo(f"Consumed event: {resp.event()} ") diff --git a/src/glassflow/models/responses/pipeline.py b/src/glassflow/models/responses/pipeline.py index 1ab8441..c794516 100644 --- a/src/glassflow/models/responses/pipeline.py +++ b/src/glassflow/models/responses/pipeline.py @@ -87,7 +87,7 @@ class ConsumeEventResponse(BaseModel): def event(self): if self.body: - return self.body["response"] + return self.body.response return None diff --git a/src/glassflow/pipeline_data.py b/src/glassflow/pipeline_data.py index a0dfd1f..a4cfb16 100644 --- a/src/glassflow/pipeline_data.py +++ b/src/glassflow/pipeline_data.py @@ -83,7 +83,6 @@ def publish(self, request_body: dict) -> responses.PublishEventResponse: ClientError: If an error occurred while publishing the event """ endpoint = f"/pipelines/{self.pipeline_id}/topics/input/events" - print("request_body", request_body) http_res = self._request(method="POST", endpoint=endpoint, json=request_body) return responses.PublishEventResponse( status_code=http_res.status_code,