Skip to content

Getting started #41

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Feb 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,9 @@
package_dir={"": "src"},
python_requires=">=3.8",
package_data={"glassflow": ["py.typed"]},
entry_points={
"console_scripts": [
"glassflow = cli.cli:glassflow",
],
},
)
Empty file added src/cli/__init__.py
Empty file.
43 changes: 43 additions & 0 deletions src/cli/cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import click

from .commands import get_started


@click.group()
def glassflow():
"""Glassflow CLI - Manage and control Glassflow SDK"""
pass


@click.command()
@click.argument("command", required=False)
def help(command):
"""Displays help information about Glassflow CLI and its commands."""

commands = {
"get-started": "Initialize Glassflow with an access token.\nUsage: "
"glassflow get-started --token YOUR_TOKEN",
"help": "Shows help information.\nUsage: glassflow help [command]",
}

if command:
if command in commands:
click.echo(f"ℹ️ Help for `{command}`:\n{commands[command]}")
else:
click.echo(
f"❌ Unknown command: `{command}`. Run `glassflow help` for a "
f"list of commands."
)
else:
click.echo("📖 Glassflow CLI Help:")
for cmd, desc in commands.items():
click.echo(f" ➜ {cmd}: {desc.splitlines()[0]}")
click.echo("\nRun `glassflow help <command>` for more details.")


# Add commands to CLI group
glassflow.add_command(get_started)
glassflow.add_command(help)

if __name__ == "__main__":
glassflow()
1 change: 1 addition & 0 deletions src/cli/commands/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .get_started import get_started as get_started
107 changes: 107 additions & 0 deletions src/cli/commands/get_started.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
import os

import click
from dotenv import load_dotenv


@click.command()
@click.option(
"--personal-access-token", "-pat", default=None, help="Personal access token."
)
@click.option(
"--env-file",
"-e",
default=".env",
help="Path to the .env file (default: .env in current directory).",
)
def get_started(personal_access_token, env_file):
"""Displays a welcome message and setup instructions."""

# Load token from .env if not provided in CLI
if personal_access_token is None:
if os.path.exists(env_file):
load_dotenv(env_file) # Load environment variables
personal_access_token = os.getenv("PERSONAL_ACCESS_TOKEN")
else:
click.echo("⚠️ No token provided and .env file not found!", err=True)
return

if not personal_access_token:
click.echo("❌ Error: Personal access token is required.", err=True)
return

click.echo("🚀 Welcome to Glassflow! \n")
click.echo(
f"🔑 Using Personal Access Token: {personal_access_token[:4]}... "
f"(hidden for security)"
)
click.echo("\n📝 In this getting started guide, we will do the following:")
click.echo("1. Define a data transformation function in Python.\n")
click.echo("2. Create a pipeline with the function.\n")
click.echo("3. Send events to the pipeline.\n")
click.echo("4. Consume transformed events in real-time from the pipeline\n")
click.echo("5. Monitor the pipeline and view logs.\n")

filename = create_transformation_function()
pipeline = create_space_pipeline(personal_access_token, filename)
send_consume_events(pipeline)

click.echo(
"\n🎉 Congratulations! You have successfully created a pipeline and sent"
" events to it.\n"
)
click.echo(
"💻 View the logs and monitor the Pipeline in the "
f"Glassflow Web App at https://app.glassflow.dev/pipelines/{pipeline.id}"
)


def create_transformation_function(filename="transform_getting_started.py"):
file_content = """import json
import logging

def handler(data: dict, log: logging.Logger):
log.info("Echo: " + json.dumps(data))
data['transformed_by'] = "glassflow"

return data
"""
with open(filename, "w") as f:
f.write(file_content)
click.echo(f"✅ Transformation function created in {filename}")
click.echo("The transformation function is:\n")
click.echo(file_content)
click.echo("📝 You can modify the transformation function in the file.")
return filename


def create_space_pipeline(personal_access_token, transform_filename):
import glassflow

# create glassflow client to interact with GlassFlow
client = glassflow.GlassFlowClient(personal_access_token=personal_access_token)
example_space = client.create_space(name="getting-started")
pipeline = client.create_pipeline(
name="getting-started-pipeline",
transformation_file=transform_filename,
space_id=example_space.id,
)
click.echo(f"✅ Created a pipeline with pipeline_id {pipeline.id}")
return pipeline


def send_consume_events(pipeline):
click.echo("🔄 Sending some generated events to pipeline .....")
data_source = pipeline.get_source()
for i in range(10):
event = {"data": f"hello GF {i}"}
res = data_source.publish(event)
if res.status_code == 200:
click.echo(f"Sent event: {event}")

click.echo("📡 Consuming transformed events from the pipeline")
data_sink = pipeline.get_sink()
for _ in range(10):
resp = data_sink.consume()
if resp.status_code == 200:
click.echo(f"Consumed event: {resp.event()} ")
2 changes: 1 addition & 1 deletion src/glassflow/models/responses/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ class ConsumeEventResponse(BaseModel):

def event(self):
if self.body:
return self.body["response"]
return self.body.response
return None


Expand Down
1 change: 0 additions & 1 deletion src/glassflow/pipeline_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ def publish(self, request_body: dict) -> responses.PublishEventResponse:
ClientError: If an error occurred while publishing the event
"""
endpoint = f"/pipelines/{self.pipeline_id}/topics/input/events"
print("request_body", request_body)
http_res = self._request(method="POST", endpoint=endpoint, json=request_body)
return responses.PublishEventResponse(
status_code=http_res.status_code,
Expand Down