diff --git a/.github/workflows/on_pr.yaml b/.github/workflows/on_pr.yaml
index dbc6786..5584701 100644
--- a/.github/workflows/on_pr.yaml
+++ b/.github/workflows/on_pr.yaml
@@ -4,6 +4,12 @@ on:
pull_request:
branches:
- main
+ - release/*
+
+permissions:
+ contents: write
+ checks: write
+ pull-requests: write
jobs:
tests:
@@ -24,11 +30,16 @@ jobs:
- name: Install dependencies
run: pip install -e .[dev]
- - name: Run Integration Tests
- run: pytest tests/glassflow/integration_tests/
+ - name: Run Tests
+ run: pytest
env:
- PIPELINE_ID: ${{ secrets.PIPELINE_ID }}
- PIPELINE_ACCESS_TOKEN: ${{ secrets.PIPELINE_ACCESS_TOKEN }}
+ PERSONAL_ACCESS_TOKEN: ${{ secrets.INTEGRATION_PERSONAL_ACCESS_TOKEN }}
+
+ - name: Upload coverage report
+ uses: actions/upload-artifact@v4
+ with:
+ name: coverageReport
+ path: tests/reports/
checks:
name: Run code checks
@@ -48,8 +59,46 @@ jobs:
- name: Install dependencies
run: pip install -e .[dev]
- - name: Run isort
- run: isort .
-
- - name: Run linter and code formatter
+ - name: Run ruff linter checks
run: ruff check .
+
+ - name: Run ruff formatting checks
+ run: ruff format --check .
+
+ coverage:
+ runs-on: ubuntu-latest
+ needs: [tests]
+ steps:
+ - uses: actions/checkout@v3
+ with:
+ persist-credentials: false # otherwise, the token used is the GITHUB_TOKEN, instead of your personal token
+ fetch-depth: 0 # otherwise, you will fail to push refs to dest repo
+
+ - name: Download coverage report
+ uses: actions/download-artifact@v4
+ with:
+ name: coverageReport
+
+ - name: Pytest coverage comment
+ id: coverageComment
+ uses: MishaKav/pytest-coverage-comment@main
+ with:
+ pytest-xml-coverage-path: ./coverage.xml
+ junitxml-path: ./pytest.xml
+
+ - name: Update Readme with Coverage Html
+ if: ${{ github.ref == 'refs/heads/main' }}
+ run: |
+ sed -i '//,//c\\n\${{ steps.coverageComment.outputs.coverageHtml }}\n' ./README.md
+
+ - name: Commit & Push changes to Readme
+ id: commit
+ if: ${{ github.ref == 'refs/heads/main' }}
+ run: |
+ git config --global user.name 'GitHub Actions Workflow glassflow-python-sdk/.github/workflows/on_pr.yaml'
+ git config --global user.email 'glassflow-actions-workflow@users.noreply.github.com'
+
+ git add README.md
+ git commit -m "Update coverage on Readme"
+
+ git push origin master
diff --git a/.gitignore b/.gitignore
index af095a4..249da53 100644
--- a/.gitignore
+++ b/.gitignore
@@ -10,4 +10,6 @@ site**
.pypirc
dist/
build
-.env
\ No newline at end of file
+.env
+tests/reports
+.coverage
\ No newline at end of file
diff --git a/README.md b/README.md
index ea291a3..f5ee09c 100644
--- a/README.md
+++ b/README.md
@@ -8,6 +8,8 @@
+
+
# GlassFlow Python SDK
@@ -22,11 +24,12 @@ You can install the GlassFlow Python SDK using pip:
pip install glassflow
```
-## Available Operations
+## Data Operations
* [publish](#publish) - Publish a new event into the pipeline
* [consume](#consume) - Consume the transformed event from the pipeline
* [consume failed](#consume-failed) - Consume the events that failed from the pipeline
+* [validate credentials](#validate-credentials) - Validate pipeline credentials
## publish
@@ -36,12 +39,11 @@ Publish a new event into the pipeline
### Example Usage
```python
-import glassflow
+from glassflow import PipelineDataSource
-client = glassflow.GlassFlowClient()
-pipeline_client = client.pipeline_client(pipeline_id="")
+```
+
+Now you can perform CRUD operations on your pipelines:
+
+* [list_pipelines](#list_pipelines) - Returns the list of pipelines available
+* [get_pipeline](#get_pipeline) - Returns a pipeline object from a given pipeline ID
+* [create](#create) - Create a new pipeline
+* [delete](#delete) - Delete an existing pipeline
+
+## list_pipelines
+
+Returns information about the available pipelines. It can be restricted to a
+specific space by passing the `space_id`.
+
+### Example Usage
+
+```python
+from glassflow import GlassFlowClient
+
+client = GlassFlowClient(personal_access_token="")
+res = client.list_pipelines()
+```
+
+## get_pipeline
+
+Gets information about a pipeline from a given pipeline ID. It returns a Pipeline object
+which can be used manage the Pipeline.
+
+### Example Usage
+
+```python
+from glassflow import GlassFlowClient
+
+client = GlassFlowClient(personal_access_token="")
+pipeline = client.get_pipeline(pipeline_id="")
+
+print("Name:", pipeline.name)
```
+## create
+
+The Pipeline object has a create method that creates a new GlassFlow pipeline.
+
+### Example Usage
+
+```python
+from glassflow import Pipeline
+
+pipeline = Pipeline(
+ name="",
+ transformation_file="path/to/transformation.py",
+ space_id="",
+ personal_access_token=""
+).create()
+```
+
+In the next example we create a pipeline with Google PubSub source
+and a webhook sink:
+
+```python
+from glassflow import Pipeline
+
+pipeline = Pipeline(
+ name="",
+ transformation_file="path/to/transformation.py",
+ space_id="",
+ personal_access_token="",
+ source_kind="google_pubsub",
+ source_config={
+ "project_id": "",
+ "subscription_id": "",
+ "credentials_json": ""
+ },
+ sink_kind="webhook",
+ sink_config={
+ "url": "",
+ "method": "",
+ "headers": [{"header1": "header1_value"}]
+ }
+).create()
+```
+
+## delete
+
+The Pipeline object has a delete method to delete a pipeline
+
+### Example Usage
+
+```python
+from glassflow import Pipeline
+
+pipeline = Pipeline(
+ name="",
+ transformation_file="path/to/transformation.py",
+ space_id="",
+ personal_access_token=""
+).create()
+
+pipeline.delete()
+```
## Quickstart
diff --git a/USAGE.md b/USAGE.md
index 5c58996..75634e0 100644
--- a/USAGE.md
+++ b/USAGE.md
@@ -1,11 +1,13 @@
```python
import glassflow
-client = glassflow.GlassFlowClient()
-pipeline = client.pipeline_client(pipeline_id="", pipeline_access_token="")
+source = glassflow.PipelineDataSource(pipeline_id="", pipeline_access_token="")
data = {
"name": "Hello World",
"id": 1
}
-res = pipeline.publish(request_body=data)
+source_res = source.publish(request_body=data)
+
+sink = glassflow.PipelineDataSink(pipeline_id="", pipeline_access_token="")
+sink_res = sink.consume()
```
\ No newline at end of file
diff --git a/docs/index.md b/docs/index.md
index f7a7894..1c8d43c 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -1,6 +1,6 @@
# Welcome to GlassFlow Python SDK Docs
-The [GlassFlow](https://www.glassflow.dev/) Python SDK provides a convenient way to interact with the GlassFlow API in your Python applications. The SDK is used to publish and consume events to your [GlassFlow pipelines](https://learn.glassflow.dev/docs/concepts/pipeline-configuration).
+The [GlassFlow](https://www.glassflow.dev/) Python SDK provides a convenient way to interact with the GlassFlow API in your Python applications. The SDK is used to publish and consume events to your [GlassFlow pipelines](https://docs.glassflow.dev/concepts/pipeline).
## Installation
@@ -11,101 +11,12 @@ You can install the GlassFlow Python SDK using pip:
pip install glassflow
```
-## Available Operations
+## Content
-* [publish](#publish) - Publish a new event into the pipeline
-* [consume](#consume) - Consume the transformed event from the pipeline
-* [consume failed](#consume-failed) - Consume the events that failed from the pipeline
-* [is access token valid](#is-access-token-valid) - Validates Pipeline Access Token
-* [is_valid](#is-valid) - Check if pipeline credentials are valid
+* [Publish and Consume Events](publish_and_consume.md) - Learn how to publish and consume events to/from a pipeline.
+* [Pipeline Management](pipeline_management.md) - Learn how to manage your pipelines from the SDK.
-## publish
-
-Publish a new event into the pipeline
-
-### Example Usage
-
-```python
-import glassflow
-
-client = glassflow.GlassFlowClient()
-pipeline_client = client.pipeline_client(pipeline_id="")
+```
+
+Here is a list of operations you can do with the `GlassFlowClient`:
+
+* [List Pipelines](#list-pipelines) - Returns a list with all your pipelines
+* [Get Pipeline](#get-pipeline) - Returns a pipeline object from a given pipeline ID
+* [Create Pipeline](#create-pipeline) - Create a new pipeline
+* [List Spaces](#list-spaces) - Returns a list with all your spaces
+* [Create Space](#create-space) - Create a new space
+
+You can also interact directly with the `Pipeline` or `Space` objects. They
+allow for some extra functionalities like delete or update.
+
+* [Update Pipeline](#update-pipeline) - Update an existing pipeline
+* [Delete Pipeline](#delete-pipeline) - Delete an existing pipeline
+* [Delete Space](#delete-space) - Delete an existing pipeline
+
+## List Pipelines
+
+Returns information about the available pipelines. It can be restricted to a
+specific space by passing the `space_id`.
+
+### Example Usage
+
+```python
+from glassflow import GlassFlowClient
+
+client = GlassFlowClient(personal_access_token="")
+res = client.list_pipelines()
+```
+
+## Get Pipeline
+
+Gets information about a pipeline from a given pipeline ID. It returns a Pipeline object
+which can be used manage the Pipeline.
+
+### Example Usage
+
+```python
+from glassflow import GlassFlowClient
+
+client = GlassFlowClient(personal_access_token="")
+pipeline = client.get_pipeline(pipeline_id="")
+
+print("Name:", pipeline.name)
+```
+
+## Create Pipeline
+
+Creates a new pipeline and returns a `Pipeline` object.
+
+### Example Usage
+
+```python
+from glassflow import GlassFlowClient
+
+client = GlassFlowClient(personal_access_token="")
+pipeline = client.create_pipeline(
+ name="MyFirstPipeline",
+ space_id="",
+ transformation_file="path/to/transformation.py"
+)
+
+print("Pipeline ID:", pipeline.id)
+```
+
+In the next example we create a pipeline with Google PubSub source
+and a webhook sink:
+
+```python
+from glassflow import GlassFlowClient
+
+client = GlassFlowClient(personal_access_token="")
+
+pipeline = client.create_pipeline(
+ name="MyFirstPipeline",
+ space_id="",
+ transformation_file="path/to/transformation.py",
+ source_kind="google_pubsub",
+ source_config={
+ "project_id": "",
+ "subscription_id": "",
+ "credentials_json": ""
+ },
+ sink_kind="webhook",
+ sink_config={
+ "url": "www.my-webhook-url.com",
+ "method": "POST",
+ "headers": [{"header1": "header1_value"}]
+ }
+)
+```
+
+## Update Pipeline
+
+The Pipeline object has an update method.
+
+### Example Usage
+
+```python
+from glassflow import Pipeline
+
+pipeline = Pipeline(
+ id="",
+ personal_access_token="",
+)
+
+pipeline.update(
+ transformation_file="path/to/new/transformation.py",
+ name="NewPipelineName",
+)
+```
+
+## Delete Pipeline
+
+The Pipeline object has a delete method to delete a pipeline
+
+### Example Usage
+
+```python
+from glassflow import Pipeline
+
+pipeline = Pipeline(
+ id="",
+ personal_access_token=""
+)
+pipeline.delete()
+```
+
+## List Spaces
+
+Returns information about the available spaces.
+
+### Example Usage
+
+```python
+from glassflow import GlassFlowClient
+
+client = GlassFlowClient(personal_access_token="")
+res = client.list_spaces()
+```
+
+
+## Create Space
+
+Creates a new space and returns a `Space` object.
+
+```python
+from glassflow import GlassFlowClient
+
+client = GlassFlowClient(personal_access_token="")
+space = client.create_space(name="MyFirstSpace")
+```
+
+
+## Delete Space
+
+The Space object has a delete method to delete a space
+
+### Example Usage
+
+```python
+from glassflow import Space
+
+space = Space(
+ id="",
+ personal_access_token=""
+)
+space.delete()
+```
\ No newline at end of file
diff --git a/docs/publish_and_consume.md b/docs/publish_and_consume.md
new file mode 100644
index 0000000..223b7f4
--- /dev/null
+++ b/docs/publish_and_consume.md
@@ -0,0 +1,80 @@
+# Publish and Consume Events
+
+
+* [Publish](#publish) - Publish a new event into the pipeline from a data source
+* [Consume](#consume) - Consume the transformed event from the pipeline in a data sink
+* [Consume Failed](#consume-failed) - Consume the events that failed from the pipeline in a
+* [Validate Credentials](#validate-credentials) - Validate pipeline credentials
+
+
+## Publish
+
+Publish a new event into the pipeline
+
+### Example Usage
+
+```python
+from glassflow import PipelineDataSource
+
+source = PipelineDataSource(pipeline_id="=61.0"]
-build-backend = "setuptools.build_meta"
\ No newline at end of file
+build-backend = "setuptools.build_meta"
+
+[tool.pytest.ini_options]
+addopts = """
+ --import-mode=importlib \
+ --junitxml=tests/reports/pytest.xml \
+ --cov=src/glassflow \
+ --cov-report xml:tests/reports/coverage.xml \
+ -ra -q
+"""
+testpaths = [
+ "tests",
+]
+
+[tool.coverage.html]
+skip_covered = true
+
+[tool.ruff.lint]
+select = [
+ # pycodestyle
+ "E",
+ # Pyflakes
+ "F",
+ # pyupgrade
+ "UP",
+ # flake8-bugbear
+ "B",
+ # flake8-simplify
+ "SIM",
+ # isort
+ "I",
+]
+
+[tool.ruff.lint.pydocstyle]
+convention = "google"
+
+[tool.datamodel-codegen]
+field-constraints = true
+snake-case-field = true
+strip-default-none = false
+target-python-version = "3.7"
+use-title-as-name = true
+disable-timestamp = true
+enable-version-header = true
+use-double-quotes = true
+use-subclass-enum=true
+input-file-type = "openapi"
+output-model-type = "dataclasses.dataclass"
\ No newline at end of file
diff --git a/setup.py b/setup.py
index e42e251..06b4a95 100644
--- a/setup.py
+++ b/setup.py
@@ -1,14 +1,14 @@
import setuptools
try:
- with open("README.md", "r") as fh:
+ with open("README.md") as fh:
long_description = fh.read()
except FileNotFoundError:
long_description = ""
setuptools.setup(
name="glassflow",
- version="1.0.11",
+ version="2.0.2rc7",
author="glassflow",
description="GlassFlow Python Client SDK",
url="https://learn.glassflow.dev/docs",
@@ -36,7 +36,15 @@
"python-dotenv==1.0.1",
],
extras_require={
- "dev": ["pylint==2.16.2", "pytest==8.3.2", "isort==5.13.2", "ruff==0.6.3"]
+ "dev": [
+ "pylint==2.16.2",
+ "pytest==8.3.2",
+ "pytest-cov==5.0.0",
+ "datamodel-code-generator[http]==0.26.0",
+ "requests-mock==1.12.1",
+ "isort==5.13.2",
+ "ruff==0.6.3",
+ ]
},
package_dir={"": "src"},
python_requires=">=3.8",
diff --git a/src/glassflow/__init__.py b/src/glassflow/__init__.py
index 17ec359..e3b7ab2 100644
--- a/src/glassflow/__init__.py
+++ b/src/glassflow/__init__.py
@@ -1,2 +1,7 @@
from .client import GlassFlowClient as GlassFlowClient
from .config import GlassFlowConfig as GlassFlowConfig
+from .models import errors as errors
+from .pipeline import Pipeline as Pipeline
+from .pipeline_data import PipelineDataSink as PipelineDataSink
+from .pipeline_data import PipelineDataSource as PipelineDataSource
+from .space import Space as Space
diff --git a/src/glassflow/api_client.py b/src/glassflow/api_client.py
new file mode 100644
index 0000000..369362a
--- /dev/null
+++ b/src/glassflow/api_client.py
@@ -0,0 +1,94 @@
+from __future__ import annotations
+
+import sys
+
+import requests as requests_http
+
+from .config import GlassFlowConfig
+from .models import errors
+from .models.operations.base import BaseRequest, BaseResponse
+from .utils import utils as utils
+
+
+class APIClient:
+ glassflow_config = GlassFlowConfig()
+
+ def __init__(self):
+ super().__init__()
+ self.client = requests_http.Session()
+
+ def _get_headers(
+ self, request: BaseRequest, req_content_type: str | None = None
+ ) -> dict:
+ headers = utils.get_req_specific_headers(request)
+ headers["Accept"] = "application/json"
+ headers["Gf-Client"] = self.glassflow_config.glassflow_client
+ headers["User-Agent"] = self.glassflow_config.user_agent
+ headers["Gf-Python-Version"] = (
+ f"{sys.version_info.major}."
+ f"{sys.version_info.minor}."
+ f"{sys.version_info.micro}"
+ )
+
+ if req_content_type and req_content_type not in (
+ "multipart/form-data",
+ "multipart/mixed",
+ ):
+ headers["content-type"] = req_content_type
+
+ return headers
+
+ def _request(
+ self,
+ method: str,
+ endpoint: str,
+ request: BaseRequest,
+ serialization_method: str = "json",
+ ) -> BaseResponse:
+ request_type = type(request)
+
+ url = utils.generate_url(
+ request_type,
+ self.glassflow_config.server_url,
+ endpoint,
+ request,
+ )
+
+ req_content_type, data, form = utils.serialize_request_body(
+ request=request,
+ request_type=request_type,
+ request_field_name="request_body",
+ nullable=False,
+ optional=True,
+ serialization_method=serialization_method,
+ )
+ if method == "GET":
+ data = None
+
+ headers = self._get_headers(request, req_content_type)
+ query_params = utils.get_query_params(request_type, request)
+
+ # make the request
+ http_res = self.client.request(
+ method, url=url, params=query_params, headers=headers, data=data, files=form
+ )
+ content_type = http_res.headers.get("Content-Type")
+
+ res = BaseResponse(
+ status_code=http_res.status_code,
+ content_type=content_type,
+ raw_response=http_res,
+ )
+
+ if http_res.status_code in [400, 500]:
+ out = utils.unmarshal_json(http_res.text, errors.Error)
+ out.raw_response = http_res
+ raise out
+ elif http_res.status_code == 429:
+ pass
+ elif 400 < http_res.status_code < 600:
+ raise errors.ClientError(
+ "API error occurred", http_res.status_code, http_res.text, http_res
+ )
+
+ return res
diff --git a/src/glassflow/client.py b/src/glassflow/client.py
index 632861f..06c0fb1 100644
--- a/src/glassflow/client.py
+++ b/src/glassflow/client.py
@@ -1,68 +1,202 @@
"""GlassFlow Python Client to interact with GlassFlow API"""
-import os
-from typing import Optional
+from __future__ import annotations
-import requests as requests_http
+from .api_client import APIClient
+from .models import errors, operations
+from .models.api import PipelineState
+from .pipeline import Pipeline
+from .space import Space
-from .config import GlassFlowConfig
-from .pipelines import PipelineClient
-
-class GlassFlowClient:
- """GlassFlow Client to interact with GlassFlow API and manage pipelines and other resources
+class GlassFlowClient(APIClient):
+ """
+ GlassFlow Client to interact with GlassFlow API and manage pipelines
+ and other resources
Attributes:
- rclient: requests.Session object to make HTTP requests to GlassFlow API
+ client: requests.Session object to make HTTP requests to GlassFlow API
glassflow_config: GlassFlowConfig object to store configuration
- organization_id: Organization ID of the user. If not provided, the default organization will be used
+ organization_id: Organization ID of the user. If not provided,
+ the default organization will be used
"""
- glassflow_config: GlassFlowConfig
-
- def __init__(self, organization_id: str = None) -> None:
+ def __init__(
+ self, personal_access_token: str = None, organization_id: str = None
+ ) -> None:
"""Create a new GlassFlowClient object
Args:
- organization_id: Organization ID of the user. If not provided, the default organization will be used
+ personal_access_token: GlassFlow Personal Access Token
+ organization_id: Organization ID of the user. If not provided,
+ the default organization will be used
"""
- rclient = requests_http.Session()
- self.glassflow_config = GlassFlowConfig(rclient)
+ super().__init__()
+ self.personal_access_token = personal_access_token
self.organization_id = organization_id
- def pipeline_client(
+ def get_pipeline(self, pipeline_id: str) -> Pipeline:
+ """Gets a Pipeline object from the GlassFlow API
+
+ Args:
+ pipeline_id: UUID of the pipeline
+
+ Returns:
+ Pipeline: Pipeline object from the GlassFlow API
+
+ Raises:
+ PipelineNotFoundError: Pipeline does not exist
+ UnauthorizedError: User does not have permission to perform the
+ requested operation
+ ClientError: GlassFlow Client Error
+ """
+ return Pipeline(
+ personal_access_token=self.personal_access_token, id=pipeline_id
+ ).fetch()
+
+ def create_pipeline(
self,
- pipeline_id: Optional[str] = None,
- pipeline_access_token: Optional[str] = None,
- space_id: Optional[str] = None,
- ) -> PipelineClient:
- """Create a new PipelineClient object to interact with a specific pipeline
+ name: str,
+ space_id: str,
+ transformation_file: str = None,
+ requirements: str = None,
+ source_kind: str = None,
+ source_config: dict = None,
+ sink_kind: str = None,
+ sink_config: dict = None,
+ env_vars: list[dict[str, str]] = None,
+ state: PipelineState = "running",
+ metadata: dict = None,
+ ) -> Pipeline:
+ """Creates a new GlassFlow pipeline
+
+ Args:
+ name: Name of the pipeline
+ space_id: ID of the GlassFlow Space you want to create the pipeline in
+ transformation_file: Path to file with transformation function of
+ the pipeline.
+ requirements: Requirements.txt of the pipeline
+ source_kind: Kind of source for the pipeline. If no source is
+ provided, the default source will be SDK
+ source_config: Configuration of the pipeline's source
+ sink_kind: Kind of sink for the pipeline. If no sink is provided,
+ the default sink will be SDK
+ sink_config: Configuration of the pipeline's sink
+ env_vars: Environment variables to pass to the pipeline
+ state: State of the pipeline after creation.
+ It can be either "running" or "paused"
+ metadata: Metadata of the pipeline
+
+ Returns:
+ Pipeline: New pipeline
+
+ Raises:
+ UnauthorizedError: User does not have permission to perform
+ the requested operation
+ """
+ return Pipeline(
+ name=name,
+ space_id=space_id,
+ transformation_file=transformation_file,
+ requirements=requirements,
+ source_kind=source_kind,
+ source_config=source_config,
+ sink_kind=sink_kind,
+ sink_config=sink_config,
+ env_vars=env_vars,
+ state=state,
+ metadata=metadata,
+ organization_id=self.organization_id,
+ personal_access_token=self.personal_access_token,
+ ).create()
+
+ def list_pipelines(
+ self, space_ids: list[str] | None = None
+ ) -> operations.ListPipelinesResponse:
+ """
+ Lists all pipelines in the GlassFlow API
Args:
- pipeline_id: The pipeline id to interact with
- pipeline_access_token: The access token to access the pipeline
+ space_ids: List of Space IDs of the pipelines to list.
+ If not specified, all the pipelines will be listed.
Returns:
- PipelineClient: Client object to publish and consume events from the given pipeline.
+ ListPipelinesResponse: Response object with the pipelines listed
+
+ Raises:
+ UnauthorizedError: User does not have permission to perform the
"""
- # if no pipeline_id or pipeline_access_token is provided, try to read from environment variables
- if not pipeline_id:
- pipeline_id = os.getenv("PIPELINE_ID")
- if not pipeline_access_token:
- pipeline_access_token = os.getenv("PIPELINE_ACCESS_TOKEN")
- # no pipeline_id provided explicitly or in environment variables
- if not pipeline_id:
- raise ValueError(
- "PIPELINE_ID must be set as an environment variable or provided explicitly"
+ request = operations.ListPipelinesRequest(
+ space_id=space_ids,
+ organization_id=self.organization_id,
+ personal_access_token=self.personal_access_token,
+ )
+ try:
+ res = self._request(
+ method="GET",
+ endpoint="/pipelines",
+ request=request,
)
- if not pipeline_access_token:
- raise ValueError(
- "PIPELINE_ACCESS_TOKEN must be set as an environment variable or provided explicitly"
+ res_json = res.raw_response.json()
+ except errors.ClientError as e:
+ if e.status_code == 401:
+ raise errors.UnauthorizedError(e.raw_response) from e
+ else:
+ raise e
+
+ return operations.ListPipelinesResponse(
+ content_type=res.content_type,
+ status_code=res.status_code,
+ raw_response=res.raw_response,
+ total_amount=res_json["total_amount"],
+ pipelines=res_json["pipelines"],
+ )
+
+ def list_spaces(self) -> operations.ListSpacesResponse:
+ request = operations.ListSpacesRequest(
+ organization_id=self.organization_id,
+ personal_access_token=self.personal_access_token,
+ )
+ try:
+ res = self._request(
+ method="GET",
+ endpoint="/spaces",
+ request=request,
)
+ res_json = res.raw_response.json()
+ except errors.ClientError as e:
+ if e.status_code == 401:
+ raise errors.UnauthorizedError(e.raw_response) from e
+ else:
+ raise e
- return PipelineClient(
- glassflow_client=self,
- pipeline_id=pipeline_id,
- pipeline_access_token=pipeline_access_token,
+ return operations.ListSpacesResponse(
+ content_type=res.content_type,
+ status_code=res.status_code,
+ raw_response=res.raw_response,
+ total_amount=res_json["total_amount"],
+ spaces=res_json["spaces"],
)
+
+ def create_space(
+ self,
+ name: str,
+ ) -> Space:
+ """Creates a new Space
+
+ Args:
+ name: Name of the Space
+
+ Returns:
+ Space: New space
+
+ Raises:
+ UnauthorizedError: User does not have permission to perform
+ the requested operation
+ """
+ return Space(
+ name=name,
+ personal_access_token=self.personal_access_token,
+ organization_id=self.organization_id,
+ ).create()
diff --git a/src/glassflow/config.py b/src/glassflow/config.py
index d67bd14..206a2e8 100644
--- a/src/glassflow/config.py
+++ b/src/glassflow/config.py
@@ -1,23 +1,19 @@
from dataclasses import dataclass
from importlib.metadata import version
-import requests
-
@dataclass
class GlassFlowConfig:
"""Configuration object for GlassFlowClient
Attributes:
- client: requests.Session object to interact with the GlassFlow API
server_url: The base URL of the GlassFlow API
sdk_version: The version of the GlassFlow Python SDK
user_agent: The user agent to be used in the requests
"""
- client: requests.Session
server_url: str = "https://api.glassflow.dev/v1"
sdk_version: str = version("glassflow")
- user_agent: str = "glassflow-python-sdk/{}".format(sdk_version)
- glassflow_client: str = "python-sdk/{}".format(sdk_version)
+ user_agent: str = f"glassflow-python-sdk/{sdk_version}"
+ glassflow_client: str = f"python-sdk/{sdk_version}"
diff --git a/src/glassflow/models/api/__init__.py b/src/glassflow/models/api/__init__.py
new file mode 100644
index 0000000..f84fabc
--- /dev/null
+++ b/src/glassflow/models/api/__init__.py
@@ -0,0 +1,31 @@
+from .api import (
+ CreatePipeline,
+ CreateSpace,
+ FunctionEnvironments,
+ FunctionLogEntry,
+ FunctionLogs,
+ GetDetailedSpacePipeline,
+ PipelineState,
+ SeverityCodeInput,
+ SinkConnector,
+ SourceConnector,
+ SpacePipeline,
+ SpaceScope,
+ UpdatePipeline,
+)
+
+__all__ = [
+ "CreatePipeline",
+ "FunctionEnvironments",
+ "FunctionLogEntry",
+ "FunctionLogs",
+ "GetDetailedSpacePipeline",
+ "PipelineState",
+ "SeverityCodeInput",
+ "SinkConnector",
+ "SourceConnector",
+ "SpacePipeline",
+ "UpdatePipeline",
+ "SpaceScope",
+ "CreateSpace",
+]
diff --git a/src/glassflow/models/api/api.py b/src/glassflow/models/api/api.py
new file mode 100644
index 0000000..e88dc0a
--- /dev/null
+++ b/src/glassflow/models/api/api.py
@@ -0,0 +1,377 @@
+# ruff: noqa
+# generated by datamodel-codegen:
+# filename: https://api.glassflow.dev/v1/openapi.yaml
+# version: 0.26.0
+
+from __future__ import annotations
+from dataclasses_json import dataclass_json
+
+from dataclasses import dataclass
+from enum import Enum
+from typing import Any, Dict, List, Optional, Union
+
+
+@dataclass_json
+@dataclass
+class Error:
+ detail: str
+
+
+@dataclass_json
+@dataclass
+class CreateOrganization:
+ name: str
+
+
+@dataclass_json
+@dataclass
+class Organization(CreateOrganization):
+ id: str
+
+
+@dataclass_json
+@dataclass
+class OrganizationScope(Organization):
+ role: str
+
+
+OrganizationScopes = List[OrganizationScope]
+
+
+@dataclass_json
+@dataclass
+class SignUp:
+ access_token: str
+ id_token: str
+
+
+@dataclass_json
+@dataclass
+class BasePipeline:
+ name: str
+ space_id: str
+ metadata: Dict[str, Any]
+
+
+class PipelineState(str, Enum):
+ running = "running"
+ paused = "paused"
+
+
+@dataclass_json
+@dataclass
+class FunctionEnvironment:
+ name: str
+ value: str
+
+
+FunctionEnvironments = Optional[List[FunctionEnvironment]]
+
+
+class Kind(str, Enum):
+ google_pubsub = "google_pubsub"
+
+
+@dataclass_json
+@dataclass
+class Config:
+ project_id: str
+ subscription_id: str
+ credentials_json: str
+
+
+@dataclass_json
+@dataclass
+class SourceConnector1:
+ kind: Kind
+ config: Config
+
+
+class Kind1(str, Enum):
+ amazon_sqs = "amazon_sqs"
+
+
+@dataclass_json
+@dataclass
+class Config1:
+ queue_url: str
+ aws_region: str
+ aws_access_key: str
+ aws_secret_key: str
+
+
+@dataclass_json
+@dataclass
+class SourceConnector2:
+ kind: Kind1
+ config: Config1
+
+
+SourceConnector = Optional[Union[SourceConnector1, SourceConnector2]]
+
+
+class Kind2(str, Enum):
+ webhook = "webhook"
+
+
+class Method(str, Enum):
+ get = "GET"
+ post = "POST"
+ put = "PUT"
+ patch = "PATCH"
+ delete = "DELETE"
+
+
+@dataclass_json
+@dataclass
+class Header:
+ name: str
+ value: str
+
+
+@dataclass_json
+@dataclass
+class Config2:
+ url: str
+ method: Method
+ headers: List[Header]
+
+
+@dataclass_json
+@dataclass
+class SinkConnector1:
+ kind: Kind2
+ config: Config2
+
+
+class Kind3(str, Enum):
+ clickhouse = "clickhouse"
+
+
+@dataclass_json
+@dataclass
+class Config3:
+ addr: str
+ database: str
+ username: str
+ password: str
+ table: str
+
+
+@dataclass_json
+@dataclass
+class SinkConnector2:
+ kind: Kind3
+ config: Config3
+
+
+SinkConnector = Optional[Union[SinkConnector1, SinkConnector2]]
+
+
+@dataclass_json
+@dataclass
+class Pipeline(BasePipeline):
+ id: str
+ created_at: str
+ state: PipelineState
+
+
+@dataclass_json
+@dataclass
+class SpacePipeline(Pipeline):
+ space_name: str
+
+
+@dataclass_json
+@dataclass
+class GetDetailedSpacePipeline(SpacePipeline):
+ source_connector: SourceConnector
+ sink_connector: SinkConnector
+ environments: FunctionEnvironments
+
+
+@dataclass_json
+@dataclass
+class PipelineFunctionOutput:
+ environments: FunctionEnvironments
+
+
+SpacePipelines = List[SpacePipeline]
+
+
+@dataclass_json
+@dataclass
+class CreateSpace:
+ name: str
+
+
+@dataclass_json
+@dataclass
+class UpdateSpace:
+ name: str
+
+
+@dataclass_json
+@dataclass
+class Space(CreateSpace):
+ id: str
+ created_at: str
+
+
+@dataclass_json
+@dataclass
+class SpaceScope(Space):
+ permission: str
+
+
+SpaceScopes = List[SpaceScope]
+
+
+@dataclass_json
+@dataclass
+class Payload:
+ message: str
+
+
+class SeverityCodeInput(int, Enum):
+ integer_100 = 100
+ integer_200 = 200
+ integer_400 = 400
+ integer_500 = 500
+
+
+SeverityCode = int
+
+
+@dataclass_json
+@dataclass
+class CreateAccessToken:
+ name: str
+
+
+@dataclass_json
+@dataclass
+class AccessToken(CreateAccessToken):
+ id: str
+ token: str
+ created_at: str
+
+
+AccessTokens = List[AccessToken]
+
+
+@dataclass_json
+@dataclass
+class PaginationResponse:
+ total_amount: int
+
+
+@dataclass_json
+@dataclass
+class SourceFile:
+ name: str
+ content: str
+
+
+SourceFiles = List[SourceFile]
+
+
+@dataclass_json
+@dataclass
+class EventContext:
+ request_id: str
+ receive_time: str
+ external_id: Optional[str] = None
+
+
+PersonalAccessToken = str
+
+
+@dataclass_json
+@dataclass
+class Profile:
+ id: str
+ home_organization: Organization
+ name: str
+ email: str
+ provider: str
+ external_settings: Dict[str, Any]
+ subscriber_id: str
+
+
+@dataclass_json
+@dataclass
+class ListOrganizationScopes(PaginationResponse):
+ organizations: OrganizationScopes
+
+
+@dataclass_json
+@dataclass
+class UpdatePipeline:
+ name: str
+ transformation_function: Optional[str] = None
+ transformation_requirements: Optional[List[str]] = None
+ requirements_txt: Optional[str] = None
+ metadata: Optional[Dict[str, Any]] = None
+ source_connector: Optional[SourceConnector] = None
+ sink_connector: Optional[SinkConnector] = None
+ environments: Optional[FunctionEnvironments] = None
+
+
+@dataclass_json
+@dataclass
+class CreatePipeline(BasePipeline):
+ transformation_function: Optional[str] = None
+ transformation_requirements: Optional[List[str]] = None
+ requirements_txt: Optional[str] = None
+ source_connector: Optional[SourceConnector] = None
+ sink_connector: Optional[SinkConnector] = None
+ environments: Optional[FunctionEnvironments] = None
+ state: Optional[PipelineState] = None
+
+
+@dataclass_json
+@dataclass
+class ListPipelines(PaginationResponse):
+ pipelines: SpacePipelines
+
+
+@dataclass_json
+@dataclass
+class ListSpaceScopes(PaginationResponse):
+ spaces: SpaceScopes
+
+
+@dataclass_json
+@dataclass
+class FunctionLogEntry:
+ level: str
+ severity_code: SeverityCode
+ timestamp: str
+ payload: Payload
+
+
+@dataclass_json
+@dataclass
+class ListAccessTokens(PaginationResponse):
+ access_tokens: AccessTokens
+
+
+@dataclass_json
+@dataclass
+class ConsumeEvent:
+ payload: Dict[str, Any]
+ event_context: EventContext
+ status: str
+ response: Dict[str, Any]
+ req_id: Optional[str] = None
+ receive_time: Optional[str] = None
+
+
+@dataclass_json
+@dataclass
+class ListPersonalAccessTokens:
+ tokens: List[PersonalAccessToken]
+
+
+FunctionLogs = List[FunctionLogEntry]
diff --git a/src/glassflow/models/errors/__init__.py b/src/glassflow/models/errors/__init__.py
index a2e8aa4..6a8efd5 100644
--- a/src/glassflow/models/errors/__init__.py
+++ b/src/glassflow/models/errors/__init__.py
@@ -1,4 +1,21 @@
-from .clienterror import ClientError
+from .clienterror import (
+ ClientError,
+ PipelineAccessTokenInvalidError,
+ PipelineNotFoundError,
+ SpaceIsNotEmptyError,
+ SpaceNotFoundError,
+ UnauthorizedError,
+ UnknownContentTypeError,
+)
from .error import Error
-__all__ = ["Error", "ClientError"]
+__all__ = [
+ "Error",
+ "ClientError",
+ "PipelineNotFoundError",
+ "PipelineAccessTokenInvalidError",
+ "SpaceNotFoundError",
+ "UnknownContentTypeError",
+ "UnauthorizedError",
+ "SpaceIsNotEmptyError",
+]
diff --git a/src/glassflow/models/errors/clienterror.py b/src/glassflow/models/errors/clienterror.py
index d8117bd..f5ea477 100644
--- a/src/glassflow/models/errors/clienterror.py
+++ b/src/glassflow/models/errors/clienterror.py
@@ -49,3 +49,76 @@ def __str__(self):
body = f"\n{self.body}"
return f"{self.detail}: Status {self.status_code}{body}"
+
+
+class PipelineNotFoundError(ClientError):
+ """Error caused by a pipeline ID not found."""
+
+ def __init__(self, pipeline_id: str, raw_response: requests_http.Response):
+ super().__init__(
+ detail=f"Pipeline ID {pipeline_id} does not exist",
+ status_code=404,
+ body=raw_response.text,
+ raw_response=raw_response,
+ )
+
+
+class SpaceNotFoundError(ClientError):
+ """Error caused by a pipeline ID not found."""
+
+ def __init__(self, space_id: str, raw_response: requests_http.Response):
+ super().__init__(
+ detail=f"Space ID {space_id} does not exist",
+ status_code=404,
+ body=raw_response.text,
+ raw_response=raw_response,
+ )
+
+
+class UnauthorizedError(ClientError):
+ """Error caused by a user not authorized."""
+
+ def __init__(self, raw_response: requests_http.Response):
+ super().__init__(
+ detail="Unauthorized request, Personal Access Token used is invalid",
+ status_code=401,
+ body=raw_response.text,
+ raw_response=raw_response,
+ )
+
+
+class PipelineAccessTokenInvalidError(ClientError):
+ """Error caused by invalid access token."""
+
+ def __init__(self, raw_response: requests_http.Response):
+ super().__init__(
+ detail="The Pipeline Access Token used is invalid",
+ status_code=401,
+ body=raw_response.text,
+ raw_response=raw_response,
+ )
+
+
+class UnknownContentTypeError(ClientError):
+ """Error caused by an unknown content type response."""
+
+ def __init__(self, raw_response: requests_http.Response):
+ content_type = raw_response.headers.get("Content-Type")
+ super().__init__(
+ detail=f"unknown content-type received: {content_type}",
+ status_code=raw_response.status_code,
+ body=raw_response.text,
+ raw_response=raw_response,
+ )
+
+
+class SpaceIsNotEmptyError(ClientError):
+ """Error caused by trying to delete a space that is not empty."""
+
+ def __init__(self, raw_response: requests_http.Response):
+ super().__init__(
+ detail=raw_response.json()["msg"],
+ status_code=409,
+ body=raw_response.text,
+ raw_response=raw_response,
+ )
diff --git a/src/glassflow/models/operations/__init__.py b/src/glassflow/models/operations/__init__.py
index 5383ff1..5286126 100644
--- a/src/glassflow/models/operations/__init__.py
+++ b/src/glassflow/models/operations/__init__.py
@@ -1,21 +1,92 @@
-from .consumeevent import (ConsumeEventRequest, ConsumeEventResponse,
- ConsumeEventResponseBody)
-from .consumefailed import (ConsumeFailedRequest, ConsumeFailedResponse,
- ConsumeFailedResponseBody)
-from .publishevent import (PublishEventRequest, PublishEventRequestBody,
- PublishEventResponse, PublishEventResponseBody)
-from .status_access_token import StatusAccessTokenRequest
+from .access_token import ListAccessTokensRequest, StatusAccessTokenRequest
+from .artifact import (
+ GetArtifactRequest,
+ PostArtifactRequest,
+)
+from .base import (
+ BaseManagementRequest,
+ BasePipelineManagementRequest,
+ BaseRequest,
+ BaseResponse,
+ BaseSpaceManagementDataRequest,
+)
+from .consumeevent import (
+ ConsumeEventRequest,
+ ConsumeEventResponse,
+ ConsumeEventResponseBody,
+)
+from .consumefailed import (
+ ConsumeFailedRequest,
+ ConsumeFailedResponse,
+ ConsumeFailedResponseBody,
+)
+from .function import (
+ FetchFunctionRequest,
+ GetFunctionLogsRequest,
+ GetFunctionLogsResponse,
+ UpdateFunctionRequest,
+)
+from .pipeline_crud import (
+ CreatePipelineRequest,
+ CreatePipelineResponse,
+ DeletePipelineRequest,
+ GetPipelineRequest,
+ GetPipelineResponse,
+ ListPipelinesRequest,
+ ListPipelinesResponse,
+ UpdatePipelineRequest,
+ UpdatePipelineResponse,
+)
+from .publishevent import (
+ PublishEventRequest,
+ PublishEventRequestBody,
+ PublishEventResponse,
+ PublishEventResponseBody,
+)
+from .space_crud import (
+ CreateSpaceRequest,
+ CreateSpaceResponse,
+ DeleteSpaceRequest,
+ ListSpacesRequest,
+ ListSpacesResponse,
+)
__all__ = [
- "PublishEventRequest",
- "PublishEventRequestBody",
- "PublishEventResponse",
- "PublishEventResponseBody",
+ "BaseManagementRequest",
+ "BasePipelineManagementRequest",
+ "BaseRequest",
+ "BaseResponse",
+ "BaseSpaceManagementDataRequest",
"ConsumeEventRequest",
"ConsumeEventResponse",
"ConsumeEventResponseBody",
"ConsumeFailedRequest",
"ConsumeFailedResponse",
"ConsumeFailedResponseBody",
+ "CreatePipelineRequest",
+ "CreatePipelineResponse",
+ "DeletePipelineRequest",
+ "DeleteSpaceRequest",
+ "GetPipelineRequest",
+ "GetPipelineResponse",
+ "ListPipelinesRequest",
+ "ListPipelinesResponse",
+ "ListAccessTokensRequest",
+ "PublishEventRequest",
+ "PublishEventRequestBody",
+ "PublishEventResponse",
+ "PublishEventResponseBody",
+ "GetArtifactRequest",
+ "GetFunctionLogsRequest",
+ "GetFunctionLogsResponse",
"StatusAccessTokenRequest",
+ "ListSpacesResponse",
+ "ListSpacesRequest",
+ "CreateSpaceRequest",
+ "CreateSpaceResponse",
+ "UpdatePipelineRequest",
+ "UpdatePipelineResponse",
+ "UpdateFunctionRequest",
+ "FetchFunctionRequest",
+ "PostArtifactRequest",
]
diff --git a/src/glassflow/models/operations/access_token.py b/src/glassflow/models/operations/access_token.py
new file mode 100644
index 0000000..1832fc4
--- /dev/null
+++ b/src/glassflow/models/operations/access_token.py
@@ -0,0 +1,25 @@
+from __future__ import annotations
+
+import dataclasses
+
+from .base import BasePipelineDataRequest, BasePipelineManagementRequest
+
+
+@dataclasses.dataclass
+class StatusAccessTokenRequest(BasePipelineDataRequest):
+ """Request check the status of an access token
+
+ Attributes:
+ pipeline_id: The id of the pipeline
+ organization_id: The id of the organization
+ x_pipeline_access_token: The access token of the pipeline
+
+ """
+
+ pass
+
+
+@dataclasses.dataclass
+class ListAccessTokensRequest(BasePipelineManagementRequest):
+ page_size: int = 50
+ page: int = 1
diff --git a/src/glassflow/models/operations/artifact.py b/src/glassflow/models/operations/artifact.py
new file mode 100644
index 0000000..daad296
--- /dev/null
+++ b/src/glassflow/models/operations/artifact.py
@@ -0,0 +1,23 @@
+from __future__ import annotations
+
+import dataclasses
+
+from .base import BasePipelineManagementRequest
+
+
+@dataclasses.dataclass
+class GetArtifactRequest(BasePipelineManagementRequest):
+ pass
+
+
+@dataclasses.dataclass
+class PostArtifactRequest(BasePipelineManagementRequest):
+ file: str | None = dataclasses.field(
+ default=None,
+ metadata={
+ "multipart_form": {
+ "field_name": "file",
+ }
+ },
+ )
+ requirementsTxt: str | None = dataclasses.field(default=None)
diff --git a/src/glassflow/models/operations/base.py b/src/glassflow/models/operations/base.py
new file mode 100644
index 0000000..da5f539
--- /dev/null
+++ b/src/glassflow/models/operations/base.py
@@ -0,0 +1,92 @@
+import dataclasses
+from typing import Optional
+
+from requests import Response
+
+
+@dataclasses.dataclass()
+class BaseRequest:
+ pass
+
+
+@dataclasses.dataclass()
+class BaseManagementRequest(BaseRequest):
+ organization_id: Optional[str] = dataclasses.field(
+ default=None,
+ metadata={
+ "query_param": {
+ "field_name": "organization_id",
+ "style": "form",
+ "explode": True,
+ }
+ },
+ )
+ personal_access_token: str = dataclasses.field(
+ default=None,
+ metadata={
+ "header": {
+ "field_name": "Personal-Access-Token",
+ "style": "simple",
+ "explode": False,
+ }
+ },
+ )
+
+
+@dataclasses.dataclass
+class BasePipelineRequest(BaseRequest):
+ pipeline_id: str = dataclasses.field(
+ metadata={
+ "path_param": {
+ "field_name": "pipeline_id",
+ "style": "simple",
+ "explode": False,
+ }
+ }
+ )
+ organization_id: Optional[str] = dataclasses.field(
+ default=None,
+ metadata={
+ "query_param": {
+ "field_name": "organization_id",
+ "style": "form",
+ "explode": True,
+ }
+ },
+ )
+
+
+@dataclasses.dataclass
+class BasePipelineDataRequest(BasePipelineRequest):
+ x_pipeline_access_token: str = dataclasses.field(
+ default=None,
+ metadata={
+ "header": {
+ "field_name": "X-PIPELINE-ACCESS-TOKEN",
+ "style": "simple",
+ "explode": False,
+ }
+ },
+ )
+
+
+@dataclasses.dataclass
+class BaseSpaceRequest(BaseRequest):
+ space_id: str
+
+
+@dataclasses.dataclass
+class BasePipelineManagementRequest(BaseManagementRequest, BasePipelineRequest):
+ pass
+
+
+@dataclasses.dataclass
+class BaseSpaceManagementDataRequest(BaseManagementRequest, BaseSpaceRequest):
+ pass
+
+
+@dataclasses.dataclass
+class BaseResponse:
+ content_type: str = dataclasses.field()
+ status_code: int = dataclasses.field()
+ raw_response: Response = dataclasses.field()
diff --git a/src/glassflow/models/operations/consumeevent.py b/src/glassflow/models/operations/consumeevent.py
index 14847ab..5fe727c 100644
--- a/src/glassflow/models/operations/consumeevent.py
+++ b/src/glassflow/models/operations/consumeevent.py
@@ -3,14 +3,14 @@
from __future__ import annotations
import dataclasses
-from typing import Optional
-import requests as requests_http
from dataclasses_json import config, dataclass_json
+from .base import BasePipelineDataRequest, BaseResponse
+
@dataclasses.dataclass
-class ConsumeEventRequest:
+class ConsumeEventRequest(BasePipelineDataRequest):
"""Request to consume an event from a pipeline topic
Attributes:
@@ -20,35 +20,7 @@ class ConsumeEventRequest:
"""
- pipeline_id: str = dataclasses.field(
- metadata={
- "path_param": {
- "field_name": "pipeline_id",
- "style": "simple",
- "explode": False,
- }
- }
- )
- organization_id: Optional[str] = dataclasses.field(
- default=None,
- metadata={
- "query_param": {
- "field_name": "organization_id",
- "style": "form",
- "explode": True,
- }
- },
- )
- x_pipeline_access_token: str = dataclasses.field(
- default=None,
- metadata={
- "header": {
- "field_name": "X-PIPELINE-ACCESS-TOKEN",
- "style": "simple",
- "explode": False,
- }
- },
- )
+ pass
@dataclass_json
@@ -69,7 +41,7 @@ class ConsumeEventResponseBody:
@dataclasses.dataclass
-class ConsumeEventResponse:
+class ConsumeEventResponse(BaseResponse):
"""Response to consume an event from a pipeline topic
Attributes:
@@ -80,14 +52,11 @@ class ConsumeEventResponse:
"""
- content_type: str = dataclasses.field()
- status_code: int = dataclasses.field()
- raw_response: requests_http.Response = dataclasses.field()
- body: Optional[ConsumeEventResponseBody] = dataclasses.field(default=None)
+ body: ConsumeEventResponseBody | None = dataclasses.field(default=None)
def json(self):
"""Return the response body as a JSON object.
- This method is to have cmopatibility with the requests.Response.json() method
+ This method is to have compatibility with the requests.Response.json() method
Returns:
dict: The transformed event as a JSON object
diff --git a/src/glassflow/models/operations/consumefailed.py b/src/glassflow/models/operations/consumefailed.py
index de1bff2..12fadf4 100644
--- a/src/glassflow/models/operations/consumefailed.py
+++ b/src/glassflow/models/operations/consumefailed.py
@@ -3,14 +3,14 @@
from __future__ import annotations
import dataclasses
-from typing import Optional
-import requests as requests_http
from dataclasses_json import config, dataclass_json
+from .base import BasePipelineDataRequest, BaseResponse
+
@dataclasses.dataclass
-class ConsumeFailedRequest:
+class ConsumeFailedRequest(BasePipelineDataRequest):
"""Request to consume failed events from a pipeline
Attributes:
@@ -20,35 +20,7 @@ class ConsumeFailedRequest:
"""
- pipeline_id: str = dataclasses.field(
- metadata={
- "path_param": {
- "field_name": "pipeline_id",
- "style": "simple",
- "explode": False,
- }
- }
- )
- organization_id: Optional[str] = dataclasses.field(
- default=None,
- metadata={
- "query_param": {
- "field_name": "organization_id",
- "style": "form",
- "explode": True,
- }
- },
- )
- x_pipeline_access_token: str = dataclasses.field(
- default=None,
- metadata={
- "header": {
- "field_name": "X-PIPELINE-ACCESS-TOKEN",
- "style": "simple",
- "explode": False,
- }
- },
- )
+ pass
@dataclass_json
@@ -69,8 +41,8 @@ class ConsumeFailedResponseBody:
@dataclasses.dataclass
-class ConsumeFailedResponse:
- """Response to consume an failed event from a pipeline
+class ConsumeFailedResponse(BaseResponse):
+ """Response to consume a failed event from a pipeline
Attributes:
content_type: HTTP response content type for this operation
@@ -80,14 +52,11 @@ class ConsumeFailedResponse:
"""
- content_type: str = dataclasses.field()
- status_code: int = dataclasses.field()
- raw_response: requests_http.Response = dataclasses.field()
- body: Optional[ConsumeFailedResponseBody] = dataclasses.field(default=None)
+ body: ConsumeFailedResponseBody | None = dataclasses.field(default=None)
def json(self):
"""Return the response body as a JSON object.
- This method is to have cmopatibility with the requests.Response.json() method
+ This method is to have compatibility with the requests.Response.json() method
Returns:
dict: The transformed event as a JSON object
diff --git a/src/glassflow/models/operations/function.py b/src/glassflow/models/operations/function.py
new file mode 100644
index 0000000..93c8ecb
--- /dev/null
+++ b/src/glassflow/models/operations/function.py
@@ -0,0 +1,31 @@
+from __future__ import annotations
+
+import dataclasses
+
+from ..api import FunctionEnvironments, FunctionLogs, SeverityCodeInput
+from .base import BasePipelineManagementRequest, BaseResponse
+
+
+@dataclasses.dataclass
+class GetFunctionLogsRequest(BasePipelineManagementRequest):
+ page_size: int = 50
+ page_token: str = None
+ severity_code: SeverityCodeInput | None = None
+ start_time: str | None = None
+ end_time: str | None = None
+
+
+@dataclasses.dataclass
+class GetFunctionLogsResponse(BaseResponse):
+ logs: FunctionLogs
+ next: str
+
+
+@dataclasses.dataclass
+class FetchFunctionRequest(BasePipelineManagementRequest):
+ pass
+
+
+@dataclasses.dataclass
+class UpdateFunctionRequest(BasePipelineManagementRequest):
+ environments: FunctionEnvironments | None = dataclasses.field(default=None)
diff --git a/src/glassflow/models/operations/pipeline_crud.py b/src/glassflow/models/operations/pipeline_crud.py
new file mode 100644
index 0000000..72d0c0f
--- /dev/null
+++ b/src/glassflow/models/operations/pipeline_crud.py
@@ -0,0 +1,78 @@
+from __future__ import annotations
+
+import dataclasses
+from enum import Enum
+
+from ..api import (
+ CreatePipeline,
+ GetDetailedSpacePipeline,
+ PipelineState,
+ SinkConnector,
+ SourceConnector,
+ SpacePipeline,
+)
+from .base import BaseManagementRequest, BasePipelineManagementRequest, BaseResponse
+
+
+@dataclasses.dataclass
+class GetPipelineRequest(BasePipelineManagementRequest):
+ pass
+
+
+@dataclasses.dataclass
+class GetPipelineResponse(BaseResponse):
+ pipeline: GetDetailedSpacePipeline | None = dataclasses.field(default=None)
+
+
+@dataclasses.dataclass
+class CreatePipelineRequest(BaseManagementRequest, CreatePipeline):
+ pass
+
+
+@dataclasses.dataclass
+class UpdatePipelineRequest(BaseManagementRequest):
+ name: str | None = dataclasses.field(default=None)
+ state: PipelineState | None = dataclasses.field(default=None)
+ metadata: dict | None = dataclasses.field(default=None)
+ source_connector: SourceConnector | None = dataclasses.field(default=None)
+ sink_connector: SinkConnector | None = dataclasses.field(default=None)
+
+
+@dataclasses.dataclass
+class UpdatePipelineResponse(BaseResponse):
+ pipeline: GetDetailedSpacePipeline | None = dataclasses.field(default=None)
+
+
+@dataclasses.dataclass
+class CreatePipelineResponse(BaseResponse):
+ name: str
+ space_id: str
+ id: str
+ created_at: str
+ state: PipelineState
+ access_token: str
+ metadata: dict | None = dataclasses.field(default=None)
+
+
+@dataclasses.dataclass
+class DeletePipelineRequest(BasePipelineManagementRequest):
+ pass
+
+
+class Order(str, Enum):
+ asc = "asc"
+ desc = "desc"
+
+
+@dataclasses.dataclass
+class ListPipelinesRequest(BaseManagementRequest):
+ space_id: list[str] | None = None
+ page_size: int = 50
+ page: int = 1
+ order_by: Order = Order.asc
+
+
+@dataclasses.dataclass
+class ListPipelinesResponse(BaseResponse):
+ total_amount: int
+ pipelines: list[SpacePipeline]
diff --git a/src/glassflow/models/operations/publishevent.py b/src/glassflow/models/operations/publishevent.py
index 00a830f..f5e78ba 100644
--- a/src/glassflow/models/operations/publishevent.py
+++ b/src/glassflow/models/operations/publishevent.py
@@ -3,9 +3,8 @@
from __future__ import annotations
import dataclasses
-from typing import Optional
-import requests as requests_http
+from .base import BasePipelineDataRequest, BaseResponse
@dataclasses.dataclass
@@ -14,7 +13,7 @@ class PublishEventRequestBody:
@dataclasses.dataclass
-class PublishEventRequest:
+class PublishEventRequest(BasePipelineDataRequest):
"""Request to publish an event to a pipeline topic
Attributes:
@@ -24,35 +23,6 @@ class PublishEventRequest:
request_body: The request body / event that should be published to the pipeline
"""
- pipeline_id: str = dataclasses.field(
- metadata={
- "path_param": {
- "field_name": "pipeline_id",
- "style": "simple",
- "explode": False,
- }
- }
- )
- organization_id: Optional[str] = dataclasses.field(
- default=None,
- metadata={
- "query_param": {
- "field_name": "organization_id",
- "style": "form",
- "explode": True,
- }
- },
- )
- x_pipeline_access_token: str = dataclasses.field(
- default=None,
- metadata={
- "header": {
- "field_name": "X-PIPELINE-ACCESS-TOKEN",
- "style": "simple",
- "explode": False,
- }
- },
- )
request_body: dict = dataclasses.field(
default=None, metadata={"request": {"media_type": "application/json"}}
)
@@ -64,7 +34,7 @@ class PublishEventResponseBody:
@dataclasses.dataclass
-class PublishEventResponse:
+class PublishEventResponse(BaseResponse):
"""Response object for publish event operation
Attributes:
@@ -75,7 +45,4 @@ class PublishEventResponse:
"""
- content_type: str = dataclasses.field()
- status_code: int = dataclasses.field()
- raw_response: requests_http.Response = dataclasses.field()
- object: Optional[PublishEventResponseBody] = dataclasses.field(default=None)
+ object: PublishEventResponseBody | None = dataclasses.field(default=None)
diff --git a/src/glassflow/models/operations/space_crud.py b/src/glassflow/models/operations/space_crud.py
new file mode 100644
index 0000000..993c096
--- /dev/null
+++ b/src/glassflow/models/operations/space_crud.py
@@ -0,0 +1,42 @@
+from __future__ import annotations
+
+import dataclasses
+from enum import Enum
+
+from ..api import CreateSpace, SpaceScope
+from .base import BaseManagementRequest, BaseResponse, BaseSpaceManagementDataRequest
+
+
+@dataclasses.dataclass
+class ListSpacesResponse(BaseResponse):
+ total_amount: int
+ spaces: list[SpaceScope]
+
+
+class Order(str, Enum):
+ asc = "asc"
+ desc = "desc"
+
+
+@dataclasses.dataclass
+class ListSpacesRequest(BaseManagementRequest):
+ page_size: int = 50
+ page: int = 1
+ order_by: Order = Order.asc
+
+
+@dataclasses.dataclass
+class CreateSpaceRequest(BaseManagementRequest, CreateSpace):
+ pass
+
+
+@dataclasses.dataclass
+class CreateSpaceResponse(BaseResponse):
+ name: str
+ id: str
+ created_at: str
+
+
+@dataclasses.dataclass
+class DeleteSpaceRequest(BaseSpaceManagementDataRequest):
+ pass
diff --git a/src/glassflow/models/operations/status_access_token.py b/src/glassflow/models/operations/status_access_token.py
deleted file mode 100644
index 037d943..0000000
--- a/src/glassflow/models/operations/status_access_token.py
+++ /dev/null
@@ -1,34 +0,0 @@
-from __future__ import annotations
-
-import dataclasses
-
-
-@dataclasses.dataclass
-class StatusAccessTokenRequest:
- """Request check the status of an access token
-
- Attributes:
- pipeline_id: The id of the pipeline
- x_pipeline_access_token: The access token of the pipeline
-
- """
-
- pipeline_id: str = dataclasses.field(
- metadata={
- "path_param": {
- "field_name": "pipeline_id",
- "style": "simple",
- "explode": False,
- }
- }
- )
- x_pipeline_access_token: str = dataclasses.field(
- default=None,
- metadata={
- "header": {
- "field_name": "X-PIPELINE-ACCESS-TOKEN",
- "style": "simple",
- "explode": False,
- }
- },
- )
diff --git a/src/glassflow/pipeline.py b/src/glassflow/pipeline.py
new file mode 100644
index 0000000..bde51d2
--- /dev/null
+++ b/src/glassflow/pipeline.py
@@ -0,0 +1,553 @@
+from __future__ import annotations
+
+from .client import APIClient
+from .models import api, errors, operations
+from .pipeline_data import PipelineDataSink, PipelineDataSource
+
+
+class Pipeline(APIClient):
+ def __init__(
+ self,
+ personal_access_token: str,
+ name: str | None = None,
+ space_id: str | None = None,
+ id: str | None = None,
+ source_kind: str | None = None,
+ source_config: dict | None = None,
+ sink_kind: str | None = None,
+ sink_config: dict | None = None,
+ requirements: str | None = None,
+ transformation_file: str | None = None,
+ env_vars: list[dict[str, str]] | None = None,
+ state: api.PipelineState = "running",
+ organization_id: str | None = None,
+ metadata: dict | None = None,
+ created_at: str | None = None,
+ ):
+ """Creates a new GlassFlow pipeline object
+
+ Args:
+ personal_access_token: The personal access token to authenticate
+ against GlassFlow
+ id: Pipeline ID
+ name: Name of the pipeline
+ space_id: ID of the GlassFlow Space you want to create the pipeline in
+ transformation_file: Path to file with transformation function of
+ the pipeline.
+ requirements: Requirements.txt of the pipeline
+ source_kind: Kind of source for the pipeline. If no source is
+ provided, the default source will be SDK
+ source_config: Configuration of the pipeline's source
+ sink_kind: Kind of sink for the pipeline. If no sink is provided,
+ the default sink will be SDK
+ sink_config: Configuration of the pipeline's sink
+ env_vars: Environment variables to pass to the pipeline
+ state: State of the pipeline after creation.
+ It can be either "running" or "paused"
+ metadata: Metadata of the pipeline
+ created_at: Timestamp when the pipeline was created
+
+ Returns:
+ Pipeline: Pipeline object to interact with the GlassFlow API
+
+ Raises:
+ FailNotFoundError: If the transformation file is provided and
+ does not exist
+ """
+ super().__init__()
+ self.id = id
+ self.name = name
+ self.space_id = space_id
+ self.personal_access_token = personal_access_token
+ self.source_kind = source_kind
+ self.source_config = source_config
+ self.sink_kind = sink_kind
+ self.sink_config = sink_config
+ self.requirements = requirements
+ self.transformation_code = None
+ self.transformation_file = transformation_file
+ self.env_vars = env_vars
+ self.state = state
+ self.organization_id = organization_id
+ self.metadata = metadata if metadata is not None else {}
+ self.created_at = created_at
+ self.access_tokens = []
+
+ if self.transformation_file is not None:
+ self._read_transformation_file()
+
+ if source_kind is not None and self.source_config is not None:
+ self.source_connector = dict(
+ kind=self.source_kind,
+ config=self.source_config,
+ )
+ elif self.source_kind is None and self.source_config is None:
+ self.source_connector = None
+ else:
+ raise ValueError("Both source_kind and source_config must be provided")
+
+ if self.sink_kind is not None and self.sink_config is not None:
+ self.sink_connector = dict(
+ kind=sink_kind,
+ config=sink_config,
+ )
+ elif self.sink_kind is None and self.sink_config is None:
+ self.sink_connector = None
+ else:
+ raise ValueError("Both sink_kind and sink_config must be provided")
+
+ def fetch(self) -> Pipeline:
+ """
+ Fetches pipeline information from the GlassFlow API
+
+ Returns:
+ self: Pipeline object
+
+ Raises:
+ ValueError: If ID is not provided in the constructor
+ PipelineNotFoundError: If ID provided does not match any
+ existing pipeline in GlassFlow
+ UnauthorizedError: If the Personal Access Token is not
+ provider or is invalid
+ """
+ if self.id is None:
+ raise ValueError(
+ "Pipeline id must be provided in order to fetch it's details"
+ )
+
+ request = operations.GetPipelineRequest(
+ pipeline_id=self.id,
+ organization_id=self.organization_id,
+ personal_access_token=self.personal_access_token,
+ )
+
+ base_res = self._request(
+ method="GET",
+ endpoint=f"/pipelines/{self.id}",
+ request=request,
+ )
+ self._fill_pipeline_details(base_res.raw_response.json())
+
+ # Fetch Pipeline Access Tokens
+ self._list_access_tokens()
+
+ # Fetch function source
+ self._get_function_artifact()
+
+ return self
+
+ def create(self) -> Pipeline:
+ """
+ Creates a new GlassFlow pipeline
+
+ Returns:
+ self: Pipeline object
+
+ Raises:
+ ValueError: If name is not provided in the constructor
+ ValueError: If space_id is not provided in the constructor
+ ValueError: If transformation_file is not provided
+ in the constructor
+ """
+ create_pipeline = api.CreatePipeline(
+ name=self.name,
+ space_id=self.space_id,
+ transformation_function=self.transformation_code,
+ requirements_txt=self.requirements,
+ source_connector=self.source_connector,
+ sink_connector=self.sink_connector,
+ environments=self.env_vars,
+ state=self.state,
+ metadata=self.metadata,
+ )
+ if self.name is None:
+ raise ValueError("Name must be provided in order to create the pipeline")
+ if self.space_id is None:
+ raise ValueError("Argument space_id must be provided in the constructor")
+ if self.transformation_file is None:
+ raise ValueError(
+ "Argument transformation_file must be provided in the constructor"
+ )
+ else:
+ self._read_transformation_file()
+
+ request = operations.CreatePipelineRequest(
+ organization_id=self.organization_id,
+ personal_access_token=self.personal_access_token,
+ **create_pipeline.__dict__,
+ )
+
+ base_res = self._request(method="POST", endpoint="/pipelines", request=request)
+ res = operations.CreatePipelineResponse(
+ status_code=base_res.status_code,
+ content_type=base_res.content_type,
+ raw_response=base_res.raw_response,
+ **base_res.raw_response.json(),
+ )
+
+ self.id = res.id
+ self.created_at = res.created_at
+ self.access_tokens.append({"name": "default", "token": res.access_token})
+ return self
+
+ def update(
+ self,
+ name: str | None = None,
+ state: api.PipelineState | None = None,
+ transformation_file: str | None = None,
+ requirements: str | None = None,
+ metadata: dict | None = None,
+ source_kind: str | None = None,
+ source_config: dict | None = None,
+ sink_kind: str | None = None,
+ sink_config: dict | None = None,
+ env_vars: list[dict[str, str]] | None = None,
+ ) -> Pipeline:
+ """
+ Updates a GlassFlow pipeline
+
+ Args:
+
+ name: Name of the pipeline
+ state: State of the pipeline after creation.
+ It can be either "running" or "paused"
+ transformation_file: Path to file with transformation function of
+ the pipeline.
+ requirements: Requirements.txt of the pipeline
+ source_kind: Kind of source for the pipeline. If no source is
+ provided, the default source will be SDK
+ source_config: Configuration of the pipeline's source
+ sink_kind: Kind of sink for the pipeline. If no sink is provided,
+ the default sink will be SDK
+ sink_config: Configuration of the pipeline's sink
+ env_vars: Environment variables to pass to the pipeline
+ metadata: Metadata of the pipeline
+
+ Returns:
+ self: Updated pipeline
+
+ """
+
+ # Fetch current pipeline data
+ self.fetch()
+
+ if transformation_file is not None or requirements is not None:
+ if transformation_file is not None:
+ with open(transformation_file) as f:
+ file = f.read()
+ else:
+ file = self.transformation_code
+
+ if requirements is None:
+ requirements = self.requirements
+
+ self._upload_function_artifact(file, requirements)
+ self.requirements = requirements
+ self.transformation_code = file
+
+ if source_kind is not None:
+ source_connector = dict(
+ kind=source_kind,
+ config=source_config,
+ )
+ else:
+ source_connector = self.source_connector
+
+ if sink_kind is not None:
+ sink_connector = dict(
+ kind=sink_kind,
+ config=sink_config,
+ )
+ else:
+ sink_connector = self.sink_connector
+
+ if env_vars is not None:
+ self._update_function(env_vars)
+
+ request = operations.UpdatePipelineRequest(
+ organization_id=self.organization_id,
+ personal_access_token=self.personal_access_token,
+ name=name if name is not None else self.name,
+ state=state if state is not None else self.state,
+ metadata=metadata if metadata is not None else self.metadata,
+ source_connector=source_connector,
+ sink_connector=sink_connector,
+ )
+
+ base_res = self._request(
+ method="PATCH", endpoint=f"/pipelines/{self.id}", request=request
+ )
+ self._fill_pipeline_details(base_res.raw_response.json())
+ return self
+
+ def delete(self) -> None:
+ """
+ Deletes a GlassFlow pipeline
+
+ Returns:
+
+ Raises:
+ ValueError: If ID is not provided in the constructor
+ PipelineNotFoundError: If ID provided does not match any
+ existing pipeline in GlassFlow
+ UnauthorizedError: If the Personal Access Token is not
+ provided or is invalid
+ """
+ if self.id is None:
+ raise ValueError("Pipeline id must be provided")
+
+ request = operations.DeletePipelineRequest(
+ pipeline_id=self.id,
+ organization_id=self.organization_id,
+ personal_access_token=self.personal_access_token,
+ )
+ self._request(
+ method="DELETE",
+ endpoint=f"/pipelines/{self.id}",
+ request=request,
+ )
+
+ def get_logs(
+ self,
+ page_size: int = 50,
+ page_token: str | None = None,
+ severity_code: api.SeverityCodeInput = api.SeverityCodeInput.integer_100,
+ start_time: str | None = None,
+ end_time: str | None = None,
+ ) -> operations.GetFunctionLogsResponse:
+ """
+ Get the pipeline's logs
+
+ Args:
+ page_size: Pagination size
+ page_token: Page token filter (use for pagination)
+ severity_code: Severity code filter
+ start_time: Start time filter
+ end_time: End time filter
+
+ Returns:
+ PipelineFunctionsGetLogsResponse: Response with the logs
+ """
+ request = operations.GetFunctionLogsRequest(
+ organization_id=self.organization_id,
+ personal_access_token=self.personal_access_token,
+ pipeline_id=self.id,
+ page_size=page_size,
+ page_token=page_token,
+ severity_code=severity_code,
+ start_time=start_time,
+ end_time=end_time,
+ )
+ base_res = self._request(
+ method="GET",
+ endpoint=f"/pipelines/{self.id}/functions/main/logs",
+ request=request,
+ )
+ base_res_json = base_res.raw_response.json()
+ logs = [
+ api.FunctionLogEntry.from_dict(entry) for entry in base_res_json["logs"]
+ ]
+ return operations.GetFunctionLogsResponse(
+ status_code=base_res.status_code,
+ content_type=base_res.content_type,
+ raw_response=base_res.raw_response,
+ logs=logs,
+ next=base_res_json["next"],
+ )
+
+ def _list_access_tokens(self) -> Pipeline:
+ request = operations.ListAccessTokensRequest(
+ organization_id=self.organization_id,
+ personal_access_token=self.personal_access_token,
+ pipeline_id=self.id,
+ )
+ base_res = self._request(
+ method="GET",
+ endpoint=f"/pipelines/{self.id}/access_tokens",
+ request=request,
+ )
+ res_json = base_res.raw_response.json()
+ self.access_tokens = res_json["access_tokens"]
+ return self
+
+ def _get_function_artifact(self) -> Pipeline:
+ """
+ Fetch pipeline function source
+
+ Returns:
+ self: Pipeline with function source details
+ """
+ request = operations.GetArtifactRequest(
+ organization_id=self.organization_id,
+ personal_access_token=self.personal_access_token,
+ pipeline_id=self.id,
+ )
+ base_res = self._request(
+ method="GET",
+ endpoint=f"/pipelines/{self.id}/functions/main/artifacts/latest",
+ request=request,
+ )
+ res_json = base_res.raw_response.json()
+ self.transformation_code = res_json["transformation_function"]
+ self.requirements = res_json["requirements_txt"]
+ return self
+
+ def _upload_function_artifact(self, file: str, requirements: str) -> None:
+ request = operations.PostArtifactRequest(
+ pipeline_id=self.id,
+ organization_id=self.organization_id,
+ personal_access_token=self.personal_access_token,
+ file=file,
+ requirementsTxt=requirements,
+ )
+ try:
+ self._request(
+ method="POST",
+ endpoint=f"/pipelines/{self.id}/functions/main/artifacts",
+ request=request,
+ serialization_method="multipart",
+ )
+ except errors.ClientError as e:
+ if e.status_code == 425:
+ # TODO: Figure out appropriate behaviour
+ print("Update still in progress")
+ else:
+ raise e
+
+ def _update_function(self, env_vars):
+ """
+ Patch pipeline function
+
+ Args:
+ env_vars: Environment variables to update
+
+ Returns:
+ self: Pipeline with updated function
+ """
+ request = operations.UpdateFunctionRequest(
+ organization_id=self.organization_id,
+ personal_access_token=self.personal_access_token,
+ pipeline_id=self.id,
+ environments=env_vars,
+ )
+ base_res = self._request(
+ method="PATCH",
+ endpoint=f"/pipelines/{self.id}/functions/main",
+ request=request,
+ )
+ res_json = base_res.raw_response.json()
+ self.env_vars = res_json["environments"]
+ return self
+
+ def get_source(
+ self, pipeline_access_token_name: str | None = None
+ ) -> PipelineDataSource:
+ """
+ Get source client to publish data to the pipeline
+
+ Args:
+ pipeline_access_token_name (str | None): Name of the pipeline
+ access token to use. If not specified, the default token
+ will be used
+
+ Returns:
+ PipelineDataSource: Source client to publish data to the pipeline
+
+ Raises:
+ ValueError: If pipeline id is not provided in the constructor
+ """
+ return self._get_data_client("source", pipeline_access_token_name)
+
+ def get_sink(
+ self, pipeline_access_token_name: str | None = None
+ ) -> PipelineDataSink:
+ """
+ Get sink client to consume data from the pipeline
+
+ Args:
+ pipeline_access_token_name (str | None): Name of the pipeline
+ access token to use. If not specified, the default token
+ will be used
+
+ Returns:
+ PipelineDataSink: Sink client to consume data from the pipeline
+
+ Raises:
+ ValueError: If pipeline id is not provided in the constructor
+ """
+ return self._get_data_client("sink", pipeline_access_token_name)
+
+ def _get_data_client(
+ self, client_type: str, pipeline_access_token_name: str | None = None
+ ) -> PipelineDataSource | PipelineDataSink:
+ if self.id is None:
+ raise ValueError("Pipeline id must be provided in the constructor")
+ elif len(self.access_tokens) == 0:
+ self._list_access_tokens()
+
+ if pipeline_access_token_name is not None:
+ for t in self.access_tokens:
+ if t["name"] == pipeline_access_token_name:
+ token = t["token"]
+ break
+ else:
+ raise ValueError(
+ f"Token with name {pipeline_access_token_name} " f"was not found"
+ )
+ else:
+ token = self.access_tokens[0]["token"]
+ if client_type == "source":
+ client = PipelineDataSource(
+ pipeline_id=self.id,
+ pipeline_access_token=token,
+ )
+ elif client_type == "sink":
+ client = PipelineDataSink(
+ pipeline_id=self.id,
+ pipeline_access_token=token,
+ )
+ else:
+ raise ValueError("client_type must be either source or sink")
+ return client
+
+ def _request(
+ self,
+ method: str,
+ endpoint: str,
+ request: operations.BaseManagementRequest,
+ **kwargs,
+ ) -> operations.BaseResponse:
+ try:
+ return super()._request(
+ method=method, endpoint=endpoint, request=request, **kwargs
+ )
+ except errors.ClientError as e:
+ if e.status_code == 404:
+ raise errors.PipelineNotFoundError(self.id, e.raw_response) from e
+ elif e.status_code == 401:
+ raise errors.UnauthorizedError(e.raw_response) from e
+ else:
+ raise e
+
+ def _read_transformation_file(self):
+ try:
+ with open(self.transformation_file) as f:
+ self.transformation_code = f.read()
+ except FileNotFoundError:
+ raise
+
+ def _fill_pipeline_details(self, pipeline_details: dict) -> Pipeline:
+ self.id = pipeline_details["id"]
+ self.name = pipeline_details["name"]
+ self.space_id = pipeline_details["space_id"]
+ self.state = pipeline_details["state"]
+ if pipeline_details["source_connector"]:
+ self.source_kind = pipeline_details["source_connector"]["kind"]
+ self.source_config = pipeline_details["source_connector"]["config"]
+ if pipeline_details["sink_connector"]:
+ self.sink_kind = pipeline_details["sink_connector"]["kind"]
+ self.sink_config = pipeline_details["sink_connector"]["config"]
+ self.created_at = pipeline_details["created_at"]
+ self.env_vars = pipeline_details["environments"]
+
+ return self
diff --git a/src/glassflow/pipeline_data.py b/src/glassflow/pipeline_data.py
new file mode 100644
index 0000000..52d0380
--- /dev/null
+++ b/src/glassflow/pipeline_data.py
@@ -0,0 +1,215 @@
+import random
+import time
+from typing import Optional
+
+from .api_client import APIClient
+from .models import errors, operations
+from .models.operations.base import BasePipelineDataRequest, BaseResponse
+from .utils import utils
+
+
+class PipelineDataClient(APIClient):
+ """Base Client object to publish and consume events from the given pipeline.
+
+ Attributes:
+ glassflow_config: GlassFlowConfig object to interact with GlassFlow API
+ pipeline_id: The pipeline id to interact with
+ pipeline_access_token: The access token to access the pipeline
+ """
+
+ def __init__(self, pipeline_id: str, pipeline_access_token: str):
+ super().__init__()
+ self.pipeline_id = pipeline_id
+ self.pipeline_access_token = pipeline_access_token
+
+ def validate_credentials(self) -> None:
+ """
+ Check if the pipeline credentials are valid and raise an error if not
+ """
+ request = operations.StatusAccessTokenRequest(
+ pipeline_id=self.pipeline_id,
+ x_pipeline_access_token=self.pipeline_access_token,
+ )
+ self._request(
+ method="GET",
+ endpoint="/pipelines/{pipeline_id}/status/access_token",
+ request=request,
+ )
+
+ def _request(
+ self, method: str, endpoint: str, request: BasePipelineDataRequest, **kwargs
+ ) -> BaseResponse:
+ try:
+ res = super()._request(method, endpoint, request, **kwargs)
+ except errors.ClientError as e:
+ if e.status_code == 401:
+ raise errors.PipelineAccessTokenInvalidError(e.raw_response) from e
+ elif e.status_code == 404:
+ raise errors.PipelineNotFoundError(
+ self.pipeline_id, e.raw_response
+ ) from e
+ else:
+ raise e
+ return res
+
+
+class PipelineDataSource(PipelineDataClient):
+ def publish(self, request_body: dict) -> operations.PublishEventResponse:
+ """Push a new message into the pipeline
+
+ Args:
+ request_body: The message to be published into the pipeline
+
+ Returns:
+ PublishEventResponse: Response object containing the status
+ code and the raw response
+
+ Raises:
+ ClientError: If an error occurred while publishing the event
+ """
+ request = operations.PublishEventRequest(
+ pipeline_id=self.pipeline_id,
+ x_pipeline_access_token=self.pipeline_access_token,
+ request_body=request_body,
+ )
+ base_res = self._request(
+ method="POST",
+ endpoint="/pipelines/{pipeline_id}/topics/input/events",
+ request=request,
+ )
+
+ return operations.PublishEventResponse(
+ status_code=base_res.status_code,
+ content_type=base_res.content_type,
+ raw_response=base_res.raw_response,
+ )
+
+
+class PipelineDataSink(PipelineDataClient):
+ def __init__(self, pipeline_id: str, pipeline_access_token: str):
+ super().__init__(pipeline_id, pipeline_access_token)
+
+ # retry delay for consuming messages (in seconds)
+ self._consume_retry_delay_minimum = 1
+ self._consume_retry_delay_current = 1
+ self._consume_retry_delay_max = 60
+
+ def consume(self) -> operations.ConsumeEventResponse:
+ """Consume the last message from the pipeline
+
+ Returns:
+ ConsumeEventResponse: Response object containing the status
+ code and the raw response
+
+ Raises:
+ ClientError: If an error occurred while consuming the event
+
+ """
+ request = operations.ConsumeEventRequest(
+ pipeline_id=self.pipeline_id,
+ x_pipeline_access_token=self.pipeline_access_token,
+ )
+
+ self._respect_retry_delay()
+ base_res = self._request(
+ method="POST",
+ endpoint="/pipelines/{pipeline_id}/topics/output/events/consume",
+ request=request,
+ )
+
+ res = operations.ConsumeEventResponse(
+ status_code=base_res.status_code,
+ content_type=base_res.content_type,
+ raw_response=base_res.raw_response,
+ )
+
+ self._update_retry_delay(base_res.status_code)
+ if res.status_code == 200:
+ if not utils.match_content_type(res.content_type, "application/json"):
+ raise errors.UnknownContentTypeError(res.raw_response)
+
+ self._consume_retry_delay_current = self._consume_retry_delay_minimum
+ body = utils.unmarshal_json(
+ res.raw_response.text, Optional[operations.ConsumeEventResponseBody]
+ )
+ res.body = body
+ elif res.status_code == 204:
+ # No messages to be consumed.
+ # update the retry delay
+ # Return an empty response body
+ body = operations.ConsumeEventResponseBody("", "", {})
+ res.body = body
+ elif res.status_code == 429:
+ # update the retry delay
+ body = operations.ConsumeEventResponseBody("", "", {})
+ res.body = body
+ elif not utils.match_content_type(res.content_type, "application/json"):
+ raise errors.UnknownContentTypeError(res.raw_response)
+
+ return res
+
+ def consume_failed(self) -> operations.ConsumeFailedResponse:
+ """Consume the failed message from the pipeline
+
+ Returns:
+ ConsumeFailedResponse: Response object containing the status
+ code and the raw response
+
+ Raises:
+ ClientError: If an error occurred while consuming the event
+
+ """
+ request = operations.ConsumeFailedRequest(
+ pipeline_id=self.pipeline_id,
+ x_pipeline_access_token=self.pipeline_access_token,
+ )
+
+ self._respect_retry_delay()
+ base_res = self._request(
+ method="POST",
+ endpoint="/pipelines/{pipeline_id}/topics/failed/events/consume",
+ request=request,
+ )
+
+ res = operations.ConsumeFailedResponse(
+ status_code=base_res.status_code,
+ content_type=base_res.content_type,
+ raw_response=base_res.raw_response,
+ )
+
+ self._update_retry_delay(res.status_code)
+ if res.status_code == 200:
+ if not utils.match_content_type(res.content_type, "application/json"):
+ raise errors.UnknownContentTypeError(res.raw_response)
+
+ self._consume_retry_delay_current = self._consume_retry_delay_minimum
+ body = utils.unmarshal_json(
+ res.raw_response.text, Optional[operations.ConsumeFailedResponseBody]
+ )
+ res.body = body
+ elif res.status_code == 204:
+ # No messages to be consumed. Return an empty response body
+ body = operations.ConsumeFailedResponseBody("", "", {})
+ res.body = body
+ elif res.status_code == 429:
+ # update the retry delay
+ body = operations.ConsumeEventResponseBody("", "", {})
+ res.body = body
+ elif not utils.match_content_type(res.content_type, "application/json"):
+ raise errors.UnknownContentTypeError(res.raw_response)
+ return res
+
+ def _update_retry_delay(self, status_code: int):
+ if status_code == 200:
+ self._consume_retry_delay_current = self._consume_retry_delay_minimum
+ elif status_code == 204 or status_code == 429:
+ self._consume_retry_delay_current *= 2
+ self._consume_retry_delay_current = min(
+ self._consume_retry_delay_current, self._consume_retry_delay_max
+ )
+ self._consume_retry_delay_current += random.uniform(0, 0.1)
+
+ def _respect_retry_delay(self):
+ if self._consume_retry_delay_current > self._consume_retry_delay_minimum:
+ # sleep before making the request
+ time.sleep(self._consume_retry_delay_current)
diff --git a/src/glassflow/pipelines.py b/src/glassflow/pipelines.py
deleted file mode 100644
index 0fbbb43..0000000
--- a/src/glassflow/pipelines.py
+++ /dev/null
@@ -1,368 +0,0 @@
-import dataclasses
-import random
-import sys
-import time
-from typing import Optional
-
-import glassflow.utils as utils
-
-from .models import errors, operations
-
-
-class PipelineClient:
- """Client object to publish and consume events from the given pipeline.
-
- Attributes:
- glassflow_client: GlassFlowClient object to interact with GlassFlow API
- pipeline_id: The pipeline id to interact with
- organization_id: Organization ID of the user. If not provided, the default organization will be used
- pipeline_access_token: The access token to access the pipeline
- """
-
- def __init__(
- self, glassflow_client, pipeline_id: str, pipeline_access_token: str
- ) -> None:
- """Create a new PipelineClient object to interact with a specific pipeline
-
- Args:
- glassflow_client: GlassFlowClient object to interact with GlassFlow API
- pipeline_id: The pipeline id to interact with
- pipeline_access_token: The access token to access the pipeline
- """
- self.glassflow_client = glassflow_client
- self.pipeline_id = pipeline_id
- self.organization_id = self.glassflow_client.organization_id
- self.pipeline_access_token = pipeline_access_token
- # retry delay for consuming messages (in seconds)
- self._consume_retry_delay_minimum = 1
- self._consume_retry_delay_current = 1
- self._consume_retry_delay_max = 60
-
- def is_access_token_valid(self) -> bool:
- """
- Check if the pipeline access token is valid
-
- Returns:
- Boolean: True if the pipeline access token is correct, False otherwise
- """
- base_url = self.glassflow_client.glassflow_config.server_url
-
- request = operations.StatusAccessTokenRequest(
- pipeline_id=self.pipeline_id,
- x_pipeline_access_token=self.pipeline_access_token,
- )
-
- url = utils.generate_url(
- operations.PublishEventRequest,
- base_url,
- "/pipelines/{pipeline_id}/status/access_token",
- request,
- )
-
- headers = self._get_headers(request)
-
- client = self.glassflow_client.glassflow_config.client
-
- http_res = client.request("GET", url, headers=headers)
- content_type = http_res.headers.get("Content-Type")
-
- if http_res.status_code == 200:
- res = True
- elif http_res.status_code == 401:
- res = False
- elif http_res.status_code in [400, 500]:
- if utils.match_content_type(content_type, "application/json"):
- out = utils.unmarshal_json(http_res.text, errors.Error)
- out.raw_response = http_res
- raise out
- else:
- raise errors.ClientError(
- f"unknown content-type received: {content_type}",
- http_res.status_code,
- http_res.text,
- http_res,
- )
- elif 400 < http_res.status_code < 600:
- raise errors.ClientError(
- "API error occurred", http_res.status_code, http_res.text, http_res
- )
- return res
-
- def is_valid(self) -> bool:
- """
- Check if the pipeline exists and credentials are valid
-
- Returns:
- Boolean: True if the pipeline exists and credentials are valid, False otherwise
- """
- try:
- return self.is_access_token_valid()
- except errors.ClientError as e:
- if e.status_code == 404:
- return False
- else:
- raise e
-
- def publish(self, request_body: dict) -> operations.PublishEventResponse:
- """Push a new message into the pipeline
-
- Args:
- request_body: The message to be published into the pipeline
-
- Returns:
- PublishEventResponse: Response object containing the status code and the raw response
-
- Raises:
- ClientError: If an error occurred while publishing the event
- """
- request = operations.PublishEventRequest(
- organization_id=self.organization_id,
- pipeline_id=self.pipeline_id,
- x_pipeline_access_token=self.pipeline_access_token,
- request_body=request_body,
- )
-
- base_url = self.glassflow_client.glassflow_config.server_url
-
- url = utils.generate_url(
- operations.PublishEventRequest,
- base_url,
- "/pipelines/{pipeline_id}/topics/input/events",
- request,
- )
-
- req_content_type, data, form = utils.serialize_request_body(
- request, operations.PublishEventRequest, "request_body", False, True, "json"
- )
-
- headers = self._get_headers(request, req_content_type)
- query_params = utils.get_query_params(operations.PublishEventRequest, request)
-
- client = self.glassflow_client.glassflow_config.client
-
- http_res = client.request(
- "POST", url, params=query_params, data=data, files=form, headers=headers
- )
- content_type = http_res.headers.get("Content-Type")
-
- res = operations.PublishEventResponse(
- status_code=http_res.status_code,
- content_type=content_type,
- raw_response=http_res,
- )
-
- if http_res.status_code == 200:
- pass
- elif http_res.status_code in [400, 500]:
- if utils.match_content_type(content_type, "application/json"):
- out = utils.unmarshal_json(http_res.text, errors.Error)
- out.raw_response = http_res
- raise out
- else:
- raise errors.ClientError(
- f"unknown content-type received: {content_type}",
- http_res.status_code,
- http_res.text,
- http_res,
- )
- elif 400 < http_res.status_code < 600:
- raise errors.ClientError(
- "API error occurred", http_res.status_code, http_res.text, http_res
- )
-
- return res
-
- def consume(self) -> operations.ConsumeEventResponse:
- """Consume the last message from the pipeline
-
- Returns:
- ConsumeEventResponse: Response object containing the status code and the raw response
-
- Raises:
- ClientError: If an error occurred while consuming the event
-
- """
- request = operations.ConsumeEventRequest(
- pipeline_id=self.pipeline_id,
- organization_id=self.organization_id,
- x_pipeline_access_token=self.pipeline_access_token,
- )
-
- base_url = self.glassflow_client.glassflow_config.server_url
-
- url = utils.generate_url(
- operations.ConsumeEventRequest,
- base_url,
- "/pipelines/{pipeline_id}/topics/output/events/consume",
- request,
- )
- headers = self._get_headers(request)
- query_params = utils.get_query_params(operations.ConsumeEventRequest, request)
-
- client = self.glassflow_client.glassflow_config.client
- # make the request
- self._respect_retry_delay()
-
- http_res = client.request("POST", url, params=query_params, headers=headers)
- content_type = http_res.headers.get("Content-Type")
-
- res = operations.ConsumeEventResponse(
- status_code=http_res.status_code,
- content_type=content_type,
- raw_response=http_res,
- )
-
- self._update_retry_delay(http_res.status_code)
- if http_res.status_code == 200:
- self._consume_retry_delay_current = self._consume_retry_delay_minimum
- if utils.match_content_type(content_type, "application/json"):
- body = utils.unmarshal_json(
- http_res.text, Optional[operations.ConsumeEventResponseBody]
- )
- res.body = body
- else:
- raise errors.ClientError(
- f"unknown content-type received: {content_type}",
- http_res.status_code,
- http_res.text,
- http_res,
- )
- elif http_res.status_code == 204:
- # No messages to be consumed.
- # update the retry delay
- # Return an empty response body
- body = operations.ConsumeEventResponseBody("", "", {})
- res.body = body
- elif http_res.status_code == 429:
- # update the retry delay
- body = operations.ConsumeEventResponseBody("", "", {})
- res.body = body
- elif http_res.status_code in [400, 500]:
- if utils.match_content_type(content_type, "application/json"):
- out = utils.unmarshal_json(http_res.text, errors.Error)
- out.raw_response = http_res
- raise out
- else:
- raise errors.ClientError(
- f"unknown content-type received: {content_type}",
- http_res.status_code,
- http_res.text,
- http_res,
- )
- elif 400 < http_res.status_code < 600:
- raise errors.ClientError(
- "API error occurred", http_res.status_code, http_res.text, http_res
- )
-
- return res
-
- def consume_failed(self) -> operations.ConsumeFailedResponse:
- """Consume the failed message from the pipeline
-
- Returns:
- ConsumeFailedResponse: Response object containing the status code and the raw response
-
- Raises:
- ClientError: If an error occurred while consuming the event
-
- """
- request = operations.ConsumeFailedRequest(
- pipeline_id=self.pipeline_id,
- organization_id=self.organization_id,
- x_pipeline_access_token=self.pipeline_access_token,
- )
-
- base_url = self.glassflow_client.glassflow_config.server_url
-
- url = utils.generate_url(
- operations.ConsumeFailedRequest,
- base_url,
- "/pipelines/{pipeline_id}/topics/failed/events/consume",
- request,
- )
- headers = self._get_headers(request)
- query_params = utils.get_query_params(operations.ConsumeFailedRequest, request)
-
- client = self.glassflow_client.glassflow_config.client
- self._respect_retry_delay()
- http_res = client.request("POST", url, params=query_params, headers=headers)
- content_type = http_res.headers.get("Content-Type")
-
- res = operations.ConsumeFailedResponse(
- status_code=http_res.status_code,
- content_type=content_type,
- raw_response=http_res,
- )
-
- self._update_retry_delay(http_res.status_code)
- if http_res.status_code == 200:
- if utils.match_content_type(content_type, "application/json"):
- body = utils.unmarshal_json(
- http_res.text, Optional[operations.ConsumeFailedResponseBody]
- )
- res.body = body
- else:
- raise errors.ClientError(
- f"unknown content-type received: {content_type}",
- http_res.status_code,
- http_res.text,
- http_res,
- )
- elif http_res.status_code == 204:
- # No messages to be consumed. Return an empty response body
- body = operations.ConsumeFailedResponseBody("", "", {})
- res.body = body
- elif http_res.status_code in [400, 500]:
- if utils.match_content_type(content_type, "application/json"):
- out = utils.unmarshal_json(http_res.text, errors.Error)
- out.raw_response = http_res
- raise out
- else:
- raise errors.ClientError(
- f"unknown content-type received: {content_type}",
- http_res.status_code,
- http_res.text,
- http_res,
- )
- elif 400 < http_res.status_code < 600:
- raise errors.ClientError(
- "API error occurred", http_res.status_code, http_res.text, http_res
- )
-
- return res
-
- def _get_headers(
- self, request: dataclasses.dataclass, req_content_type: Optional[str] = None
- ) -> dict:
- headers = utils.get_req_specific_headers(request)
- headers["Accept"] = "application/json"
- headers["Gf-Client"] = self.glassflow_client.glassflow_config.glassflow_client
- headers["User-Agent"] = self.glassflow_client.glassflow_config.user_agent
- headers["Gf-Python-Version"] = (
- f"{sys.version_info.major}."
- f"{sys.version_info.minor}."
- f"{sys.version_info.micro}"
- )
-
- if req_content_type and req_content_type not in (
- "multipart/form-data",
- "multipart/mixed",
- ):
- headers["content-type"] = req_content_type
-
- return headers
-
- def _update_retry_delay(self, status_code: int):
- if status_code == 200:
- self._consume_retry_delay_current = self._consume_retry_delay_minimum
- elif status_code == 204 or status_code == 429:
- self._consume_retry_delay_current *= 2
- self._consume_retry_delay_current = min(
- self._consume_retry_delay_current, self._consume_retry_delay_max
- )
- self._consume_retry_delay_current += random.uniform(0, 0.1)
-
- def _respect_retry_delay(self):
- if self._consume_retry_delay_current > self._consume_retry_delay_minimum:
- # sleep before making the request
- time.sleep(self._consume_retry_delay_current)
diff --git a/src/glassflow/space.py b/src/glassflow/space.py
new file mode 100644
index 0000000..6ad088a
--- /dev/null
+++ b/src/glassflow/space.py
@@ -0,0 +1,116 @@
+from __future__ import annotations
+
+from .client import APIClient
+from .models import api, errors, operations
+
+
+class Space(APIClient):
+ def __init__(
+ self,
+ personal_access_token: str,
+ name: str | None = None,
+ id: str | None = None,
+ created_at: str | None = None,
+ organization_id: str | None = None,
+ ):
+ """Creates a new GlassFlow pipeline object
+
+ Args:
+ personal_access_token: The personal access token to authenticate
+ against GlassFlow
+ name: Name of the space
+ id: ID of the GlassFlow Space you want to create the pipeline in
+ created_at: Timestamp when the space was created
+
+ Returns:
+ Space: Space object to interact with the GlassFlow API
+
+ Raises:
+ """
+ super().__init__()
+ self.name = name
+ self.id = id
+ self.created_at = created_at
+ self.organization_id = organization_id
+ self.personal_access_token = personal_access_token
+
+ def create(self) -> Space:
+ """
+ Creates a new GlassFlow space
+
+ Returns:
+ self: Space object
+
+ Raises:
+ ValueError: If name is not provided in the constructor
+
+ """
+ if self.name is None:
+ raise ValueError("Name must be provided in order to create the space")
+ create_space = api.CreateSpace(name=self.name)
+ request = operations.CreateSpaceRequest(
+ organization_id=self.organization_id,
+ personal_access_token=self.personal_access_token,
+ **create_space.__dict__,
+ )
+ base_res = self._request(method="POST", endpoint="/spaces", request=request)
+
+ res = operations.CreateSpaceResponse(
+ status_code=base_res.status_code,
+ content_type=base_res.content_type,
+ raw_response=base_res.raw_response,
+ **base_res.raw_response.json(),
+ )
+
+ self.id = res.id
+ self.created_at = res.created_at
+ self.name = res.name
+ return self
+
+ def delete(self) -> None:
+ """
+ Deletes a GlassFlow space
+
+ Returns:
+
+ Raises:
+ ValueError: If ID is not provided in the constructor
+ SpaceNotFoundError: If ID provided does not match any
+ existing space in GlassFlow
+ UnauthorizedError: If the Personal Access Token is not
+ provided or is invalid
+ """
+ if self.id is None:
+ raise ValueError("Space id must be provided in the constructor")
+
+ request = operations.DeleteSpaceRequest(
+ space_id=self.id,
+ organization_id=self.organization_id,
+ personal_access_token=self.personal_access_token,
+ )
+ self._request(
+ method="DELETE",
+ endpoint=f"/spaces/{self.id}",
+ request=request,
+ )
+
+ def _request(
+ self,
+ method: str,
+ endpoint: str,
+ request: operations.BaseManagementRequest,
+ **kwargs,
+ ) -> operations.BaseResponse:
+ try:
+ return super()._request(
+ method=method, endpoint=endpoint, request=request, **kwargs
+ )
+ except errors.ClientError as e:
+ if e.status_code == 404:
+ raise errors.SpaceNotFoundError(self.id, e.raw_response) from e
+ elif e.status_code == 401:
+ raise errors.UnauthorizedError(e.raw_response) from e
+ elif e.status_code == 409:
+ raise errors.SpaceIsNotEmptyError(e.raw_response) from e
+ else:
+ raise e
diff --git a/src/glassflow/utils/__init__.py b/src/glassflow/utils/__init__.py
index c7eca83..5f5b06d 100644
--- a/src/glassflow/utils/__init__.py
+++ b/src/glassflow/utils/__init__.py
@@ -1,6 +1,13 @@
-from .utils import (generate_url, get_field_name, get_query_params,
- get_req_specific_headers, marshal_json, match_content_type,
- serialize_request_body, unmarshal_json)
+from .utils import (
+ generate_url,
+ get_field_name,
+ get_query_params,
+ get_req_specific_headers,
+ marshal_json,
+ match_content_type,
+ serialize_request_body,
+ unmarshal_json,
+)
__all__ = [
"match_content_type",
@@ -10,5 +17,5 @@
"get_query_params",
"get_req_specific_headers",
"get_field_name",
- "marshal_json"
-]
\ No newline at end of file
+ "marshal_json",
+]
diff --git a/src/glassflow/utils/utils.py b/src/glassflow/utils/utils.py
index 60e0586..08b2473 100644
--- a/src/glassflow/utils/utils.py
+++ b/src/glassflow/utils/utils.py
@@ -1,3 +1,5 @@
+# ruff: noqa: E501, SIM102
+
import json
import re
from dataclasses import Field, dataclass, fields, is_dataclass, make_dataclass
@@ -5,8 +7,7 @@
from decimal import Decimal
from email.message import Message
from enum import Enum
-from typing import (Any, Callable, Dict, List, Tuple, Union, get_args,
- get_origin)
+from typing import Any, Callable, Dict, List, Tuple, Union, get_args, get_origin
from xmlrpc.client import boolean
from dataclasses_json import DataClassJsonMixin
@@ -306,9 +307,8 @@ def serialize_request_body(
serialization_method: str,
encoder=None,
) -> Tuple[str, any, any]:
- if request is None:
- if not nullable and optional:
- return None, None, None
+ if request is None and not nullable and optional:
+ return None, None, None
if not is_dataclass(request) or not hasattr(request, request_field_name):
return serialize_content_type(
@@ -321,9 +321,8 @@ def serialize_request_body(
request_val = getattr(request, request_field_name)
- if request_val is None:
- if not nullable and optional:
- return None, None, None
+ if request_val is None and not nullable and optional:
+ return None, None, None
request_fields: Tuple[Field, ...] = fields(request)
request_metadata = None
@@ -387,7 +386,7 @@ def serialize_multipart_form(
file_name = ""
field_name = ""
- content = bytes()
+ content = b""
for file_field in file_fields:
file_metadata = file_field.metadata.get("multipart_form")
@@ -399,7 +398,7 @@ def serialize_multipart_form(
else:
field_name = file_metadata.get("field_name", file_field.name)
file_name = getattr(val, file_field.name)
- if field_name == "" or file_name == "" or content == bytes():
+ if field_name == "" or file_name == "" or content == b"":
raise Exception("invalid multipart/form-data file")
form.append([field_name, [file_name, content]])
@@ -635,11 +634,7 @@ def match_content_type(content_type: str, pattern: str) -> boolean:
return True
parts = media_type.split("/")
- if len(parts) == 2:
- if pattern in (f"{parts[0]}/*", f"*/{parts[1]}"):
- return True
-
- return False
+ return len(parts) == 2 and pattern in (f"{parts[0]}/*", f"*/{parts[1]}")
def get_field_name(name):
diff --git a/tests/data/transformation.py b/tests/data/transformation.py
new file mode 100644
index 0000000..ff23e28
--- /dev/null
+++ b/tests/data/transformation.py
@@ -0,0 +1,3 @@
+def handler(data, log):
+ data["new_field"] = "new_value"
+ return data
diff --git a/tests/data/transformation_2.py b/tests/data/transformation_2.py
new file mode 100644
index 0000000..9f903c3
--- /dev/null
+++ b/tests/data/transformation_2.py
@@ -0,0 +1,4 @@
+def handler(data, log):
+ data["new_field"] = "new_value"
+ log.info(data)
+ return data
diff --git a/tests/glassflow/conftest.py b/tests/glassflow/conftest.py
index dc367f8..9932a4a 100644
--- a/tests/glassflow/conftest.py
+++ b/tests/glassflow/conftest.py
@@ -1,35 +1,4 @@
-import os
-import uuid
+from glassflow.api_client import APIClient
-import pytest
-
-from glassflow.client import GlassFlowClient
-
-
-@pytest.fixture
-def client():
- return GlassFlowClient()
-
-
-@pytest.fixture
-def pipeline_credentials():
- return {
- "pipeline_id": os.getenv("PIPELINE_ID"),
- "pipeline_access_token": os.getenv("PIPELINE_ACCESS_TOKEN"),
- }
-
-
-@pytest.fixture
-def pipeline_credentials_invalid_token():
- return {
- "pipeline_id": os.getenv("PIPELINE_ID"),
- "pipeline_access_token": "invalid-pipeline-access-token",
- }
-
-
-@pytest.fixture
-def pipeline_credentials_invalid_id():
- return {
- "pipeline_id": str(uuid.uuid4()),
- "pipeline_access_token": os.getenv("PIPELINE_ACCESS_TOKEN"),
- }
+# Use staging api server
+APIClient.glassflow_config.server_url = "https://staging.api.glassflow.dev/v1"
diff --git a/tests/glassflow/integration_tests/client_test.py b/tests/glassflow/integration_tests/client_test.py
new file mode 100644
index 0000000..7def0bc
--- /dev/null
+++ b/tests/glassflow/integration_tests/client_test.py
@@ -0,0 +1,15 @@
+def test_get_pipeline_ok(client, creating_pipeline):
+ p = client.get_pipeline(pipeline_id=creating_pipeline.id)
+
+ assert p.id == creating_pipeline.id
+ assert p.name == creating_pipeline.name
+
+
+def test_list_pipelines_ok(client, creating_pipeline):
+ res = client.list_pipelines()
+
+ assert res.status_code == 200
+ assert res.content_type == "application/json"
+ assert res.total_amount >= 1
+ assert res.pipelines[-1]["id"] == creating_pipeline.id
+ assert res.pipelines[-1]["name"] == creating_pipeline.name
diff --git a/tests/glassflow/integration_tests/conftest.py b/tests/glassflow/integration_tests/conftest.py
new file mode 100644
index 0000000..4ec92fb
--- /dev/null
+++ b/tests/glassflow/integration_tests/conftest.py
@@ -0,0 +1,117 @@
+import os
+import uuid
+
+import pytest
+
+from glassflow import (
+ GlassFlowClient,
+ Pipeline,
+ PipelineDataSink,
+ PipelineDataSource,
+ Space,
+)
+
+
+@pytest.fixture(scope="session")
+def client():
+ return GlassFlowClient(os.getenv("PERSONAL_ACCESS_TOKEN"))
+
+
+@pytest.fixture
+def space(client):
+ return Space(
+ name="integration-tests", personal_access_token=client.personal_access_token
+ )
+
+
+@pytest.fixture
+def creating_space(space):
+ space.create()
+ yield space
+ space.delete()
+
+
+@pytest.fixture
+def space_with_random_id(client):
+ return Space(
+ id=str(uuid.uuid4()),
+ personal_access_token=client.personal_access_token,
+ )
+
+
+@pytest.fixture
+def space_with_random_id_and_invalid_token(client):
+ return Space(
+ id=str(uuid.uuid4()),
+ personal_access_token="invalid-token",
+ )
+
+
+@pytest.fixture
+def pipeline(client, creating_space):
+ return Pipeline(
+ name="test_pipeline",
+ space_id=creating_space.id,
+ transformation_file="tests/data/transformation.py",
+ personal_access_token=client.personal_access_token,
+ )
+
+
+@pytest.fixture
+def pipeline_with_random_id(client):
+ return Pipeline(
+ id=str(uuid.uuid4()),
+ personal_access_token=client.personal_access_token,
+ )
+
+
+@pytest.fixture
+def pipeline_with_random_id_and_invalid_token():
+ return Pipeline(
+ id=str(uuid.uuid4()),
+ personal_access_token="invalid-token",
+ )
+
+
+@pytest.fixture
+def creating_pipeline(pipeline):
+ pipeline.create()
+ yield pipeline
+ pipeline.delete()
+
+
+@pytest.fixture
+def source(creating_pipeline):
+ return PipelineDataSource(
+ pipeline_id=creating_pipeline.id,
+ pipeline_access_token=creating_pipeline.access_tokens[0]["token"],
+ )
+
+
+@pytest.fixture
+def source_with_invalid_access_token(creating_pipeline):
+ return PipelineDataSource(
+ pipeline_id=creating_pipeline.id, pipeline_access_token="invalid-access-token"
+ )
+
+
+@pytest.fixture
+def source_with_non_existing_id(creating_pipeline):
+ return PipelineDataSource(
+ pipeline_id=str(uuid.uuid4()),
+ pipeline_access_token=creating_pipeline.access_tokens[0]["token"],
+ )
+
+
+@pytest.fixture
+def source_with_published_events(source):
+ source.publish({"test_field": "test_value"})
+ yield source
+
+
+@pytest.fixture
+def sink(source_with_published_events):
+ return PipelineDataSink(
+ pipeline_id=source_with_published_events.pipeline_id,
+ pipeline_access_token=source_with_published_events.pipeline_access_token,
+ )
diff --git a/tests/glassflow/integration_tests/pipeline_data_test.py b/tests/glassflow/integration_tests/pipeline_data_test.py
new file mode 100644
index 0000000..1dcf9d3
--- /dev/null
+++ b/tests/glassflow/integration_tests/pipeline_data_test.py
@@ -0,0 +1,53 @@
+import pytest
+
+from glassflow import errors
+
+
+def test_using_staging_server(source, sink):
+ assert source.glassflow_config.server_url == "https://staging.api.glassflow.dev/v1"
+ assert sink.glassflow_config.server_url == "https://staging.api.glassflow.dev/v1"
+
+
+def test_validate_credentials_from_pipeline_data_source_ok(source):
+ try:
+ source.validate_credentials()
+ except Exception as e:
+ pytest.fail(e)
+
+
+def test_validate_credentials_from_pipeline_data_source_fail_with_invalid_access_token(
+ source_with_invalid_access_token,
+):
+ with pytest.raises(errors.PipelineAccessTokenInvalidError):
+ source_with_invalid_access_token.validate_credentials()
+
+
+def test_validate_credentials_from_pipeline_data_source_fail_with_id_not_found(
+ source_with_non_existing_id,
+):
+ with pytest.raises(errors.PipelineNotFoundError):
+ source_with_non_existing_id.validate_credentials()
+
+
+def test_publish_to_pipeline_data_source_ok(source):
+ publish_response = source.publish({"test_field": "test_value"})
+ assert publish_response.status_code == 200
+
+
+def test_consume_from_pipeline_data_sink_ok(sink):
+ n_tries = 0
+ max_tries = 10
+ while True:
+ if n_tries == max_tries:
+ pytest.fail("Max tries exceeded")
+
+ consume_response = sink.consume()
+ assert consume_response.status_code in (200, 204)
+ if consume_response.status_code == 200:
+ assert consume_response.json() == {
+ "test_field": "test_value",
+ "new_field": "new_value",
+ }
+ break
+ else:
+ n_tries += 1
diff --git a/tests/glassflow/integration_tests/pipeline_test.py b/tests/glassflow/integration_tests/pipeline_test.py
index f525214..144fca8 100644
--- a/tests/glassflow/integration_tests/pipeline_test.py
+++ b/tests/glassflow/integration_tests/pipeline_test.py
@@ -1,66 +1,95 @@
import pytest
-from glassflow.models.errors import ClientError
-
-
-def test_pipeline_is_access_token_valid_ok(client, pipeline_credentials):
- pipeline = client.pipeline_client(**pipeline_credentials)
-
- is_valid = pipeline.is_access_token_valid()
- assert is_valid
-
-
-def test_pipeline_is_access_token_valid_not_ok(
- client, pipeline_credentials_invalid_token
-):
- pipeline = client.pipeline_client(**pipeline_credentials_invalid_token)
-
- is_valid = pipeline.is_access_token_valid()
- assert not is_valid
-
-
-def test_pipeline_is_access_token_valid_with_invalid_credentials(
- client, pipeline_credentials_invalid_id
-):
- pipeline = client.pipeline_client(**pipeline_credentials_invalid_id)
-
- with pytest.raises(ClientError) as exc_info:
- pipeline.is_access_token_valid()
- exc = exc_info.value
- assert exc.status_code == 404
-
-
-def test_pipeline_is_valid_with_invalid_pipeline_id(
- client, pipeline_credentials_invalid_id
-):
- pipeline = client.pipeline_client(**pipeline_credentials_invalid_id)
-
- assert pipeline.is_valid() is False
-
-
-def test_pipeline_is_valid_with_invalid_pipeline_token(
- client, pipeline_credentials_invalid_token
-):
- pipeline = client.pipeline_client(**pipeline_credentials_invalid_token)
-
- assert pipeline.is_valid() is False
-
-
-def test_pipeline_is_valid_ok(
- client, pipeline_credentials
-):
- pipeline = client.pipeline_client(**pipeline_credentials)
-
- assert pipeline.is_valid() is True
-
-
-def test_pipeline_publish_and_consume(client, pipeline_credentials):
- pipeline = client.pipeline_client(**pipeline_credentials)
- publish_response = pipeline.publish({"test-key": "test-value"})
- assert publish_response.status_code == 200
+from glassflow.models import api, errors
+
+
+def test_create_pipeline_ok(creating_pipeline):
+ assert creating_pipeline.name == "test_pipeline"
+ assert creating_pipeline.id is not None
+
+
+def test_fetch_pipeline_ok(creating_pipeline):
+ creating_pipeline.fetch()
+ assert creating_pipeline.name == "test_pipeline"
+ assert creating_pipeline.id is not None
+ assert creating_pipeline.created_at is not None
+
+
+def test_fetch_pipeline_fail_with_404(pipeline_with_random_id):
+ with pytest.raises(errors.PipelineNotFoundError):
+ pipeline_with_random_id.fetch()
+
+
+def test_fetch_pipeline_fail_with_401(pipeline_with_random_id_and_invalid_token):
+ with pytest.raises(errors.UnauthorizedError):
+ pipeline_with_random_id_and_invalid_token.fetch()
+
+
+def test_update_pipeline_ok(creating_pipeline):
+ updated_pipeline = creating_pipeline.update(
+ name="new_name",
+ sink_kind="webhook",
+ state="paused",
+ sink_config={
+ "url": "www.test-url.com",
+ "method": "GET",
+ "headers": [{"name": "header1", "value": "header1"}],
+ },
+ transformation_file="tests/data/transformation_2.py",
+ requirements="requests,pandas",
+ env_vars=[
+ {"name": "env1", "value": "env1"},
+ {"name": "env2", "value": "env2"},
+ ],
+ )
+ assert updated_pipeline.name == "new_name"
+ assert updated_pipeline.sink_kind == "webhook"
+ assert updated_pipeline.sink_config == {
+ "url": "www.test-url.com",
+ "method": "GET",
+ "headers": [{"name": "header1", "value": "header1"}],
+ }
+ assert updated_pipeline.env_vars == [
+ {"name": "env1", "value": "env1"},
+ {"name": "env2", "value": "env2"},
+ ]
+ with open("tests/data/transformation_2.py") as f:
+ assert updated_pipeline.transformation_code == f.read()
+
+ assert updated_pipeline.source_kind == creating_pipeline.source_kind
+ assert updated_pipeline.source_config == creating_pipeline.source_config
+ assert updated_pipeline.state == api.PipelineState.paused
+
+
+def test_delete_pipeline_fail_with_404(pipeline_with_random_id):
+ with pytest.raises(errors.PipelineNotFoundError):
+ pipeline_with_random_id.delete()
+
+
+def test_delete_pipeline_fail_with_401(pipeline_with_random_id_and_invalid_token):
+ with pytest.raises(errors.UnauthorizedError):
+ pipeline_with_random_id_and_invalid_token.delete()
+
+
+def test_get_logs_from_pipeline_ok(creating_pipeline):
+ import time
+
+ n_tries = 0
+ max_tries = 10
while True:
- consume_response = pipeline.consume()
- assert consume_response.status_code in (200, 204)
- if consume_response.status_code == 200:
- assert consume_response.json() == {"test-key": "test-value"}
+ if n_tries == max_tries:
+ pytest.fail("Max tries reached")
+
+ logs = creating_pipeline.get_logs()
+ if len(logs.logs) >= 2:
break
+ else:
+ n_tries += 1
+ time.sleep(1)
+
+ assert logs.status_code == 200
+ assert logs.content_type == "application/json"
+ assert logs.logs[0].payload.message == "Function is uploaded"
+ assert logs.logs[0].level == "INFO"
+ assert logs.logs[1].payload.message == "Pipeline is created"
+ assert logs.logs[1].level == "INFO"
diff --git a/tests/glassflow/integration_tests/space_test.py b/tests/glassflow/integration_tests/space_test.py
new file mode 100644
index 0000000..e17278b
--- /dev/null
+++ b/tests/glassflow/integration_tests/space_test.py
@@ -0,0 +1,18 @@
+import pytest
+
+from glassflow import errors
+
+
+def test_create_space_ok(creating_space):
+ assert creating_space.name == "integration-tests"
+ assert creating_space.id is not None
+
+
+def test_delete_space_fail_with_404(space_with_random_id):
+ with pytest.raises(errors.SpaceNotFoundError):
+ space_with_random_id.delete()
+
+
+def test_delete_space_fail_with_401(space_with_random_id_and_invalid_token):
+ with pytest.raises(errors.UnauthorizedError):
+ space_with_random_id_and_invalid_token.delete()
diff --git a/tests/glassflow/unit_tests/__init__.py b/tests/glassflow/unit_tests/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/tests/glassflow/unit_tests/client_test.py b/tests/glassflow/unit_tests/client_test.py
new file mode 100644
index 0000000..6363263
--- /dev/null
+++ b/tests/glassflow/unit_tests/client_test.py
@@ -0,0 +1,48 @@
+import pytest
+
+from glassflow.models import errors
+
+
+@pytest.fixture
+def list_pipelines_response():
+ return {
+ "total_amount": 1,
+ "pipelines": [
+ {
+ "name": "test-name",
+ "space_id": "test-space-id",
+ "metadata": {"additionalProp1": {}},
+ "id": "test-id",
+ "created_at": "2024-09-25T13:52:17.910Z",
+ "state": "running",
+ "space_name": "test-space-name",
+ }
+ ],
+ }
+
+
+def test_list_pipelines_ok(requests_mock, list_pipelines_response, client):
+ requests_mock.get(
+ client.glassflow_config.server_url + "/pipelines",
+ json=list_pipelines_response,
+ status_code=200,
+ headers={"Content-Type": "application/json"},
+ )
+
+ res = client.list_pipelines()
+
+ assert res.status_code == 200
+ assert res.content_type == "application/json"
+ assert res.total_amount == list_pipelines_response["total_amount"]
+ assert res.pipelines == list_pipelines_response["pipelines"]
+
+
+def test_list_pipelines_fail_with_401(requests_mock, client):
+ requests_mock.get(
+ client.glassflow_config.server_url + "/pipelines",
+ status_code=401,
+ headers={"Content-Type": "application/json"},
+ )
+
+ with pytest.raises(errors.UnauthorizedError):
+ client.list_pipelines()
diff --git a/tests/glassflow/unit_tests/conftest.py b/tests/glassflow/unit_tests/conftest.py
new file mode 100644
index 0000000..ea936f0
--- /dev/null
+++ b/tests/glassflow/unit_tests/conftest.py
@@ -0,0 +1,170 @@
+import pytest
+
+from glassflow import GlassFlowClient
+
+
+@pytest.fixture
+def client():
+ return GlassFlowClient()
+
+
+@pytest.fixture
+def get_pipeline_request_mock(client, requests_mock, fetch_pipeline_response):
+ return requests_mock.get(
+ client.glassflow_config.server_url + "/pipelines/test-id",
+ json=fetch_pipeline_response,
+ status_code=200,
+ headers={"Content-Type": "application/json"},
+ )
+
+
+@pytest.fixture
+def get_access_token_request_mock(
+ client, requests_mock, fetch_pipeline_response, access_tokens_response
+):
+ return requests_mock.get(
+ client.glassflow_config.server_url
+ + f"/pipelines/{fetch_pipeline_response['id']}/access_tokens",
+ json=access_tokens_response,
+ status_code=200,
+ headers={"Content-Type": "application/json"},
+ )
+
+
+@pytest.fixture
+def get_pipeline_function_source_request_mock(
+ client, requests_mock, fetch_pipeline_response, function_source_response
+):
+ return requests_mock.get(
+ client.glassflow_config.server_url
+ + f"/pipelines/{fetch_pipeline_response['id']}/functions/main/artifacts/latest",
+ json=function_source_response,
+ status_code=200,
+ headers={"Content-Type": "application/json"},
+ )
+
+
+@pytest.fixture
+def update_pipeline_request_mock(
+ client, requests_mock, fetch_pipeline_response, update_pipeline_response
+):
+ return requests_mock.patch(
+ client.glassflow_config.server_url
+ + f"/pipelines/{fetch_pipeline_response['id']}",
+ json=update_pipeline_response,
+ status_code=200,
+ headers={"Content-Type": "application/json"},
+ )
+
+
+@pytest.fixture
+def fetch_pipeline_response():
+ return {
+ "id": "test-id",
+ "name": "test-name",
+ "space_id": "test-space-id",
+ "metadata": {},
+ "created_at": "2024-09-23T10:08:45.529Z",
+ "state": "running",
+ "space_name": "test-space-name",
+ "source_connector": {
+ "kind": "google_pubsub",
+ "config": {
+ "project_id": "test-project",
+ "subscription_id": "test-subscription",
+ "credentials_json": "credentials.json",
+ },
+ },
+ "sink_connector": {
+ "kind": "webhook",
+ "config": {
+ "url": "www.test-url.com",
+ "method": "GET",
+ "headers": [
+ {"name": "header1", "value": "header1"},
+ {"name": "header2", "value": "header2"},
+ ],
+ },
+ },
+ "environments": [{"test-var": "test-var"}],
+ }
+
+
+@pytest.fixture
+def update_pipeline_response(fetch_pipeline_response):
+ fetch_pipeline_response["name"] = "updated name"
+ fetch_pipeline_response["source_connector"] = None
+ return fetch_pipeline_response
+
+
+@pytest.fixture
+def create_pipeline_response():
+ return {
+ "name": "test-name",
+ "space_id": "string",
+ "metadata": {"additionalProp1": {}},
+ "id": "test-id",
+ "created_at": "2024-09-23T10:08:45.529Z",
+ "state": "running",
+ "access_token": "string",
+ }
+
+
+@pytest.fixture
+def create_space_response():
+ return {
+ "name": "test-space",
+ "id": "test-space-id",
+ "created_at": "2024-09-30T02:47:51.901Z",
+ }
+
+
+@pytest.fixture
+def access_tokens_response():
+ return {
+ "total_amount": 2,
+ "access_tokens": [
+ {
+ "name": "token1",
+ "id": "string",
+ "token": "string",
+ "created_at": "2024-09-25T10:46:18.468Z",
+ },
+ {
+ "name": "token2",
+ "id": "string",
+ "token": "string",
+ "created_at": "2024-09-26T04:28:51.782Z",
+ },
+ ],
+ }
+
+
+@pytest.fixture
+def function_source_response():
+ return {
+ "files": [{"name": "string", "content": "string"}],
+ "transformation_function": "string",
+ "requirements_txt": "string",
+ }
+
+
+@pytest.fixture
+def get_logs_response():
+ return {
+ "logs": [
+ {
+ "level": "INFO",
+ "severity_code": 0,
+ "timestamp": "2024-09-30T16:04:08.211Z",
+ "payload": {"message": "Info Message Log", "additionalProp1": {}},
+ },
+ {
+ "level": "ERROR",
+ "severity_code": 500,
+ "timestamp": "2024-09-30T16:04:08.211Z",
+ "payload": {"message": "Error Message Log", "additionalProp1": {}},
+ },
+ ],
+ "next": "string",
+ }
diff --git a/tests/glassflow/unit_tests/pipeline_data_test.py b/tests/glassflow/unit_tests/pipeline_data_test.py
new file mode 100644
index 0000000..bc8bd79
--- /dev/null
+++ b/tests/glassflow/unit_tests/pipeline_data_test.py
@@ -0,0 +1,310 @@
+import pytest
+
+from glassflow import PipelineDataSink, PipelineDataSource
+from glassflow.models import errors
+from glassflow.pipeline_data import PipelineDataClient
+
+
+@pytest.fixture
+def consume_payload():
+ return {
+ "req_id": "string",
+ "receive_time": "2024-09-23T07:28:27.958Z",
+ "payload": {},
+ "event_context": {
+ "request_id": "string",
+ "external_id": "string",
+ "receive_time": "2024-09-23T07:28:27.958Z",
+ },
+ "status": "string",
+ "response": {},
+ }
+
+
+def test_validate_credentials_ok(requests_mock):
+ data_client = PipelineDataClient(
+ pipeline_id="test-id",
+ pipeline_access_token="test-token",
+ )
+ requests_mock.get(
+ data_client.glassflow_config.server_url
+ + "/pipelines/test-id/status/access_token",
+ status_code=200,
+ headers={
+ "Content-Type": "application/json",
+ "X-pipeline-access-token": data_client.pipeline_access_token,
+ },
+ )
+ data_client.validate_credentials()
+
+
+def test_push_to_pipeline_data_source_ok(requests_mock):
+ source = PipelineDataSource(
+ pipeline_id="test-id",
+ pipeline_access_token="test-access-token",
+ )
+ requests_mock.post(
+ source.glassflow_config.server_url + "/pipelines/test-id/topics/input/events",
+ status_code=200,
+ headers={
+ "Content-Type": "application/json",
+ "X-pipeline-access-token": "test-access-token",
+ },
+ )
+
+ res = source.publish({"test": "test"})
+
+ assert res.status_code == 200
+ assert res.content_type == "application/json"
+
+
+def test_push_to_pipeline_data_source_fail_with_404(requests_mock):
+ source = PipelineDataSource(
+ pipeline_id="test-id",
+ pipeline_access_token="test-access-token",
+ )
+ requests_mock.post(
+ source.glassflow_config.server_url + "/pipelines/test-id/topics/input/events",
+ status_code=404,
+ headers={
+ "Content-Type": "application/json",
+ "X-pipeline-access-token": "test-access-token",
+ },
+ )
+
+ with pytest.raises(errors.PipelineNotFoundError):
+ source.publish({"test": "test"})
+
+
+def test_push_to_pipeline_data_source_fail_with_401(requests_mock):
+ source = PipelineDataSource(
+ pipeline_id="test-id",
+ pipeline_access_token="test-access-token",
+ )
+ requests_mock.post(
+ source.glassflow_config.server_url + "/pipelines/test-id/topics/input/events",
+ status_code=401,
+ headers={
+ "Content-Type": "application/json",
+ "X-pipeline-access-token": "test-access-token",
+ },
+ )
+
+ with pytest.raises(errors.PipelineAccessTokenInvalidError):
+ source.publish({"test": "test"})
+
+
+def test_consume_from_pipeline_data_sink_ok(requests_mock, consume_payload):
+ sink = PipelineDataSink(
+ pipeline_id="test-id",
+ pipeline_access_token="test-access-token",
+ )
+ requests_mock.post(
+ sink.glassflow_config.server_url
+ + "/pipelines/test-id/topics/output/events/consume",
+ json=consume_payload,
+ status_code=200,
+ headers={
+ "Content-Type": "application/json",
+ "X-pipeline-access-token": "test-access-token",
+ },
+ )
+
+ res = sink.consume()
+
+ assert res.status_code == 200
+ assert res.content_type == "application/json"
+ assert res.body.req_id == consume_payload["req_id"]
+
+
+def test_consume_from_pipeline_data_sink_fail_with_404(requests_mock):
+ sink = PipelineDataSink(
+ pipeline_id="test-id",
+ pipeline_access_token="test-access-token",
+ )
+ requests_mock.post(
+ sink.glassflow_config.server_url
+ + "/pipelines/test-id/topics/output/events/consume",
+ json={"test-data": "test-data"},
+ status_code=404,
+ headers={
+ "Content-Type": "application/json",
+ "X-pipeline-access-token": "test-access-token",
+ },
+ )
+
+ with pytest.raises(errors.PipelineNotFoundError):
+ sink.consume()
+
+
+def test_consume_from_pipeline_data_sink_fail_with_401(requests_mock):
+ sink = PipelineDataSink(
+ pipeline_id="test-id",
+ pipeline_access_token="test-access-token",
+ )
+ requests_mock.post(
+ sink.glassflow_config.server_url
+ + "/pipelines/test-id/topics/output/events/consume",
+ json={"test-data": "test-data"},
+ status_code=401,
+ headers={
+ "Content-Type": "application/json",
+ "X-pipeline-access-token": "test-access-token",
+ },
+ )
+
+ with pytest.raises(errors.PipelineAccessTokenInvalidError):
+ sink.consume()
+
+
+def test_consume_from_pipeline_data_sink_ok_with_empty_response(requests_mock):
+ sink = PipelineDataSink(
+ pipeline_id="test-id",
+ pipeline_access_token="test-access-token",
+ )
+ requests_mock.post(
+ sink.glassflow_config.server_url
+ + "/pipelines/test-id/topics/output/events/consume",
+ status_code=204,
+ headers={
+ "Content-Type": "application/json",
+ "X-pipeline-access-token": "test-access-token",
+ },
+ )
+
+ res = sink.consume()
+
+ assert res.status_code == 204
+ assert res.content_type == "application/json"
+ assert res.body.event == {}
+
+
+def test_consume_from_pipeline_data_sink_ok_with_too_many_requests(requests_mock):
+ sink = PipelineDataSink(
+ pipeline_id="test-id",
+ pipeline_access_token="test-access-token",
+ )
+ requests_mock.post(
+ sink.glassflow_config.server_url
+ + "/pipelines/test-id/topics/output/events/consume",
+ status_code=429,
+ headers={
+ "Content-Type": "application/json",
+ "X-pipeline-access-token": "test-access-token",
+ },
+ )
+
+ res = sink.consume()
+
+ assert res.status_code == 429
+ assert res.content_type == "application/json"
+ assert res.body.event == {}
+
+
+def test_consume_failed_from_pipeline_data_sink_ok(requests_mock, consume_payload):
+ sink = PipelineDataSink(
+ pipeline_id="test-id",
+ pipeline_access_token="test-access-token",
+ )
+ requests_mock.post(
+ sink.glassflow_config.server_url
+ + "/pipelines/test-id/topics/failed/events/consume",
+ json=consume_payload,
+ status_code=200,
+ headers={
+ "Content-Type": "application/json",
+ "X-pipeline-access-token": "test-access-token",
+ },
+ )
+
+ res = sink.consume_failed()
+
+ assert res.status_code == 200
+ assert res.content_type == "application/json"
+ assert res.body.req_id == consume_payload["req_id"]
+
+
+def test_consume_failed_from_pipeline_data_sink_ok_with_empty_response(requests_mock):
+ sink = PipelineDataSink(
+ pipeline_id="test-id",
+ pipeline_access_token="test-access-token",
+ )
+ requests_mock.post(
+ sink.glassflow_config.server_url
+ + "/pipelines/test-id/topics/failed/events/consume",
+ status_code=204,
+ headers={
+ "Content-Type": "application/json",
+ "X-pipeline-access-token": "test-access-token",
+ },
+ )
+
+ res = sink.consume_failed()
+
+ assert res.status_code == 204
+ assert res.content_type == "application/json"
+ assert res.body.event == {}
+
+
+def test_consume_failed_from_pipeline_data_sink_ok_with_too_many_requests(
+ requests_mock,
+):
+ sink = PipelineDataSink(
+ pipeline_id="test-id",
+ pipeline_access_token="test-access-token",
+ )
+ requests_mock.post(
+ sink.glassflow_config.server_url
+ + "/pipelines/test-id/topics/failed/events/consume",
+ status_code=429,
+ headers={
+ "Content-Type": "application/json",
+ "X-pipeline-access-token": "test-access-token",
+ },
+ )
+
+ res = sink.consume_failed()
+
+ assert res.status_code == 429
+ assert res.content_type == "application/json"
+ assert res.body.event == {}
+
+
+def test_consume_failed_from_pipeline_data_sink_fail_with_404(requests_mock):
+ sink = PipelineDataSink(
+ pipeline_id="test-id",
+ pipeline_access_token="test-access-token",
+ )
+ requests_mock.post(
+ sink.glassflow_config.server_url
+ + "/pipelines/test-id/topics/failed/events/consume",
+ json={"test-data": "test-data"},
+ status_code=404,
+ headers={
+ "Content-Type": "application/json",
+ "X-pipeline-access-token": "test-access-token",
+ },
+ )
+
+ with pytest.raises(errors.PipelineNotFoundError):
+ sink.consume_failed()
+
+
+def test_consume_failed_from_pipeline_data_sink_fail_with_401(requests_mock):
+ sink = PipelineDataSink(
+ pipeline_id="test-id",
+ pipeline_access_token="test-access-token",
+ )
+ requests_mock.post(
+ sink.glassflow_config.server_url
+ + "/pipelines/test-id/topics/failed/events/consume",
+ json={"test-data": "test-data"},
+ status_code=401,
+ headers={
+ "Content-Type": "application/json",
+ "X-pipeline-access-token": "test-access-token",
+ },
+ )
+
+ with pytest.raises(errors.PipelineAccessTokenInvalidError):
+ sink.consume_failed()
diff --git a/tests/glassflow/unit_tests/pipeline_test.py b/tests/glassflow/unit_tests/pipeline_test.py
new file mode 100644
index 0000000..4521d6d
--- /dev/null
+++ b/tests/glassflow/unit_tests/pipeline_test.py
@@ -0,0 +1,272 @@
+import pytest
+
+from glassflow.models import errors
+from glassflow.pipeline import Pipeline
+
+
+def test_pipeline_with_transformation_file():
+ try:
+ p = Pipeline(
+ transformation_file="tests/data/transformation.py",
+ personal_access_token="test-token",
+ )
+ p._read_transformation_file()
+ assert p.transformation_code is not None
+ except Exception as e:
+ pytest.fail(e)
+
+
+def test_pipeline_fail_with_file_not_found():
+ with pytest.raises(FileNotFoundError):
+ p = Pipeline(
+ transformation_file="fake_file.py", personal_access_token="test-token"
+ )
+ p._read_transformation_file()
+
+
+def test_pipeline_fail_with_missing_sink_data():
+ with pytest.raises(ValueError) as e:
+ Pipeline(
+ transformation_file="tests/data/transformation.py",
+ personal_access_token="test-token",
+ sink_kind="google_pubsub",
+ )
+ assert str(e.value) == "Both sink_kind and sink_config must be provided"
+
+
+def test_pipeline_fail_with_missing_source_data():
+ with pytest.raises(ValueError) as e:
+ Pipeline(
+ transformation_file="tests/data/transformation.py",
+ personal_access_token="test-token",
+ source_kind="google_pubsub",
+ )
+ assert str(e.value) == "Both source_kind and source_config must be provided"
+
+
+def test_fetch_pipeline_ok(
+ get_pipeline_request_mock,
+ get_access_token_request_mock,
+ get_pipeline_function_source_request_mock,
+ fetch_pipeline_response,
+ function_source_response,
+):
+ pipeline = Pipeline(
+ id=fetch_pipeline_response["id"],
+ personal_access_token="test-token",
+ ).fetch()
+
+ assert pipeline.name == fetch_pipeline_response["name"]
+ assert len(pipeline.access_tokens) > 0
+ assert (
+ pipeline.transformation_code
+ == function_source_response["transformation_function"]
+ )
+ assert pipeline.requirements == function_source_response["requirements_txt"]
+
+
+def test_fetch_pipeline_fail_with_404(requests_mock, fetch_pipeline_response, client):
+ requests_mock.get(
+ client.glassflow_config.server_url + "/pipelines/test-id",
+ json=fetch_pipeline_response,
+ status_code=404,
+ headers={"Content-Type": "application/json"},
+ )
+
+ with pytest.raises(errors.PipelineNotFoundError):
+ Pipeline(
+ id=fetch_pipeline_response["id"],
+ personal_access_token="test-token",
+ ).fetch()
+
+
+def test_fetch_pipeline_fail_with_401(requests_mock, fetch_pipeline_response, client):
+ requests_mock.get(
+ client.glassflow_config.server_url + "/pipelines/test-id",
+ json=fetch_pipeline_response,
+ status_code=401,
+ headers={"Content-Type": "application/json"},
+ )
+
+ with pytest.raises(errors.UnauthorizedError):
+ Pipeline(
+ id=fetch_pipeline_response["id"],
+ personal_access_token="test-token",
+ ).fetch()
+
+
+def test_create_pipeline_ok(
+ requests_mock, fetch_pipeline_response, create_pipeline_response, client
+):
+ requests_mock.post(
+ client.glassflow_config.server_url + "/pipelines",
+ json=create_pipeline_response,
+ status_code=200,
+ headers={"Content-Type": "application/json"},
+ )
+ pipeline = Pipeline(
+ name=fetch_pipeline_response["name"],
+ space_id=create_pipeline_response["space_id"],
+ transformation_file="tests/data/transformation.py",
+ personal_access_token="test-token",
+ ).create()
+
+ assert pipeline.id == "test-id"
+ assert pipeline.name == "test-name"
+
+
+def test_create_pipeline_fail_with_missing_name(client):
+ with pytest.raises(ValueError) as e:
+ Pipeline(
+ space_id="test-space-id",
+ transformation_file="tests/data/transformation.py",
+ personal_access_token="test-token",
+ ).create()
+
+ assert e.value.__str__() == (
+ "Name must be provided in order to " "create the pipeline"
+ )
+
+
+def test_create_pipeline_fail_with_missing_space_id(client):
+ with pytest.raises(ValueError) as e:
+ Pipeline(
+ name="test-name",
+ transformation_file="tests/data/transformation.py",
+ personal_access_token="test-token",
+ ).create()
+
+ assert str(e.value) == ("Argument space_id must be provided in the constructor")
+
+
+def test_create_pipeline_fail_with_missing_transformation(client):
+ with pytest.raises(ValueError) as e:
+ Pipeline(
+ name="test-name",
+ space_id="test-space-id",
+ personal_access_token="test-token",
+ ).create()
+
+ assert str(e.value) == (
+ "Argument transformation_file must be provided in the constructor"
+ )
+
+
+def test_update_pipeline_ok(
+ get_pipeline_request_mock,
+ get_access_token_request_mock,
+ get_pipeline_function_source_request_mock,
+ update_pipeline_request_mock,
+ fetch_pipeline_response,
+ update_pipeline_response,
+):
+ pipeline = (
+ Pipeline(personal_access_token="test-token")
+ ._fill_pipeline_details(fetch_pipeline_response)
+ .update()
+ )
+
+ assert pipeline.name == update_pipeline_response["name"]
+ assert pipeline.source_connector == update_pipeline_response["source_connector"]
+
+
+def test_delete_pipeline_ok(requests_mock, client):
+ requests_mock.delete(
+ client.glassflow_config.server_url + "/pipelines/test-pipeline-id",
+ status_code=204,
+ headers={"Content-Type": "application/json"},
+ )
+ pipeline = Pipeline(
+ id="test-pipeline-id",
+ personal_access_token="test-token",
+ )
+ pipeline.delete()
+
+
+def test_delete_pipeline_fail_with_missing_pipeline_id(client):
+ pipeline = Pipeline(
+ personal_access_token="test-token",
+ )
+ with pytest.raises(ValueError):
+ pipeline.delete()
+
+
+def test_get_source_from_pipeline_ok(
+ client,
+ fetch_pipeline_response,
+ get_pipeline_request_mock,
+ get_access_token_request_mock,
+ get_pipeline_function_source_request_mock,
+ access_tokens_response,
+):
+ p = client.get_pipeline(fetch_pipeline_response["id"])
+ source = p.get_source()
+ source2 = p.get_source(pipeline_access_token_name="token2")
+
+ assert source.pipeline_id == p.id
+ assert (
+ source.pipeline_access_token
+ == access_tokens_response["access_tokens"][0]["token"]
+ )
+
+ assert source2.pipeline_id == p.id
+ assert (
+ source2.pipeline_access_token
+ == access_tokens_response["access_tokens"][1]["token"]
+ )
+
+
+def test_get_source_from_pipeline_fail_with_missing_id(client):
+ pipeline = Pipeline(personal_access_token="test-token")
+ with pytest.raises(ValueError) as e:
+ pipeline.get_source()
+
+ assert e.value.__str__() == "Pipeline id must be provided in the constructor"
+
+
+def test_get_sink_from_pipeline_ok(
+ client,
+ fetch_pipeline_response,
+ get_pipeline_request_mock,
+ get_access_token_request_mock,
+ get_pipeline_function_source_request_mock,
+ access_tokens_response,
+):
+ p = client.get_pipeline(fetch_pipeline_response["id"])
+ sink = p.get_sink()
+ sink2 = p.get_sink(pipeline_access_token_name="token2")
+
+ assert sink.pipeline_id == p.id
+ assert (
+ sink.pipeline_access_token
+ == access_tokens_response["access_tokens"][0]["token"]
+ )
+
+ assert sink2.pipeline_id == p.id
+ assert (
+ sink2.pipeline_access_token
+ == access_tokens_response["access_tokens"][1]["token"]
+ )
+
+
+def test_get_logs_from_pipeline_ok(client, requests_mock, get_logs_response):
+ pipeline_id = "test-pipeline-id"
+ requests_mock.get(
+ client.glassflow_config.server_url
+ + f"/pipelines/{pipeline_id}/functions/main/logs",
+ json=get_logs_response,
+ status_code=200,
+ headers={"Content-Type": "application/json"},
+ )
+ pipeline = Pipeline(id=pipeline_id, personal_access_token="test-token")
+ logs = pipeline.get_logs()
+
+ assert logs.status_code == 200
+ assert logs.content_type == "application/json"
+ assert logs.next == get_logs_response["next"]
+ for idx, log in enumerate(logs.logs):
+ assert log.level == get_logs_response["logs"][idx]["level"]
+ assert log.severity_code == get_logs_response["logs"][idx]["severity_code"]
+ assert (
+ log.payload.message == get_logs_response["logs"][idx]["payload"]["message"]
+ )
diff --git a/tests/glassflow/unit_tests/space_test.py b/tests/glassflow/unit_tests/space_test.py
new file mode 100644
index 0000000..db76510
--- /dev/null
+++ b/tests/glassflow/unit_tests/space_test.py
@@ -0,0 +1,46 @@
+import pytest
+
+from glassflow import Space
+
+
+def test_create_space_ok(requests_mock, create_space_response, client):
+ requests_mock.post(
+ client.glassflow_config.server_url + "/spaces",
+ json=create_space_response,
+ status_code=200,
+ headers={"Content-Type": "application/json"},
+ )
+ space = Space(
+ name=create_space_response["name"], personal_access_token="test-token"
+ ).create()
+
+ assert space.name == create_space_response["name"]
+ assert space.id == create_space_response["id"]
+ assert space.created_at == create_space_response["created_at"]
+
+
+def test_create_space_fail_with_missing_name(client):
+ with pytest.raises(ValueError) as e:
+ Space(personal_access_token="test-token").create()
+
+ assert str(e.value) == ("Name must be provided in order to create the space")
+
+
+def test_delete_space_ok(requests_mock, client):
+ requests_mock.delete(
+ client.glassflow_config.server_url + "/spaces/test-space-id",
+ status_code=204,
+ headers={"Content-Type": "application/json"},
+ )
+ space = Space(
+ id="test-space-id",
+ personal_access_token="test-token",
+ )
+ space.delete()
+
+
+def test_delete_space_fail_with_missing_id(client):
+ with pytest.raises(ValueError) as e:
+ Space(personal_access_token="test-token").delete()
+
+ assert str(e.value) == "Space id must be provided in the constructor"