diff --git a/.github/workflows/on_pr.yaml b/.github/workflows/on_pr.yaml index dbc6786..5584701 100644 --- a/.github/workflows/on_pr.yaml +++ b/.github/workflows/on_pr.yaml @@ -4,6 +4,12 @@ on: pull_request: branches: - main + - release/* + +permissions: + contents: write + checks: write + pull-requests: write jobs: tests: @@ -24,11 +30,16 @@ jobs: - name: Install dependencies run: pip install -e .[dev] - - name: Run Integration Tests - run: pytest tests/glassflow/integration_tests/ + - name: Run Tests + run: pytest env: - PIPELINE_ID: ${{ secrets.PIPELINE_ID }} - PIPELINE_ACCESS_TOKEN: ${{ secrets.PIPELINE_ACCESS_TOKEN }} + PERSONAL_ACCESS_TOKEN: ${{ secrets.INTEGRATION_PERSONAL_ACCESS_TOKEN }} + + - name: Upload coverage report + uses: actions/upload-artifact@v4 + with: + name: coverageReport + path: tests/reports/ checks: name: Run code checks @@ -48,8 +59,46 @@ jobs: - name: Install dependencies run: pip install -e .[dev] - - name: Run isort - run: isort . - - - name: Run linter and code formatter + - name: Run ruff linter checks run: ruff check . + + - name: Run ruff formatting checks + run: ruff format --check . + + coverage: + runs-on: ubuntu-latest + needs: [tests] + steps: + - uses: actions/checkout@v3 + with: + persist-credentials: false # otherwise, the token used is the GITHUB_TOKEN, instead of your personal token + fetch-depth: 0 # otherwise, you will fail to push refs to dest repo + + - name: Download coverage report + uses: actions/download-artifact@v4 + with: + name: coverageReport + + - name: Pytest coverage comment + id: coverageComment + uses: MishaKav/pytest-coverage-comment@main + with: + pytest-xml-coverage-path: ./coverage.xml + junitxml-path: ./pytest.xml + + - name: Update Readme with Coverage Html + if: ${{ github.ref == 'refs/heads/main' }} + run: | + sed -i '//,//c\\n\${{ steps.coverageComment.outputs.coverageHtml }}\n' ./README.md + + - name: Commit & Push changes to Readme + id: commit + if: ${{ github.ref == 'refs/heads/main' }} + run: | + git config --global user.name 'GitHub Actions Workflow glassflow-python-sdk/.github/workflows/on_pr.yaml' + git config --global user.email 'glassflow-actions-workflow@users.noreply.github.com' + + git add README.md + git commit -m "Update coverage on Readme" + + git push origin master diff --git a/.gitignore b/.gitignore index af095a4..249da53 100644 --- a/.gitignore +++ b/.gitignore @@ -10,4 +10,6 @@ site** .pypirc dist/ build -.env \ No newline at end of file +.env +tests/reports +.coverage \ No newline at end of file diff --git a/README.md b/README.md index ea291a3..f5ee09c 100644 --- a/README.md +++ b/README.md @@ -8,6 +8,8 @@ Chat on Slack Ruff + + # GlassFlow Python SDK @@ -22,11 +24,12 @@ You can install the GlassFlow Python SDK using pip: pip install glassflow ``` -## Available Operations +## Data Operations * [publish](#publish) - Publish a new event into the pipeline * [consume](#consume) - Consume the transformed event from the pipeline * [consume failed](#consume-failed) - Consume the events that failed from the pipeline +* [validate credentials](#validate-credentials) - Validate pipeline credentials ## publish @@ -36,12 +39,11 @@ Publish a new event into the pipeline ### Example Usage ```python -import glassflow +from glassflow import PipelineDataSource -client = glassflow.GlassFlowClient() -pipeline_client = client.pipeline_client(pipeline_id="") +``` + +Now you can perform CRUD operations on your pipelines: + +* [list_pipelines](#list_pipelines) - Returns the list of pipelines available +* 
[get_pipeline](#get_pipeline) - Returns a pipeline object from a given pipeline ID +* [create](#create) - Create a new pipeline +* [delete](#delete) - Delete an existing pipeline + +## list_pipelines + +Returns information about the available pipelines. It can be restricted to a +specific space by passing the `space_id`. + +### Example Usage + +```python +from glassflow import GlassFlowClient + +client = GlassFlowClient(personal_access_token="") +res = client.list_pipelines() +``` + +## get_pipeline + +Gets information about a pipeline from a given pipeline ID. It returns a Pipeline object +which can be used manage the Pipeline. + +### Example Usage + +```python +from glassflow import GlassFlowClient + +client = GlassFlowClient(personal_access_token="") +pipeline = client.get_pipeline(pipeline_id="") + +print("Name:", pipeline.name) ``` +## create + +The Pipeline object has a create method that creates a new GlassFlow pipeline. + +### Example Usage + +```python +from glassflow import Pipeline + +pipeline = Pipeline( + name="", + transformation_file="path/to/transformation.py", + space_id="", + personal_access_token="" +).create() +``` + +In the next example we create a pipeline with Google PubSub source +and a webhook sink: + +```python +from glassflow import Pipeline + +pipeline = Pipeline( + name="", + transformation_file="path/to/transformation.py", + space_id="", + personal_access_token="", + source_kind="google_pubsub", + source_config={ + "project_id": "", + "subscription_id": "", + "credentials_json": "" + }, + sink_kind="webhook", + sink_config={ + "url": "", + "method": "", + "headers": [{"header1": "header1_value"}] + } +).create() +``` + +## delete + +The Pipeline object has a delete method to delete a pipeline + +### Example Usage + +```python +from glassflow import Pipeline + +pipeline = Pipeline( + name="", + transformation_file="path/to/transformation.py", + space_id="", + personal_access_token="" +).create() + +pipeline.delete() +``` ## Quickstart diff --git a/USAGE.md b/USAGE.md index 5c58996..75634e0 100644 --- a/USAGE.md +++ b/USAGE.md @@ -1,11 +1,13 @@ ```python import glassflow -client = glassflow.GlassFlowClient() -pipeline = client.pipeline_client(pipeline_id="", pipeline_access_token="") +source = glassflow.PipelineDataSource(pipeline_id="", pipeline_access_token="") data = { "name": "Hello World", "id": 1 } -res = pipeline.publish(request_body=data) +source_res = source.publish(request_body=data) + +sink = glassflow.PipelineDataSink(pipeline_id="", pipeline_access_token="") +sink_res = sink.consume() ``` \ No newline at end of file diff --git a/docs/index.md b/docs/index.md index f7a7894..1c8d43c 100644 --- a/docs/index.md +++ b/docs/index.md @@ -1,6 +1,6 @@ # Welcome to GlassFlow Python SDK Docs -The [GlassFlow](https://www.glassflow.dev/) Python SDK provides a convenient way to interact with the GlassFlow API in your Python applications. The SDK is used to publish and consume events to your [GlassFlow pipelines](https://learn.glassflow.dev/docs/concepts/pipeline-configuration). +The [GlassFlow](https://www.glassflow.dev/) Python SDK provides a convenient way to interact with the GlassFlow API in your Python applications. The SDK is used to publish and consume events to your [GlassFlow pipelines](https://docs.glassflow.dev/concepts/pipeline). 
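For a quick orientation, here is a minimal publish/consume sketch mirroring USAGE.md in this repository. It assumes a pipeline already exists and that you have its ID and a pipeline access token; both are left as empty placeholders to fill in.

```python
from glassflow import PipelineDataSink, PipelineDataSource

# Publish an event into the pipeline (source side)
source = PipelineDataSource(pipeline_id="", pipeline_access_token="")
source.publish(request_body={"name": "Hello World", "id": 1})

# Consume the transformed event from the pipeline (sink side)
sink = PipelineDataSink(pipeline_id="", pipeline_access_token="")
res = sink.consume()
if res.status_code == 200:
    print("Consumed event:", res.json())
```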
## Installation @@ -11,101 +11,12 @@ You can install the GlassFlow Python SDK using pip: pip install glassflow ``` -## Available Operations +## Content -* [publish](#publish) - Publish a new event into the pipeline -* [consume](#consume) - Consume the transformed event from the pipeline -* [consume failed](#consume-failed) - Consume the events that failed from the pipeline -* [is access token valid](#is-access-token-valid) - Validates Pipeline Access Token -* [is_valid](#is-valid) - Check if pipeline credentials are valid +* [Publish and Consume Events](publish_and_consume.md) - Learn how to publish and consume events to/from a pipeline. +* [Pipeline Management](pipeline_management.md) - Learn how to manage your pipelines from the SDK. -## publish - -Publish a new event into the pipeline - -### Example Usage - -```python -import glassflow - -client = glassflow.GlassFlowClient() -pipeline_client = client.pipeline_client(pipeline_id="") +``` + +Here is a list of operations you can do with the `GlassFlowClient`: + +* [List Pipelines](#list-pipelines) - Returns a list with all your pipelines +* [Get Pipeline](#get-pipeline) - Returns a pipeline object from a given pipeline ID +* [Create Pipeline](#create-pipeline) - Create a new pipeline +* [List Spaces](#list-spaces) - Returns a list with all your spaces +* [Create Space](#create-space) - Create a new space + +You can also interact directly with the `Pipeline` or `Space` objects. They +allow for some extra functionalities like delete or update. + +* [Update Pipeline](#update-pipeline) - Update an existing pipeline +* [Delete Pipeline](#delete-pipeline) - Delete an existing pipeline +* [Delete Space](#delete-space) - Delete an existing pipeline + +## List Pipelines + +Returns information about the available pipelines. It can be restricted to a +specific space by passing the `space_id`. + +### Example Usage + +```python +from glassflow import GlassFlowClient + +client = GlassFlowClient(personal_access_token="") +res = client.list_pipelines() +``` + +## Get Pipeline + +Gets information about a pipeline from a given pipeline ID. It returns a Pipeline object +which can be used manage the Pipeline. + +### Example Usage + +```python +from glassflow import GlassFlowClient + +client = GlassFlowClient(personal_access_token="") +pipeline = client.get_pipeline(pipeline_id="") + +print("Name:", pipeline.name) +``` + +## Create Pipeline + +Creates a new pipeline and returns a `Pipeline` object. + +### Example Usage + +```python +from glassflow import GlassFlowClient + +client = GlassFlowClient(personal_access_token="") +pipeline = client.create_pipeline( + name="MyFirstPipeline", + space_id="", + transformation_file="path/to/transformation.py" +) + +print("Pipeline ID:", pipeline.id) +``` + +In the next example we create a pipeline with Google PubSub source +and a webhook sink: + +```python +from glassflow import GlassFlowClient + +client = GlassFlowClient(personal_access_token="") + +pipeline = client.create_pipeline( + name="MyFirstPipeline", + space_id="", + transformation_file="path/to/transformation.py", + source_kind="google_pubsub", + source_config={ + "project_id": "", + "subscription_id": "", + "credentials_json": "" + }, + sink_kind="webhook", + sink_config={ + "url": "www.my-webhook-url.com", + "method": "POST", + "headers": [{"header1": "header1_value"}] + } +) +``` + +## Update Pipeline + +The Pipeline object has an update method. 
+ +### Example Usage + +```python +from glassflow import Pipeline + +pipeline = Pipeline( + id="", + personal_access_token="", +) + +pipeline.update( + transformation_file="path/to/new/transformation.py", + name="NewPipelineName", +) +``` + +## Delete Pipeline + +The Pipeline object has a delete method to delete a pipeline + +### Example Usage + +```python +from glassflow import Pipeline + +pipeline = Pipeline( + id="", + personal_access_token="" +) +pipeline.delete() +``` + +## List Spaces + +Returns information about the available spaces. + +### Example Usage + +```python +from glassflow import GlassFlowClient + +client = GlassFlowClient(personal_access_token="") +res = client.list_spaces() +``` + + +## Create Space + +Creates a new space and returns a `Space` object. + +```python +from glassflow import GlassFlowClient + +client = GlassFlowClient(personal_access_token="") +space = client.create_space(name="MyFirstSpace") +``` + + +## Delete Space + +The Space object has a delete method to delete a space + +### Example Usage + +```python +from glassflow import Space + +space = Space( + id="", + personal_access_token="" +) +space.delete() +``` \ No newline at end of file diff --git a/docs/publish_and_consume.md b/docs/publish_and_consume.md new file mode 100644 index 0000000..223b7f4 --- /dev/null +++ b/docs/publish_and_consume.md @@ -0,0 +1,80 @@ +# Publish and Consume Events + + +* [Publish](#publish) - Publish a new event into the pipeline from a data source +* [Consume](#consume) - Consume the transformed event from the pipeline in a data sink +* [Consume Failed](#consume-failed) - Consume the events that failed from the pipeline in a +* [Validate Credentials](#validate-credentials) - Validate pipeline credentials + + +## Publish + +Publish a new event into the pipeline + +### Example Usage + +```python +from glassflow import PipelineDataSource + +source = PipelineDataSource(pipeline_id="=61.0"] -build-backend = "setuptools.build_meta" \ No newline at end of file +build-backend = "setuptools.build_meta" + +[tool.pytest.ini_options] +addopts = """ + --import-mode=importlib \ + --junitxml=tests/reports/pytest.xml \ + --cov=src/glassflow \ + --cov-report xml:tests/reports/coverage.xml \ + -ra -q +""" +testpaths = [ + "tests", +] + +[tool.coverage.html] +skip_covered = true + +[tool.ruff.lint] +select = [ + # pycodestyle + "E", + # Pyflakes + "F", + # pyupgrade + "UP", + # flake8-bugbear + "B", + # flake8-simplify + "SIM", + # isort + "I", +] + +[tool.ruff.lint.pydocstyle] +convention = "google" + +[tool.datamodel-codegen] +field-constraints = true +snake-case-field = true +strip-default-none = false +target-python-version = "3.7" +use-title-as-name = true +disable-timestamp = true +enable-version-header = true +use-double-quotes = true +use-subclass-enum=true +input-file-type = "openapi" +output-model-type = "dataclasses.dataclass" \ No newline at end of file diff --git a/setup.py b/setup.py index e42e251..06b4a95 100644 --- a/setup.py +++ b/setup.py @@ -1,14 +1,14 @@ import setuptools try: - with open("README.md", "r") as fh: + with open("README.md") as fh: long_description = fh.read() except FileNotFoundError: long_description = "" setuptools.setup( name="glassflow", - version="1.0.11", + version="2.0.2rc7", author="glassflow", description="GlassFlow Python Client SDK", url="https://learn.glassflow.dev/docs", @@ -36,7 +36,15 @@ "python-dotenv==1.0.1", ], extras_require={ - "dev": ["pylint==2.16.2", "pytest==8.3.2", "isort==5.13.2", "ruff==0.6.3"] + "dev": [ + "pylint==2.16.2", + 
"pytest==8.3.2", + "pytest-cov==5.0.0", + "datamodel-code-generator[http]==0.26.0", + "requests-mock==1.12.1", + "isort==5.13.2", + "ruff==0.6.3", + ] }, package_dir={"": "src"}, python_requires=">=3.8", diff --git a/src/glassflow/__init__.py b/src/glassflow/__init__.py index 17ec359..e3b7ab2 100644 --- a/src/glassflow/__init__.py +++ b/src/glassflow/__init__.py @@ -1,2 +1,7 @@ from .client import GlassFlowClient as GlassFlowClient from .config import GlassFlowConfig as GlassFlowConfig +from .models import errors as errors +from .pipeline import Pipeline as Pipeline +from .pipeline_data import PipelineDataSink as PipelineDataSink +from .pipeline_data import PipelineDataSource as PipelineDataSource +from .space import Space as Space diff --git a/src/glassflow/api_client.py b/src/glassflow/api_client.py new file mode 100644 index 0000000..369362a --- /dev/null +++ b/src/glassflow/api_client.py @@ -0,0 +1,94 @@ +from __future__ import annotations + +import sys + +import requests as requests_http + +from .config import GlassFlowConfig +from .models import errors +from .models.operations.base import BaseRequest, BaseResponse +from .utils import utils as utils + + +class APIClient: + glassflow_config = GlassFlowConfig() + + def __init__(self): + super().__init__() + self.client = requests_http.Session() + + def _get_headers( + self, request: BaseRequest, req_content_type: str | None = None + ) -> dict: + headers = utils.get_req_specific_headers(request) + headers["Accept"] = "application/json" + headers["Gf-Client"] = self.glassflow_config.glassflow_client + headers["User-Agent"] = self.glassflow_config.user_agent + headers["Gf-Python-Version"] = ( + f"{sys.version_info.major}." + f"{sys.version_info.minor}." + f"{sys.version_info.micro}" + ) + + if req_content_type and req_content_type not in ( + "multipart/form-data", + "multipart/mixed", + ): + headers["content-type"] = req_content_type + + return headers + + def _request( + self, + method: str, + endpoint: str, + request: BaseRequest, + serialization_method: str = "json", + ) -> BaseResponse: + request_type = type(request) + + url = utils.generate_url( + request_type, + self.glassflow_config.server_url, + endpoint, + request, + ) + + req_content_type, data, form = utils.serialize_request_body( + request=request, + request_type=request_type, + request_field_name="request_body", + nullable=False, + optional=True, + serialization_method=serialization_method, + ) + if method == "GET": + data = None + + headers = self._get_headers(request, req_content_type) + query_params = utils.get_query_params(request_type, request) + + # make the request + http_res = self.client.request( + method, url=url, params=query_params, headers=headers, data=data, files=form + ) + content_type = http_res.headers.get("Content-Type") + + res = BaseResponse( + status_code=http_res.status_code, + content_type=content_type, + raw_response=http_res, + ) + + if http_res.status_code in [400, 500]: + out = utils.unmarshal_json(http_res.text, errors.Error) + out.raw_response = http_res + raise out + elif http_res.status_code == 429: + pass + elif 400 < http_res.status_code < 600: + raise errors.ClientError( + "API error occurred", http_res.status_code, http_res.text, http_res + ) + + return res diff --git a/src/glassflow/client.py b/src/glassflow/client.py index 632861f..06c0fb1 100644 --- a/src/glassflow/client.py +++ b/src/glassflow/client.py @@ -1,68 +1,202 @@ """GlassFlow Python Client to interact with GlassFlow API""" -import os -from typing import Optional +from __future__ 
import annotations -import requests as requests_http +from .api_client import APIClient +from .models import errors, operations +from .models.api import PipelineState +from .pipeline import Pipeline +from .space import Space -from .config import GlassFlowConfig -from .pipelines import PipelineClient - -class GlassFlowClient: - """GlassFlow Client to interact with GlassFlow API and manage pipelines and other resources +class GlassFlowClient(APIClient): + """ + GlassFlow Client to interact with GlassFlow API and manage pipelines + and other resources Attributes: - rclient: requests.Session object to make HTTP requests to GlassFlow API + client: requests.Session object to make HTTP requests to GlassFlow API glassflow_config: GlassFlowConfig object to store configuration - organization_id: Organization ID of the user. If not provided, the default organization will be used + organization_id: Organization ID of the user. If not provided, + the default organization will be used """ - glassflow_config: GlassFlowConfig - - def __init__(self, organization_id: str = None) -> None: + def __init__( + self, personal_access_token: str = None, organization_id: str = None + ) -> None: """Create a new GlassFlowClient object Args: - organization_id: Organization ID of the user. If not provided, the default organization will be used + personal_access_token: GlassFlow Personal Access Token + organization_id: Organization ID of the user. If not provided, + the default organization will be used """ - rclient = requests_http.Session() - self.glassflow_config = GlassFlowConfig(rclient) + super().__init__() + self.personal_access_token = personal_access_token self.organization_id = organization_id - def pipeline_client( + def get_pipeline(self, pipeline_id: str) -> Pipeline: + """Gets a Pipeline object from the GlassFlow API + + Args: + pipeline_id: UUID of the pipeline + + Returns: + Pipeline: Pipeline object from the GlassFlow API + + Raises: + PipelineNotFoundError: Pipeline does not exist + UnauthorizedError: User does not have permission to perform the + requested operation + ClientError: GlassFlow Client Error + """ + return Pipeline( + personal_access_token=self.personal_access_token, id=pipeline_id + ).fetch() + + def create_pipeline( self, - pipeline_id: Optional[str] = None, - pipeline_access_token: Optional[str] = None, - space_id: Optional[str] = None, - ) -> PipelineClient: - """Create a new PipelineClient object to interact with a specific pipeline + name: str, + space_id: str, + transformation_file: str = None, + requirements: str = None, + source_kind: str = None, + source_config: dict = None, + sink_kind: str = None, + sink_config: dict = None, + env_vars: list[dict[str, str]] = None, + state: PipelineState = "running", + metadata: dict = None, + ) -> Pipeline: + """Creates a new GlassFlow pipeline + + Args: + name: Name of the pipeline + space_id: ID of the GlassFlow Space you want to create the pipeline in + transformation_file: Path to file with transformation function of + the pipeline. + requirements: Requirements.txt of the pipeline + source_kind: Kind of source for the pipeline. If no source is + provided, the default source will be SDK + source_config: Configuration of the pipeline's source + sink_kind: Kind of sink for the pipeline. If no sink is provided, + the default sink will be SDK + sink_config: Configuration of the pipeline's sink + env_vars: Environment variables to pass to the pipeline + state: State of the pipeline after creation. 
+ It can be either "running" or "paused" + metadata: Metadata of the pipeline + + Returns: + Pipeline: New pipeline + + Raises: + UnauthorizedError: User does not have permission to perform + the requested operation + """ + return Pipeline( + name=name, + space_id=space_id, + transformation_file=transformation_file, + requirements=requirements, + source_kind=source_kind, + source_config=source_config, + sink_kind=sink_kind, + sink_config=sink_config, + env_vars=env_vars, + state=state, + metadata=metadata, + organization_id=self.organization_id, + personal_access_token=self.personal_access_token, + ).create() + + def list_pipelines( + self, space_ids: list[str] | None = None + ) -> operations.ListPipelinesResponse: + """ + Lists all pipelines in the GlassFlow API Args: - pipeline_id: The pipeline id to interact with - pipeline_access_token: The access token to access the pipeline + space_ids: List of Space IDs of the pipelines to list. + If not specified, all the pipelines will be listed. Returns: - PipelineClient: Client object to publish and consume events from the given pipeline. + ListPipelinesResponse: Response object with the pipelines listed + + Raises: + UnauthorizedError: User does not have permission to perform the """ - # if no pipeline_id or pipeline_access_token is provided, try to read from environment variables - if not pipeline_id: - pipeline_id = os.getenv("PIPELINE_ID") - if not pipeline_access_token: - pipeline_access_token = os.getenv("PIPELINE_ACCESS_TOKEN") - # no pipeline_id provided explicitly or in environment variables - if not pipeline_id: - raise ValueError( - "PIPELINE_ID must be set as an environment variable or provided explicitly" + request = operations.ListPipelinesRequest( + space_id=space_ids, + organization_id=self.organization_id, + personal_access_token=self.personal_access_token, + ) + try: + res = self._request( + method="GET", + endpoint="/pipelines", + request=request, ) - if not pipeline_access_token: - raise ValueError( - "PIPELINE_ACCESS_TOKEN must be set as an environment variable or provided explicitly" + res_json = res.raw_response.json() + except errors.ClientError as e: + if e.status_code == 401: + raise errors.UnauthorizedError(e.raw_response) from e + else: + raise e + + return operations.ListPipelinesResponse( + content_type=res.content_type, + status_code=res.status_code, + raw_response=res.raw_response, + total_amount=res_json["total_amount"], + pipelines=res_json["pipelines"], + ) + + def list_spaces(self) -> operations.ListSpacesResponse: + request = operations.ListSpacesRequest( + organization_id=self.organization_id, + personal_access_token=self.personal_access_token, + ) + try: + res = self._request( + method="GET", + endpoint="/spaces", + request=request, ) + res_json = res.raw_response.json() + except errors.ClientError as e: + if e.status_code == 401: + raise errors.UnauthorizedError(e.raw_response) from e + else: + raise e - return PipelineClient( - glassflow_client=self, - pipeline_id=pipeline_id, - pipeline_access_token=pipeline_access_token, + return operations.ListSpacesResponse( + content_type=res.content_type, + status_code=res.status_code, + raw_response=res.raw_response, + total_amount=res_json["total_amount"], + spaces=res_json["spaces"], ) + + def create_space( + self, + name: str, + ) -> Space: + """Creates a new Space + + Args: + name: Name of the Space + + Returns: + Space: New space + + Raises: + UnauthorizedError: User does not have permission to perform + the requested operation + """ + return Space( + 
name=name, + personal_access_token=self.personal_access_token, + organization_id=self.organization_id, + ).create() diff --git a/src/glassflow/config.py b/src/glassflow/config.py index d67bd14..206a2e8 100644 --- a/src/glassflow/config.py +++ b/src/glassflow/config.py @@ -1,23 +1,19 @@ from dataclasses import dataclass from importlib.metadata import version -import requests - @dataclass class GlassFlowConfig: """Configuration object for GlassFlowClient Attributes: - client: requests.Session object to interact with the GlassFlow API server_url: The base URL of the GlassFlow API sdk_version: The version of the GlassFlow Python SDK user_agent: The user agent to be used in the requests """ - client: requests.Session server_url: str = "https://api.glassflow.dev/v1" sdk_version: str = version("glassflow") - user_agent: str = "glassflow-python-sdk/{}".format(sdk_version) - glassflow_client: str = "python-sdk/{}".format(sdk_version) + user_agent: str = f"glassflow-python-sdk/{sdk_version}" + glassflow_client: str = f"python-sdk/{sdk_version}" diff --git a/src/glassflow/models/api/__init__.py b/src/glassflow/models/api/__init__.py new file mode 100644 index 0000000..f84fabc --- /dev/null +++ b/src/glassflow/models/api/__init__.py @@ -0,0 +1,31 @@ +from .api import ( + CreatePipeline, + CreateSpace, + FunctionEnvironments, + FunctionLogEntry, + FunctionLogs, + GetDetailedSpacePipeline, + PipelineState, + SeverityCodeInput, + SinkConnector, + SourceConnector, + SpacePipeline, + SpaceScope, + UpdatePipeline, +) + +__all__ = [ + "CreatePipeline", + "FunctionEnvironments", + "FunctionLogEntry", + "FunctionLogs", + "GetDetailedSpacePipeline", + "PipelineState", + "SeverityCodeInput", + "SinkConnector", + "SourceConnector", + "SpacePipeline", + "UpdatePipeline", + "SpaceScope", + "CreateSpace", +] diff --git a/src/glassflow/models/api/api.py b/src/glassflow/models/api/api.py new file mode 100644 index 0000000..e88dc0a --- /dev/null +++ b/src/glassflow/models/api/api.py @@ -0,0 +1,377 @@ +# ruff: noqa +# generated by datamodel-codegen: +# filename: https://api.glassflow.dev/v1/openapi.yaml +# version: 0.26.0 + +from __future__ import annotations +from dataclasses_json import dataclass_json + +from dataclasses import dataclass +from enum import Enum +from typing import Any, Dict, List, Optional, Union + + +@dataclass_json +@dataclass +class Error: + detail: str + + +@dataclass_json +@dataclass +class CreateOrganization: + name: str + + +@dataclass_json +@dataclass +class Organization(CreateOrganization): + id: str + + +@dataclass_json +@dataclass +class OrganizationScope(Organization): + role: str + + +OrganizationScopes = List[OrganizationScope] + + +@dataclass_json +@dataclass +class SignUp: + access_token: str + id_token: str + + +@dataclass_json +@dataclass +class BasePipeline: + name: str + space_id: str + metadata: Dict[str, Any] + + +class PipelineState(str, Enum): + running = "running" + paused = "paused" + + +@dataclass_json +@dataclass +class FunctionEnvironment: + name: str + value: str + + +FunctionEnvironments = Optional[List[FunctionEnvironment]] + + +class Kind(str, Enum): + google_pubsub = "google_pubsub" + + +@dataclass_json +@dataclass +class Config: + project_id: str + subscription_id: str + credentials_json: str + + +@dataclass_json +@dataclass +class SourceConnector1: + kind: Kind + config: Config + + +class Kind1(str, Enum): + amazon_sqs = "amazon_sqs" + + +@dataclass_json +@dataclass +class Config1: + queue_url: str + aws_region: str + aws_access_key: str + aws_secret_key: str + + 
+@dataclass_json +@dataclass +class SourceConnector2: + kind: Kind1 + config: Config1 + + +SourceConnector = Optional[Union[SourceConnector1, SourceConnector2]] + + +class Kind2(str, Enum): + webhook = "webhook" + + +class Method(str, Enum): + get = "GET" + post = "POST" + put = "PUT" + patch = "PATCH" + delete = "DELETE" + + +@dataclass_json +@dataclass +class Header: + name: str + value: str + + +@dataclass_json +@dataclass +class Config2: + url: str + method: Method + headers: List[Header] + + +@dataclass_json +@dataclass +class SinkConnector1: + kind: Kind2 + config: Config2 + + +class Kind3(str, Enum): + clickhouse = "clickhouse" + + +@dataclass_json +@dataclass +class Config3: + addr: str + database: str + username: str + password: str + table: str + + +@dataclass_json +@dataclass +class SinkConnector2: + kind: Kind3 + config: Config3 + + +SinkConnector = Optional[Union[SinkConnector1, SinkConnector2]] + + +@dataclass_json +@dataclass +class Pipeline(BasePipeline): + id: str + created_at: str + state: PipelineState + + +@dataclass_json +@dataclass +class SpacePipeline(Pipeline): + space_name: str + + +@dataclass_json +@dataclass +class GetDetailedSpacePipeline(SpacePipeline): + source_connector: SourceConnector + sink_connector: SinkConnector + environments: FunctionEnvironments + + +@dataclass_json +@dataclass +class PipelineFunctionOutput: + environments: FunctionEnvironments + + +SpacePipelines = List[SpacePipeline] + + +@dataclass_json +@dataclass +class CreateSpace: + name: str + + +@dataclass_json +@dataclass +class UpdateSpace: + name: str + + +@dataclass_json +@dataclass +class Space(CreateSpace): + id: str + created_at: str + + +@dataclass_json +@dataclass +class SpaceScope(Space): + permission: str + + +SpaceScopes = List[SpaceScope] + + +@dataclass_json +@dataclass +class Payload: + message: str + + +class SeverityCodeInput(int, Enum): + integer_100 = 100 + integer_200 = 200 + integer_400 = 400 + integer_500 = 500 + + +SeverityCode = int + + +@dataclass_json +@dataclass +class CreateAccessToken: + name: str + + +@dataclass_json +@dataclass +class AccessToken(CreateAccessToken): + id: str + token: str + created_at: str + + +AccessTokens = List[AccessToken] + + +@dataclass_json +@dataclass +class PaginationResponse: + total_amount: int + + +@dataclass_json +@dataclass +class SourceFile: + name: str + content: str + + +SourceFiles = List[SourceFile] + + +@dataclass_json +@dataclass +class EventContext: + request_id: str + receive_time: str + external_id: Optional[str] = None + + +PersonalAccessToken = str + + +@dataclass_json +@dataclass +class Profile: + id: str + home_organization: Organization + name: str + email: str + provider: str + external_settings: Dict[str, Any] + subscriber_id: str + + +@dataclass_json +@dataclass +class ListOrganizationScopes(PaginationResponse): + organizations: OrganizationScopes + + +@dataclass_json +@dataclass +class UpdatePipeline: + name: str + transformation_function: Optional[str] = None + transformation_requirements: Optional[List[str]] = None + requirements_txt: Optional[str] = None + metadata: Optional[Dict[str, Any]] = None + source_connector: Optional[SourceConnector] = None + sink_connector: Optional[SinkConnector] = None + environments: Optional[FunctionEnvironments] = None + + +@dataclass_json +@dataclass +class CreatePipeline(BasePipeline): + transformation_function: Optional[str] = None + transformation_requirements: Optional[List[str]] = None + requirements_txt: Optional[str] = None + source_connector: Optional[SourceConnector] 
= None + sink_connector: Optional[SinkConnector] = None + environments: Optional[FunctionEnvironments] = None + state: Optional[PipelineState] = None + + +@dataclass_json +@dataclass +class ListPipelines(PaginationResponse): + pipelines: SpacePipelines + + +@dataclass_json +@dataclass +class ListSpaceScopes(PaginationResponse): + spaces: SpaceScopes + + +@dataclass_json +@dataclass +class FunctionLogEntry: + level: str + severity_code: SeverityCode + timestamp: str + payload: Payload + + +@dataclass_json +@dataclass +class ListAccessTokens(PaginationResponse): + access_tokens: AccessTokens + + +@dataclass_json +@dataclass +class ConsumeEvent: + payload: Dict[str, Any] + event_context: EventContext + status: str + response: Dict[str, Any] + req_id: Optional[str] = None + receive_time: Optional[str] = None + + +@dataclass_json +@dataclass +class ListPersonalAccessTokens: + tokens: List[PersonalAccessToken] + + +FunctionLogs = List[FunctionLogEntry] diff --git a/src/glassflow/models/errors/__init__.py b/src/glassflow/models/errors/__init__.py index a2e8aa4..6a8efd5 100644 --- a/src/glassflow/models/errors/__init__.py +++ b/src/glassflow/models/errors/__init__.py @@ -1,4 +1,21 @@ -from .clienterror import ClientError +from .clienterror import ( + ClientError, + PipelineAccessTokenInvalidError, + PipelineNotFoundError, + SpaceIsNotEmptyError, + SpaceNotFoundError, + UnauthorizedError, + UnknownContentTypeError, +) from .error import Error -__all__ = ["Error", "ClientError"] +__all__ = [ + "Error", + "ClientError", + "PipelineNotFoundError", + "PipelineAccessTokenInvalidError", + "SpaceNotFoundError", + "UnknownContentTypeError", + "UnauthorizedError", + "SpaceIsNotEmptyError", +] diff --git a/src/glassflow/models/errors/clienterror.py b/src/glassflow/models/errors/clienterror.py index d8117bd..f5ea477 100644 --- a/src/glassflow/models/errors/clienterror.py +++ b/src/glassflow/models/errors/clienterror.py @@ -49,3 +49,76 @@ def __str__(self): body = f"\n{self.body}" return f"{self.detail}: Status {self.status_code}{body}" + + +class PipelineNotFoundError(ClientError): + """Error caused by a pipeline ID not found.""" + + def __init__(self, pipeline_id: str, raw_response: requests_http.Response): + super().__init__( + detail=f"Pipeline ID {pipeline_id} does not exist", + status_code=404, + body=raw_response.text, + raw_response=raw_response, + ) + + +class SpaceNotFoundError(ClientError): + """Error caused by a pipeline ID not found.""" + + def __init__(self, space_id: str, raw_response: requests_http.Response): + super().__init__( + detail=f"Space ID {space_id} does not exist", + status_code=404, + body=raw_response.text, + raw_response=raw_response, + ) + + +class UnauthorizedError(ClientError): + """Error caused by a user not authorized.""" + + def __init__(self, raw_response: requests_http.Response): + super().__init__( + detail="Unauthorized request, Personal Access Token used is invalid", + status_code=401, + body=raw_response.text, + raw_response=raw_response, + ) + + +class PipelineAccessTokenInvalidError(ClientError): + """Error caused by invalid access token.""" + + def __init__(self, raw_response: requests_http.Response): + super().__init__( + detail="The Pipeline Access Token used is invalid", + status_code=401, + body=raw_response.text, + raw_response=raw_response, + ) + + +class UnknownContentTypeError(ClientError): + """Error caused by an unknown content type response.""" + + def __init__(self, raw_response: requests_http.Response): + content_type = 
raw_response.headers.get("Content-Type") + super().__init__( + detail=f"unknown content-type received: {content_type}", + status_code=raw_response.status_code, + body=raw_response.text, + raw_response=raw_response, + ) + + +class SpaceIsNotEmptyError(ClientError): + """Error caused by trying to delete a space that is not empty.""" + + def __init__(self, raw_response: requests_http.Response): + super().__init__( + detail=raw_response.json()["msg"], + status_code=409, + body=raw_response.text, + raw_response=raw_response, + ) diff --git a/src/glassflow/models/operations/__init__.py b/src/glassflow/models/operations/__init__.py index 5383ff1..5286126 100644 --- a/src/glassflow/models/operations/__init__.py +++ b/src/glassflow/models/operations/__init__.py @@ -1,21 +1,92 @@ -from .consumeevent import (ConsumeEventRequest, ConsumeEventResponse, - ConsumeEventResponseBody) -from .consumefailed import (ConsumeFailedRequest, ConsumeFailedResponse, - ConsumeFailedResponseBody) -from .publishevent import (PublishEventRequest, PublishEventRequestBody, - PublishEventResponse, PublishEventResponseBody) -from .status_access_token import StatusAccessTokenRequest +from .access_token import ListAccessTokensRequest, StatusAccessTokenRequest +from .artifact import ( + GetArtifactRequest, + PostArtifactRequest, +) +from .base import ( + BaseManagementRequest, + BasePipelineManagementRequest, + BaseRequest, + BaseResponse, + BaseSpaceManagementDataRequest, +) +from .consumeevent import ( + ConsumeEventRequest, + ConsumeEventResponse, + ConsumeEventResponseBody, +) +from .consumefailed import ( + ConsumeFailedRequest, + ConsumeFailedResponse, + ConsumeFailedResponseBody, +) +from .function import ( + FetchFunctionRequest, + GetFunctionLogsRequest, + GetFunctionLogsResponse, + UpdateFunctionRequest, +) +from .pipeline_crud import ( + CreatePipelineRequest, + CreatePipelineResponse, + DeletePipelineRequest, + GetPipelineRequest, + GetPipelineResponse, + ListPipelinesRequest, + ListPipelinesResponse, + UpdatePipelineRequest, + UpdatePipelineResponse, +) +from .publishevent import ( + PublishEventRequest, + PublishEventRequestBody, + PublishEventResponse, + PublishEventResponseBody, +) +from .space_crud import ( + CreateSpaceRequest, + CreateSpaceResponse, + DeleteSpaceRequest, + ListSpacesRequest, + ListSpacesResponse, +) __all__ = [ - "PublishEventRequest", - "PublishEventRequestBody", - "PublishEventResponse", - "PublishEventResponseBody", + "BaseManagementRequest", + "BasePipelineManagementRequest", + "BaseRequest", + "BaseResponse", + "BaseSpaceManagementDataRequest", "ConsumeEventRequest", "ConsumeEventResponse", "ConsumeEventResponseBody", "ConsumeFailedRequest", "ConsumeFailedResponse", "ConsumeFailedResponseBody", + "CreatePipelineRequest", + "CreatePipelineResponse", + "DeletePipelineRequest", + "DeleteSpaceRequest", + "GetPipelineRequest", + "GetPipelineResponse", + "ListPipelinesRequest", + "ListPipelinesResponse", + "ListAccessTokensRequest", + "PublishEventRequest", + "PublishEventRequestBody", + "PublishEventResponse", + "PublishEventResponseBody", + "GetArtifactRequest", + "GetFunctionLogsRequest", + "GetFunctionLogsResponse", "StatusAccessTokenRequest", + "ListSpacesResponse", + "ListSpacesRequest", + "CreateSpaceRequest", + "CreateSpaceResponse", + "UpdatePipelineRequest", + "UpdatePipelineResponse", + "UpdateFunctionRequest", + "FetchFunctionRequest", + "PostArtifactRequest", ] diff --git a/src/glassflow/models/operations/access_token.py b/src/glassflow/models/operations/access_token.py new file 
mode 100644 index 0000000..1832fc4 --- /dev/null +++ b/src/glassflow/models/operations/access_token.py @@ -0,0 +1,25 @@ +from __future__ import annotations + +import dataclasses + +from .base import BasePipelineDataRequest, BasePipelineManagementRequest + + +@dataclasses.dataclass +class StatusAccessTokenRequest(BasePipelineDataRequest): + """Request check the status of an access token + + Attributes: + pipeline_id: The id of the pipeline + organization_id: The id of the organization + x_pipeline_access_token: The access token of the pipeline + + """ + + pass + + +@dataclasses.dataclass +class ListAccessTokensRequest(BasePipelineManagementRequest): + page_size: int = 50 + page: int = 1 diff --git a/src/glassflow/models/operations/artifact.py b/src/glassflow/models/operations/artifact.py new file mode 100644 index 0000000..daad296 --- /dev/null +++ b/src/glassflow/models/operations/artifact.py @@ -0,0 +1,23 @@ +from __future__ import annotations + +import dataclasses + +from .base import BasePipelineManagementRequest + + +@dataclasses.dataclass +class GetArtifactRequest(BasePipelineManagementRequest): + pass + + +@dataclasses.dataclass +class PostArtifactRequest(BasePipelineManagementRequest): + file: str | None = dataclasses.field( + default=None, + metadata={ + "multipart_form": { + "field_name": "file", + } + }, + ) + requirementsTxt: str | None = dataclasses.field(default=None) diff --git a/src/glassflow/models/operations/base.py b/src/glassflow/models/operations/base.py new file mode 100644 index 0000000..da5f539 --- /dev/null +++ b/src/glassflow/models/operations/base.py @@ -0,0 +1,92 @@ +import dataclasses +from typing import Optional + +from requests import Response + + +@dataclasses.dataclass() +class BaseRequest: + pass + + +@dataclasses.dataclass() +class BaseManagementRequest(BaseRequest): + organization_id: Optional[str] = dataclasses.field( + default=None, + metadata={ + "query_param": { + "field_name": "organization_id", + "style": "form", + "explode": True, + } + }, + ) + personal_access_token: str = dataclasses.field( + default=None, + metadata={ + "header": { + "field_name": "Personal-Access-Token", + "style": "simple", + "explode": False, + } + }, + ) + + +@dataclasses.dataclass +class BasePipelineRequest(BaseRequest): + pipeline_id: str = dataclasses.field( + metadata={ + "path_param": { + "field_name": "pipeline_id", + "style": "simple", + "explode": False, + } + } + ) + organization_id: Optional[str] = dataclasses.field( + default=None, + metadata={ + "query_param": { + "field_name": "organization_id", + "style": "form", + "explode": True, + } + }, + ) + + +@dataclasses.dataclass +class BasePipelineDataRequest(BasePipelineRequest): + x_pipeline_access_token: str = dataclasses.field( + default=None, + metadata={ + "header": { + "field_name": "X-PIPELINE-ACCESS-TOKEN", + "style": "simple", + "explode": False, + } + }, + ) + + +@dataclasses.dataclass +class BaseSpaceRequest(BaseRequest): + space_id: str + + +@dataclasses.dataclass +class BasePipelineManagementRequest(BaseManagementRequest, BasePipelineRequest): + pass + + +@dataclasses.dataclass +class BaseSpaceManagementDataRequest(BaseManagementRequest, BaseSpaceRequest): + pass + + +@dataclasses.dataclass +class BaseResponse: + content_type: str = dataclasses.field() + status_code: int = dataclasses.field() + raw_response: Response = dataclasses.field() diff --git a/src/glassflow/models/operations/consumeevent.py b/src/glassflow/models/operations/consumeevent.py index 14847ab..5fe727c 100644 --- 
a/src/glassflow/models/operations/consumeevent.py +++ b/src/glassflow/models/operations/consumeevent.py @@ -3,14 +3,14 @@ from __future__ import annotations import dataclasses -from typing import Optional -import requests as requests_http from dataclasses_json import config, dataclass_json +from .base import BasePipelineDataRequest, BaseResponse + @dataclasses.dataclass -class ConsumeEventRequest: +class ConsumeEventRequest(BasePipelineDataRequest): """Request to consume an event from a pipeline topic Attributes: @@ -20,35 +20,7 @@ class ConsumeEventRequest: """ - pipeline_id: str = dataclasses.field( - metadata={ - "path_param": { - "field_name": "pipeline_id", - "style": "simple", - "explode": False, - } - } - ) - organization_id: Optional[str] = dataclasses.field( - default=None, - metadata={ - "query_param": { - "field_name": "organization_id", - "style": "form", - "explode": True, - } - }, - ) - x_pipeline_access_token: str = dataclasses.field( - default=None, - metadata={ - "header": { - "field_name": "X-PIPELINE-ACCESS-TOKEN", - "style": "simple", - "explode": False, - } - }, - ) + pass @dataclass_json @@ -69,7 +41,7 @@ class ConsumeEventResponseBody: @dataclasses.dataclass -class ConsumeEventResponse: +class ConsumeEventResponse(BaseResponse): """Response to consume an event from a pipeline topic Attributes: @@ -80,14 +52,11 @@ class ConsumeEventResponse: """ - content_type: str = dataclasses.field() - status_code: int = dataclasses.field() - raw_response: requests_http.Response = dataclasses.field() - body: Optional[ConsumeEventResponseBody] = dataclasses.field(default=None) + body: ConsumeEventResponseBody | None = dataclasses.field(default=None) def json(self): """Return the response body as a JSON object. - This method is to have cmopatibility with the requests.Response.json() method + This method is to have compatibility with the requests.Response.json() method Returns: dict: The transformed event as a JSON object diff --git a/src/glassflow/models/operations/consumefailed.py b/src/glassflow/models/operations/consumefailed.py index de1bff2..12fadf4 100644 --- a/src/glassflow/models/operations/consumefailed.py +++ b/src/glassflow/models/operations/consumefailed.py @@ -3,14 +3,14 @@ from __future__ import annotations import dataclasses -from typing import Optional -import requests as requests_http from dataclasses_json import config, dataclass_json +from .base import BasePipelineDataRequest, BaseResponse + @dataclasses.dataclass -class ConsumeFailedRequest: +class ConsumeFailedRequest(BasePipelineDataRequest): """Request to consume failed events from a pipeline Attributes: @@ -20,35 +20,7 @@ class ConsumeFailedRequest: """ - pipeline_id: str = dataclasses.field( - metadata={ - "path_param": { - "field_name": "pipeline_id", - "style": "simple", - "explode": False, - } - } - ) - organization_id: Optional[str] = dataclasses.field( - default=None, - metadata={ - "query_param": { - "field_name": "organization_id", - "style": "form", - "explode": True, - } - }, - ) - x_pipeline_access_token: str = dataclasses.field( - default=None, - metadata={ - "header": { - "field_name": "X-PIPELINE-ACCESS-TOKEN", - "style": "simple", - "explode": False, - } - }, - ) + pass @dataclass_json @@ -69,8 +41,8 @@ class ConsumeFailedResponseBody: @dataclasses.dataclass -class ConsumeFailedResponse: - """Response to consume an failed event from a pipeline +class ConsumeFailedResponse(BaseResponse): + """Response to consume a failed event from a pipeline Attributes: content_type: HTTP response content type 
for this operation @@ -80,14 +52,11 @@ class ConsumeFailedResponse: """ - content_type: str = dataclasses.field() - status_code: int = dataclasses.field() - raw_response: requests_http.Response = dataclasses.field() - body: Optional[ConsumeFailedResponseBody] = dataclasses.field(default=None) + body: ConsumeFailedResponseBody | None = dataclasses.field(default=None) def json(self): """Return the response body as a JSON object. - This method is to have cmopatibility with the requests.Response.json() method + This method is to have compatibility with the requests.Response.json() method Returns: dict: The transformed event as a JSON object diff --git a/src/glassflow/models/operations/function.py b/src/glassflow/models/operations/function.py new file mode 100644 index 0000000..93c8ecb --- /dev/null +++ b/src/glassflow/models/operations/function.py @@ -0,0 +1,31 @@ +from __future__ import annotations + +import dataclasses + +from ..api import FunctionEnvironments, FunctionLogs, SeverityCodeInput +from .base import BasePipelineManagementRequest, BaseResponse + + +@dataclasses.dataclass +class GetFunctionLogsRequest(BasePipelineManagementRequest): + page_size: int = 50 + page_token: str = None + severity_code: SeverityCodeInput | None = None + start_time: str | None = None + end_time: str | None = None + + +@dataclasses.dataclass +class GetFunctionLogsResponse(BaseResponse): + logs: FunctionLogs + next: str + + +@dataclasses.dataclass +class FetchFunctionRequest(BasePipelineManagementRequest): + pass + + +@dataclasses.dataclass +class UpdateFunctionRequest(BasePipelineManagementRequest): + environments: FunctionEnvironments | None = dataclasses.field(default=None) diff --git a/src/glassflow/models/operations/pipeline_crud.py b/src/glassflow/models/operations/pipeline_crud.py new file mode 100644 index 0000000..72d0c0f --- /dev/null +++ b/src/glassflow/models/operations/pipeline_crud.py @@ -0,0 +1,78 @@ +from __future__ import annotations + +import dataclasses +from enum import Enum + +from ..api import ( + CreatePipeline, + GetDetailedSpacePipeline, + PipelineState, + SinkConnector, + SourceConnector, + SpacePipeline, +) +from .base import BaseManagementRequest, BasePipelineManagementRequest, BaseResponse + + +@dataclasses.dataclass +class GetPipelineRequest(BasePipelineManagementRequest): + pass + + +@dataclasses.dataclass +class GetPipelineResponse(BaseResponse): + pipeline: GetDetailedSpacePipeline | None = dataclasses.field(default=None) + + +@dataclasses.dataclass +class CreatePipelineRequest(BaseManagementRequest, CreatePipeline): + pass + + +@dataclasses.dataclass +class UpdatePipelineRequest(BaseManagementRequest): + name: str | None = dataclasses.field(default=None) + state: PipelineState | None = dataclasses.field(default=None) + metadata: dict | None = dataclasses.field(default=None) + source_connector: SourceConnector | None = dataclasses.field(default=None) + sink_connector: SinkConnector | None = dataclasses.field(default=None) + + +@dataclasses.dataclass +class UpdatePipelineResponse(BaseResponse): + pipeline: GetDetailedSpacePipeline | None = dataclasses.field(default=None) + + +@dataclasses.dataclass +class CreatePipelineResponse(BaseResponse): + name: str + space_id: str + id: str + created_at: str + state: PipelineState + access_token: str + metadata: dict | None = dataclasses.field(default=None) + + +@dataclasses.dataclass +class DeletePipelineRequest(BasePipelineManagementRequest): + pass + + +class Order(str, Enum): + asc = "asc" + desc = "desc" + + +@dataclasses.dataclass 
+class ListPipelinesRequest(BaseManagementRequest): + space_id: list[str] | None = None + page_size: int = 50 + page: int = 1 + order_by: Order = Order.asc + + +@dataclasses.dataclass +class ListPipelinesResponse(BaseResponse): + total_amount: int + pipelines: list[SpacePipeline] diff --git a/src/glassflow/models/operations/publishevent.py b/src/glassflow/models/operations/publishevent.py index 00a830f..f5e78ba 100644 --- a/src/glassflow/models/operations/publishevent.py +++ b/src/glassflow/models/operations/publishevent.py @@ -3,9 +3,8 @@ from __future__ import annotations import dataclasses -from typing import Optional -import requests as requests_http +from .base import BasePipelineDataRequest, BaseResponse @dataclasses.dataclass @@ -14,7 +13,7 @@ class PublishEventRequestBody: @dataclasses.dataclass -class PublishEventRequest: +class PublishEventRequest(BasePipelineDataRequest): """Request to publish an event to a pipeline topic Attributes: @@ -24,35 +23,6 @@ class PublishEventRequest: request_body: The request body / event that should be published to the pipeline """ - pipeline_id: str = dataclasses.field( - metadata={ - "path_param": { - "field_name": "pipeline_id", - "style": "simple", - "explode": False, - } - } - ) - organization_id: Optional[str] = dataclasses.field( - default=None, - metadata={ - "query_param": { - "field_name": "organization_id", - "style": "form", - "explode": True, - } - }, - ) - x_pipeline_access_token: str = dataclasses.field( - default=None, - metadata={ - "header": { - "field_name": "X-PIPELINE-ACCESS-TOKEN", - "style": "simple", - "explode": False, - } - }, - ) request_body: dict = dataclasses.field( default=None, metadata={"request": {"media_type": "application/json"}} ) @@ -64,7 +34,7 @@ class PublishEventResponseBody: @dataclasses.dataclass -class PublishEventResponse: +class PublishEventResponse(BaseResponse): """Response object for publish event operation Attributes: @@ -75,7 +45,4 @@ class PublishEventResponse: """ - content_type: str = dataclasses.field() - status_code: int = dataclasses.field() - raw_response: requests_http.Response = dataclasses.field() - object: Optional[PublishEventResponseBody] = dataclasses.field(default=None) + object: PublishEventResponseBody | None = dataclasses.field(default=None) diff --git a/src/glassflow/models/operations/space_crud.py b/src/glassflow/models/operations/space_crud.py new file mode 100644 index 0000000..993c096 --- /dev/null +++ b/src/glassflow/models/operations/space_crud.py @@ -0,0 +1,42 @@ +from __future__ import annotations + +import dataclasses +from enum import Enum + +from ..api import CreateSpace, SpaceScope +from .base import BaseManagementRequest, BaseResponse, BaseSpaceManagementDataRequest + + +@dataclasses.dataclass +class ListSpacesResponse(BaseResponse): + total_amount: int + spaces: list[SpaceScope] + + +class Order(str, Enum): + asc = "asc" + desc = "desc" + + +@dataclasses.dataclass +class ListSpacesRequest(BaseManagementRequest): + page_size: int = 50 + page: int = 1 + order_by: Order = Order.asc + + +@dataclasses.dataclass +class CreateSpaceRequest(BaseManagementRequest, CreateSpace): + pass + + +@dataclasses.dataclass +class CreateSpaceResponse(BaseResponse): + name: str + id: str + created_at: str + + +@dataclasses.dataclass +class DeleteSpaceRequest(BaseSpaceManagementDataRequest): + pass diff --git a/src/glassflow/models/operations/status_access_token.py b/src/glassflow/models/operations/status_access_token.py deleted file mode 100644 index 037d943..0000000 --- 
a/src/glassflow/models/operations/status_access_token.py +++ /dev/null @@ -1,34 +0,0 @@ -from __future__ import annotations - -import dataclasses - - -@dataclasses.dataclass -class StatusAccessTokenRequest: - """Request check the status of an access token - - Attributes: - pipeline_id: The id of the pipeline - x_pipeline_access_token: The access token of the pipeline - - """ - - pipeline_id: str = dataclasses.field( - metadata={ - "path_param": { - "field_name": "pipeline_id", - "style": "simple", - "explode": False, - } - } - ) - x_pipeline_access_token: str = dataclasses.field( - default=None, - metadata={ - "header": { - "field_name": "X-PIPELINE-ACCESS-TOKEN", - "style": "simple", - "explode": False, - } - }, - ) diff --git a/src/glassflow/pipeline.py b/src/glassflow/pipeline.py new file mode 100644 index 0000000..bde51d2 --- /dev/null +++ b/src/glassflow/pipeline.py @@ -0,0 +1,553 @@ +from __future__ import annotations + +from .client import APIClient +from .models import api, errors, operations +from .pipeline_data import PipelineDataSink, PipelineDataSource + + +class Pipeline(APIClient): + def __init__( + self, + personal_access_token: str, + name: str | None = None, + space_id: str | None = None, + id: str | None = None, + source_kind: str | None = None, + source_config: dict | None = None, + sink_kind: str | None = None, + sink_config: dict | None = None, + requirements: str | None = None, + transformation_file: str | None = None, + env_vars: list[dict[str, str]] | None = None, + state: api.PipelineState = "running", + organization_id: str | None = None, + metadata: dict | None = None, + created_at: str | None = None, + ): + """Creates a new GlassFlow pipeline object + + Args: + personal_access_token: The personal access token to authenticate + against GlassFlow + id: Pipeline ID + name: Name of the pipeline + space_id: ID of the GlassFlow Space you want to create the pipeline in + transformation_file: Path to file with transformation function of + the pipeline. + requirements: Requirements.txt of the pipeline + source_kind: Kind of source for the pipeline. If no source is + provided, the default source will be SDK + source_config: Configuration of the pipeline's source + sink_kind: Kind of sink for the pipeline. If no sink is provided, + the default sink will be SDK + sink_config: Configuration of the pipeline's sink + env_vars: Environment variables to pass to the pipeline + state: State of the pipeline after creation. 
+ It can be either "running" or "paused" + metadata: Metadata of the pipeline + created_at: Timestamp when the pipeline was created + + Returns: + Pipeline: Pipeline object to interact with the GlassFlow API + + Raises: + FailNotFoundError: If the transformation file is provided and + does not exist + """ + super().__init__() + self.id = id + self.name = name + self.space_id = space_id + self.personal_access_token = personal_access_token + self.source_kind = source_kind + self.source_config = source_config + self.sink_kind = sink_kind + self.sink_config = sink_config + self.requirements = requirements + self.transformation_code = None + self.transformation_file = transformation_file + self.env_vars = env_vars + self.state = state + self.organization_id = organization_id + self.metadata = metadata if metadata is not None else {} + self.created_at = created_at + self.access_tokens = [] + + if self.transformation_file is not None: + self._read_transformation_file() + + if source_kind is not None and self.source_config is not None: + self.source_connector = dict( + kind=self.source_kind, + config=self.source_config, + ) + elif self.source_kind is None and self.source_config is None: + self.source_connector = None + else: + raise ValueError("Both source_kind and source_config must be provided") + + if self.sink_kind is not None and self.sink_config is not None: + self.sink_connector = dict( + kind=sink_kind, + config=sink_config, + ) + elif self.sink_kind is None and self.sink_config is None: + self.sink_connector = None + else: + raise ValueError("Both sink_kind and sink_config must be provided") + + def fetch(self) -> Pipeline: + """ + Fetches pipeline information from the GlassFlow API + + Returns: + self: Pipeline object + + Raises: + ValueError: If ID is not provided in the constructor + PipelineNotFoundError: If ID provided does not match any + existing pipeline in GlassFlow + UnauthorizedError: If the Personal Access Token is not + provider or is invalid + """ + if self.id is None: + raise ValueError( + "Pipeline id must be provided in order to fetch it's details" + ) + + request = operations.GetPipelineRequest( + pipeline_id=self.id, + organization_id=self.organization_id, + personal_access_token=self.personal_access_token, + ) + + base_res = self._request( + method="GET", + endpoint=f"/pipelines/{self.id}", + request=request, + ) + self._fill_pipeline_details(base_res.raw_response.json()) + + # Fetch Pipeline Access Tokens + self._list_access_tokens() + + # Fetch function source + self._get_function_artifact() + + return self + + def create(self) -> Pipeline: + """ + Creates a new GlassFlow pipeline + + Returns: + self: Pipeline object + + Raises: + ValueError: If name is not provided in the constructor + ValueError: If space_id is not provided in the constructor + ValueError: If transformation_file is not provided + in the constructor + """ + create_pipeline = api.CreatePipeline( + name=self.name, + space_id=self.space_id, + transformation_function=self.transformation_code, + requirements_txt=self.requirements, + source_connector=self.source_connector, + sink_connector=self.sink_connector, + environments=self.env_vars, + state=self.state, + metadata=self.metadata, + ) + if self.name is None: + raise ValueError("Name must be provided in order to create the pipeline") + if self.space_id is None: + raise ValueError("Argument space_id must be provided in the constructor") + if self.transformation_file is None: + raise ValueError( + "Argument transformation_file must be provided in the 
constructor" + ) + else: + self._read_transformation_file() + + request = operations.CreatePipelineRequest( + organization_id=self.organization_id, + personal_access_token=self.personal_access_token, + **create_pipeline.__dict__, + ) + + base_res = self._request(method="POST", endpoint="/pipelines", request=request) + res = operations.CreatePipelineResponse( + status_code=base_res.status_code, + content_type=base_res.content_type, + raw_response=base_res.raw_response, + **base_res.raw_response.json(), + ) + + self.id = res.id + self.created_at = res.created_at + self.access_tokens.append({"name": "default", "token": res.access_token}) + return self + + def update( + self, + name: str | None = None, + state: api.PipelineState | None = None, + transformation_file: str | None = None, + requirements: str | None = None, + metadata: dict | None = None, + source_kind: str | None = None, + source_config: dict | None = None, + sink_kind: str | None = None, + sink_config: dict | None = None, + env_vars: list[dict[str, str]] | None = None, + ) -> Pipeline: + """ + Updates a GlassFlow pipeline + + Args: + + name: Name of the pipeline + state: State of the pipeline after creation. + It can be either "running" or "paused" + transformation_file: Path to file with transformation function of + the pipeline. + requirements: Requirements.txt of the pipeline + source_kind: Kind of source for the pipeline. If no source is + provided, the default source will be SDK + source_config: Configuration of the pipeline's source + sink_kind: Kind of sink for the pipeline. If no sink is provided, + the default sink will be SDK + sink_config: Configuration of the pipeline's sink + env_vars: Environment variables to pass to the pipeline + metadata: Metadata of the pipeline + + Returns: + self: Updated pipeline + + """ + + # Fetch current pipeline data + self.fetch() + + if transformation_file is not None or requirements is not None: + if transformation_file is not None: + with open(transformation_file) as f: + file = f.read() + else: + file = self.transformation_code + + if requirements is None: + requirements = self.requirements + + self._upload_function_artifact(file, requirements) + self.requirements = requirements + self.transformation_code = file + + if source_kind is not None: + source_connector = dict( + kind=source_kind, + config=source_config, + ) + else: + source_connector = self.source_connector + + if sink_kind is not None: + sink_connector = dict( + kind=sink_kind, + config=sink_config, + ) + else: + sink_connector = self.sink_connector + + if env_vars is not None: + self._update_function(env_vars) + + request = operations.UpdatePipelineRequest( + organization_id=self.organization_id, + personal_access_token=self.personal_access_token, + name=name if name is not None else self.name, + state=state if state is not None else self.state, + metadata=metadata if metadata is not None else self.metadata, + source_connector=source_connector, + sink_connector=sink_connector, + ) + + base_res = self._request( + method="PATCH", endpoint=f"/pipelines/{self.id}", request=request + ) + self._fill_pipeline_details(base_res.raw_response.json()) + return self + + def delete(self) -> None: + """ + Deletes a GlassFlow pipeline + + Returns: + + Raises: + ValueError: If ID is not provided in the constructor + PipelineNotFoundError: If ID provided does not match any + existing pipeline in GlassFlow + UnauthorizedError: If the Personal Access Token is not + provided or is invalid + """ + if self.id is None: + raise 
ValueError("Pipeline id must be provided") + + request = operations.DeletePipelineRequest( + pipeline_id=self.id, + organization_id=self.organization_id, + personal_access_token=self.personal_access_token, + ) + self._request( + method="DELETE", + endpoint=f"/pipelines/{self.id}", + request=request, + ) + + def get_logs( + self, + page_size: int = 50, + page_token: str | None = None, + severity_code: api.SeverityCodeInput = api.SeverityCodeInput.integer_100, + start_time: str | None = None, + end_time: str | None = None, + ) -> operations.GetFunctionLogsResponse: + """ + Get the pipeline's logs + + Args: + page_size: Pagination size + page_token: Page token filter (use for pagination) + severity_code: Severity code filter + start_time: Start time filter + end_time: End time filter + + Returns: + PipelineFunctionsGetLogsResponse: Response with the logs + """ + request = operations.GetFunctionLogsRequest( + organization_id=self.organization_id, + personal_access_token=self.personal_access_token, + pipeline_id=self.id, + page_size=page_size, + page_token=page_token, + severity_code=severity_code, + start_time=start_time, + end_time=end_time, + ) + base_res = self._request( + method="GET", + endpoint=f"/pipelines/{self.id}/functions/main/logs", + request=request, + ) + base_res_json = base_res.raw_response.json() + logs = [ + api.FunctionLogEntry.from_dict(entry) for entry in base_res_json["logs"] + ] + return operations.GetFunctionLogsResponse( + status_code=base_res.status_code, + content_type=base_res.content_type, + raw_response=base_res.raw_response, + logs=logs, + next=base_res_json["next"], + ) + + def _list_access_tokens(self) -> Pipeline: + request = operations.ListAccessTokensRequest( + organization_id=self.organization_id, + personal_access_token=self.personal_access_token, + pipeline_id=self.id, + ) + base_res = self._request( + method="GET", + endpoint=f"/pipelines/{self.id}/access_tokens", + request=request, + ) + res_json = base_res.raw_response.json() + self.access_tokens = res_json["access_tokens"] + return self + + def _get_function_artifact(self) -> Pipeline: + """ + Fetch pipeline function source + + Returns: + self: Pipeline with function source details + """ + request = operations.GetArtifactRequest( + organization_id=self.organization_id, + personal_access_token=self.personal_access_token, + pipeline_id=self.id, + ) + base_res = self._request( + method="GET", + endpoint=f"/pipelines/{self.id}/functions/main/artifacts/latest", + request=request, + ) + res_json = base_res.raw_response.json() + self.transformation_code = res_json["transformation_function"] + self.requirements = res_json["requirements_txt"] + return self + + def _upload_function_artifact(self, file: str, requirements: str) -> None: + request = operations.PostArtifactRequest( + pipeline_id=self.id, + organization_id=self.organization_id, + personal_access_token=self.personal_access_token, + file=file, + requirementsTxt=requirements, + ) + try: + self._request( + method="POST", + endpoint=f"/pipelines/{self.id}/functions/main/artifacts", + request=request, + serialization_method="multipart", + ) + except errors.ClientError as e: + if e.status_code == 425: + # TODO: Figure out appropriate behaviour + print("Update still in progress") + else: + raise e + + def _update_function(self, env_vars): + """ + Patch pipeline function + + Args: + env_vars: Environment variables to update + + Returns: + self: Pipeline with updated function + """ + request = operations.UpdateFunctionRequest( + 
organization_id=self.organization_id, + personal_access_token=self.personal_access_token, + pipeline_id=self.id, + environments=env_vars, + ) + base_res = self._request( + method="PATCH", + endpoint=f"/pipelines/{self.id}/functions/main", + request=request, + ) + res_json = base_res.raw_response.json() + self.env_vars = res_json["environments"] + return self + + def get_source( + self, pipeline_access_token_name: str | None = None + ) -> PipelineDataSource: + """ + Get source client to publish data to the pipeline + + Args: + pipeline_access_token_name (str | None): Name of the pipeline + access token to use. If not specified, the default token + will be used + + Returns: + PipelineDataSource: Source client to publish data to the pipeline + + Raises: + ValueError: If pipeline id is not provided in the constructor + """ + return self._get_data_client("source", pipeline_access_token_name) + + def get_sink( + self, pipeline_access_token_name: str | None = None + ) -> PipelineDataSink: + """ + Get sink client to consume data from the pipeline + + Args: + pipeline_access_token_name (str | None): Name of the pipeline + access token to use. If not specified, the default token + will be used + + Returns: + PipelineDataSink: Sink client to consume data from the pipeline + + Raises: + ValueError: If pipeline id is not provided in the constructor + """ + return self._get_data_client("sink", pipeline_access_token_name) + + def _get_data_client( + self, client_type: str, pipeline_access_token_name: str | None = None + ) -> PipelineDataSource | PipelineDataSink: + if self.id is None: + raise ValueError("Pipeline id must be provided in the constructor") + elif len(self.access_tokens) == 0: + self._list_access_tokens() + + if pipeline_access_token_name is not None: + for t in self.access_tokens: + if t["name"] == pipeline_access_token_name: + token = t["token"] + break + else: + raise ValueError( + f"Token with name {pipeline_access_token_name} " f"was not found" + ) + else: + token = self.access_tokens[0]["token"] + if client_type == "source": + client = PipelineDataSource( + pipeline_id=self.id, + pipeline_access_token=token, + ) + elif client_type == "sink": + client = PipelineDataSink( + pipeline_id=self.id, + pipeline_access_token=token, + ) + else: + raise ValueError("client_type must be either source or sink") + return client + + def _request( + self, + method: str, + endpoint: str, + request: operations.BaseManagementRequest, + **kwargs, + ) -> operations.BaseResponse: + try: + return super()._request( + method=method, endpoint=endpoint, request=request, **kwargs + ) + except errors.ClientError as e: + if e.status_code == 404: + raise errors.PipelineNotFoundError(self.id, e.raw_response) from e + elif e.status_code == 401: + raise errors.UnauthorizedError(e.raw_response) from e + else: + raise e + + def _read_transformation_file(self): + try: + with open(self.transformation_file) as f: + self.transformation_code = f.read() + except FileNotFoundError: + raise + + def _fill_pipeline_details(self, pipeline_details: dict) -> Pipeline: + self.id = pipeline_details["id"] + self.name = pipeline_details["name"] + self.space_id = pipeline_details["space_id"] + self.state = pipeline_details["state"] + if pipeline_details["source_connector"]: + self.source_kind = pipeline_details["source_connector"]["kind"] + self.source_config = pipeline_details["source_connector"]["config"] + if pipeline_details["sink_connector"]: + self.sink_kind = pipeline_details["sink_connector"]["kind"] + self.sink_config = 
pipeline_details["sink_connector"]["config"] + self.created_at = pipeline_details["created_at"] + self.env_vars = pipeline_details["environments"] + + return self diff --git a/src/glassflow/pipeline_data.py b/src/glassflow/pipeline_data.py new file mode 100644 index 0000000..52d0380 --- /dev/null +++ b/src/glassflow/pipeline_data.py @@ -0,0 +1,215 @@ +import random +import time +from typing import Optional + +from .api_client import APIClient +from .models import errors, operations +from .models.operations.base import BasePipelineDataRequest, BaseResponse +from .utils import utils + + +class PipelineDataClient(APIClient): + """Base Client object to publish and consume events from the given pipeline. + + Attributes: + glassflow_config: GlassFlowConfig object to interact with GlassFlow API + pipeline_id: The pipeline id to interact with + pipeline_access_token: The access token to access the pipeline + """ + + def __init__(self, pipeline_id: str, pipeline_access_token: str): + super().__init__() + self.pipeline_id = pipeline_id + self.pipeline_access_token = pipeline_access_token + + def validate_credentials(self) -> None: + """ + Check if the pipeline credentials are valid and raise an error if not + """ + request = operations.StatusAccessTokenRequest( + pipeline_id=self.pipeline_id, + x_pipeline_access_token=self.pipeline_access_token, + ) + self._request( + method="GET", + endpoint="/pipelines/{pipeline_id}/status/access_token", + request=request, + ) + + def _request( + self, method: str, endpoint: str, request: BasePipelineDataRequest, **kwargs + ) -> BaseResponse: + try: + res = super()._request(method, endpoint, request, **kwargs) + except errors.ClientError as e: + if e.status_code == 401: + raise errors.PipelineAccessTokenInvalidError(e.raw_response) from e + elif e.status_code == 404: + raise errors.PipelineNotFoundError( + self.pipeline_id, e.raw_response + ) from e + else: + raise e + return res + + +class PipelineDataSource(PipelineDataClient): + def publish(self, request_body: dict) -> operations.PublishEventResponse: + """Push a new message into the pipeline + + Args: + request_body: The message to be published into the pipeline + + Returns: + PublishEventResponse: Response object containing the status + code and the raw response + + Raises: + ClientError: If an error occurred while publishing the event + """ + request = operations.PublishEventRequest( + pipeline_id=self.pipeline_id, + x_pipeline_access_token=self.pipeline_access_token, + request_body=request_body, + ) + base_res = self._request( + method="POST", + endpoint="/pipelines/{pipeline_id}/topics/input/events", + request=request, + ) + + return operations.PublishEventResponse( + status_code=base_res.status_code, + content_type=base_res.content_type, + raw_response=base_res.raw_response, + ) + + +class PipelineDataSink(PipelineDataClient): + def __init__(self, pipeline_id: str, pipeline_access_token: str): + super().__init__(pipeline_id, pipeline_access_token) + + # retry delay for consuming messages (in seconds) + self._consume_retry_delay_minimum = 1 + self._consume_retry_delay_current = 1 + self._consume_retry_delay_max = 60 + + def consume(self) -> operations.ConsumeEventResponse: + """Consume the last message from the pipeline + + Returns: + ConsumeEventResponse: Response object containing the status + code and the raw response + + Raises: + ClientError: If an error occurred while consuming the event + + """ + request = operations.ConsumeEventRequest( + pipeline_id=self.pipeline_id, + 
x_pipeline_access_token=self.pipeline_access_token, + ) + + self._respect_retry_delay() + base_res = self._request( + method="POST", + endpoint="/pipelines/{pipeline_id}/topics/output/events/consume", + request=request, + ) + + res = operations.ConsumeEventResponse( + status_code=base_res.status_code, + content_type=base_res.content_type, + raw_response=base_res.raw_response, + ) + + self._update_retry_delay(base_res.status_code) + if res.status_code == 200: + if not utils.match_content_type(res.content_type, "application/json"): + raise errors.UnknownContentTypeError(res.raw_response) + + self._consume_retry_delay_current = self._consume_retry_delay_minimum + body = utils.unmarshal_json( + res.raw_response.text, Optional[operations.ConsumeEventResponseBody] + ) + res.body = body + elif res.status_code == 204: + # No messages to be consumed. + # update the retry delay + # Return an empty response body + body = operations.ConsumeEventResponseBody("", "", {}) + res.body = body + elif res.status_code == 429: + # update the retry delay + body = operations.ConsumeEventResponseBody("", "", {}) + res.body = body + elif not utils.match_content_type(res.content_type, "application/json"): + raise errors.UnknownContentTypeError(res.raw_response) + + return res + + def consume_failed(self) -> operations.ConsumeFailedResponse: + """Consume the failed message from the pipeline + + Returns: + ConsumeFailedResponse: Response object containing the status + code and the raw response + + Raises: + ClientError: If an error occurred while consuming the event + + """ + request = operations.ConsumeFailedRequest( + pipeline_id=self.pipeline_id, + x_pipeline_access_token=self.pipeline_access_token, + ) + + self._respect_retry_delay() + base_res = self._request( + method="POST", + endpoint="/pipelines/{pipeline_id}/topics/failed/events/consume", + request=request, + ) + + res = operations.ConsumeFailedResponse( + status_code=base_res.status_code, + content_type=base_res.content_type, + raw_response=base_res.raw_response, + ) + + self._update_retry_delay(res.status_code) + if res.status_code == 200: + if not utils.match_content_type(res.content_type, "application/json"): + raise errors.UnknownContentTypeError(res.raw_response) + + self._consume_retry_delay_current = self._consume_retry_delay_minimum + body = utils.unmarshal_json( + res.raw_response.text, Optional[operations.ConsumeFailedResponseBody] + ) + res.body = body + elif res.status_code == 204: + # No messages to be consumed. 
Return an empty response body
+            body = operations.ConsumeFailedResponseBody("", "", {})
+            res.body = body
+        elif res.status_code == 429:
+            # update the retry delay
+            body = operations.ConsumeFailedResponseBody("", "", {})
+            res.body = body
+        elif not utils.match_content_type(res.content_type, "application/json"):
+            raise errors.UnknownContentTypeError(res.raw_response)
+        return res
+
+    def _update_retry_delay(self, status_code: int):
+        if status_code == 200:
+            self._consume_retry_delay_current = self._consume_retry_delay_minimum
+        elif status_code == 204 or status_code == 429:
+            self._consume_retry_delay_current *= 2
+            self._consume_retry_delay_current = min(
+                self._consume_retry_delay_current, self._consume_retry_delay_max
+            )
+            self._consume_retry_delay_current += random.uniform(0, 0.1)
+
+    def _respect_retry_delay(self):
+        if self._consume_retry_delay_current > self._consume_retry_delay_minimum:
+            # sleep before making the request
+            time.sleep(self._consume_retry_delay_current)
diff --git a/src/glassflow/pipelines.py b/src/glassflow/pipelines.py
deleted file mode 100644
index 0fbbb43..0000000
--- a/src/glassflow/pipelines.py
+++ /dev/null
@@ -1,368 +0,0 @@
-import dataclasses
-import random
-import sys
-import time
-from typing import Optional
-
-import glassflow.utils as utils
-
-from .models import errors, operations
-
-
-class PipelineClient:
-    """Client object to publish and consume events from the given pipeline.
-
-    Attributes:
-        glassflow_client: GlassFlowClient object to interact with GlassFlow API
-        pipeline_id: The pipeline id to interact with
-        organization_id: Organization ID of the user. If not provided, the default organization will be used
-        pipeline_access_token: The access token to access the pipeline
-    """
-
-    def __init__(
-        self, glassflow_client, pipeline_id: str, pipeline_access_token: str
-    ) -> None:
-        """Create a new PipelineClient object to interact with a specific pipeline
-
-        Args:
-            glassflow_client: GlassFlowClient object to interact with GlassFlow API
-            pipeline_id: The pipeline id to interact with
-            pipeline_access_token: The access token to access the pipeline
-        """
-        self.glassflow_client = glassflow_client
-        self.pipeline_id = pipeline_id
-        self.organization_id = self.glassflow_client.organization_id
-        self.pipeline_access_token = pipeline_access_token
-        # retry delay for consuming messages (in seconds)
-        self._consume_retry_delay_minimum = 1
-        self._consume_retry_delay_current = 1
-        self._consume_retry_delay_max = 60
-
-    def is_access_token_valid(self) -> bool:
-        """
-        Check if the pipeline access token is valid
-
-        Returns:
-            Boolean: True if the pipeline access token is correct, False otherwise
-        """
-        base_url = self.glassflow_client.glassflow_config.server_url
-
-        request = operations.StatusAccessTokenRequest(
-            pipeline_id=self.pipeline_id,
-            x_pipeline_access_token=self.pipeline_access_token,
-        )
-
-        url = utils.generate_url(
-            operations.PublishEventRequest,
-            base_url,
-            "/pipelines/{pipeline_id}/status/access_token",
-            request,
-        )
-
-        headers = self._get_headers(request)
-
-        client = self.glassflow_client.glassflow_config.client
-
-        http_res = client.request("GET", url, headers=headers)
-        content_type = http_res.headers.get("Content-Type")
-
-        if http_res.status_code == 200:
-            res = True
-        elif http_res.status_code == 401:
-            res = False
-        elif http_res.status_code in [400, 500]:
-            if utils.match_content_type(content_type, "application/json"):
-                out = utils.unmarshal_json(http_res.text, errors.Error)
-                out.raw_response = http_res
-                raise out
- else: - raise errors.ClientError( - f"unknown content-type received: {content_type}", - http_res.status_code, - http_res.text, - http_res, - ) - elif 400 < http_res.status_code < 600: - raise errors.ClientError( - "API error occurred", http_res.status_code, http_res.text, http_res - ) - return res - - def is_valid(self) -> bool: - """ - Check if the pipeline exists and credentials are valid - - Returns: - Boolean: True if the pipeline exists and credentials are valid, False otherwise - """ - try: - return self.is_access_token_valid() - except errors.ClientError as e: - if e.status_code == 404: - return False - else: - raise e - - def publish(self, request_body: dict) -> operations.PublishEventResponse: - """Push a new message into the pipeline - - Args: - request_body: The message to be published into the pipeline - - Returns: - PublishEventResponse: Response object containing the status code and the raw response - - Raises: - ClientError: If an error occurred while publishing the event - """ - request = operations.PublishEventRequest( - organization_id=self.organization_id, - pipeline_id=self.pipeline_id, - x_pipeline_access_token=self.pipeline_access_token, - request_body=request_body, - ) - - base_url = self.glassflow_client.glassflow_config.server_url - - url = utils.generate_url( - operations.PublishEventRequest, - base_url, - "/pipelines/{pipeline_id}/topics/input/events", - request, - ) - - req_content_type, data, form = utils.serialize_request_body( - request, operations.PublishEventRequest, "request_body", False, True, "json" - ) - - headers = self._get_headers(request, req_content_type) - query_params = utils.get_query_params(operations.PublishEventRequest, request) - - client = self.glassflow_client.glassflow_config.client - - http_res = client.request( - "POST", url, params=query_params, data=data, files=form, headers=headers - ) - content_type = http_res.headers.get("Content-Type") - - res = operations.PublishEventResponse( - status_code=http_res.status_code, - content_type=content_type, - raw_response=http_res, - ) - - if http_res.status_code == 200: - pass - elif http_res.status_code in [400, 500]: - if utils.match_content_type(content_type, "application/json"): - out = utils.unmarshal_json(http_res.text, errors.Error) - out.raw_response = http_res - raise out - else: - raise errors.ClientError( - f"unknown content-type received: {content_type}", - http_res.status_code, - http_res.text, - http_res, - ) - elif 400 < http_res.status_code < 600: - raise errors.ClientError( - "API error occurred", http_res.status_code, http_res.text, http_res - ) - - return res - - def consume(self) -> operations.ConsumeEventResponse: - """Consume the last message from the pipeline - - Returns: - ConsumeEventResponse: Response object containing the status code and the raw response - - Raises: - ClientError: If an error occurred while consuming the event - - """ - request = operations.ConsumeEventRequest( - pipeline_id=self.pipeline_id, - organization_id=self.organization_id, - x_pipeline_access_token=self.pipeline_access_token, - ) - - base_url = self.glassflow_client.glassflow_config.server_url - - url = utils.generate_url( - operations.ConsumeEventRequest, - base_url, - "/pipelines/{pipeline_id}/topics/output/events/consume", - request, - ) - headers = self._get_headers(request) - query_params = utils.get_query_params(operations.ConsumeEventRequest, request) - - client = self.glassflow_client.glassflow_config.client - # make the request - self._respect_retry_delay() - - http_res = 
client.request("POST", url, params=query_params, headers=headers) - content_type = http_res.headers.get("Content-Type") - - res = operations.ConsumeEventResponse( - status_code=http_res.status_code, - content_type=content_type, - raw_response=http_res, - ) - - self._update_retry_delay(http_res.status_code) - if http_res.status_code == 200: - self._consume_retry_delay_current = self._consume_retry_delay_minimum - if utils.match_content_type(content_type, "application/json"): - body = utils.unmarshal_json( - http_res.text, Optional[operations.ConsumeEventResponseBody] - ) - res.body = body - else: - raise errors.ClientError( - f"unknown content-type received: {content_type}", - http_res.status_code, - http_res.text, - http_res, - ) - elif http_res.status_code == 204: - # No messages to be consumed. - # update the retry delay - # Return an empty response body - body = operations.ConsumeEventResponseBody("", "", {}) - res.body = body - elif http_res.status_code == 429: - # update the retry delay - body = operations.ConsumeEventResponseBody("", "", {}) - res.body = body - elif http_res.status_code in [400, 500]: - if utils.match_content_type(content_type, "application/json"): - out = utils.unmarshal_json(http_res.text, errors.Error) - out.raw_response = http_res - raise out - else: - raise errors.ClientError( - f"unknown content-type received: {content_type}", - http_res.status_code, - http_res.text, - http_res, - ) - elif 400 < http_res.status_code < 600: - raise errors.ClientError( - "API error occurred", http_res.status_code, http_res.text, http_res - ) - - return res - - def consume_failed(self) -> operations.ConsumeFailedResponse: - """Consume the failed message from the pipeline - - Returns: - ConsumeFailedResponse: Response object containing the status code and the raw response - - Raises: - ClientError: If an error occurred while consuming the event - - """ - request = operations.ConsumeFailedRequest( - pipeline_id=self.pipeline_id, - organization_id=self.organization_id, - x_pipeline_access_token=self.pipeline_access_token, - ) - - base_url = self.glassflow_client.glassflow_config.server_url - - url = utils.generate_url( - operations.ConsumeFailedRequest, - base_url, - "/pipelines/{pipeline_id}/topics/failed/events/consume", - request, - ) - headers = self._get_headers(request) - query_params = utils.get_query_params(operations.ConsumeFailedRequest, request) - - client = self.glassflow_client.glassflow_config.client - self._respect_retry_delay() - http_res = client.request("POST", url, params=query_params, headers=headers) - content_type = http_res.headers.get("Content-Type") - - res = operations.ConsumeFailedResponse( - status_code=http_res.status_code, - content_type=content_type, - raw_response=http_res, - ) - - self._update_retry_delay(http_res.status_code) - if http_res.status_code == 200: - if utils.match_content_type(content_type, "application/json"): - body = utils.unmarshal_json( - http_res.text, Optional[operations.ConsumeFailedResponseBody] - ) - res.body = body - else: - raise errors.ClientError( - f"unknown content-type received: {content_type}", - http_res.status_code, - http_res.text, - http_res, - ) - elif http_res.status_code == 204: - # No messages to be consumed. 
Return an empty response body
-            body = operations.ConsumeFailedResponseBody("", "", {})
-            res.body = body
-        elif http_res.status_code in [400, 500]:
-            if utils.match_content_type(content_type, "application/json"):
-                out = utils.unmarshal_json(http_res.text, errors.Error)
-                out.raw_response = http_res
-                raise out
-            else:
-                raise errors.ClientError(
-                    f"unknown content-type received: {content_type}",
-                    http_res.status_code,
-                    http_res.text,
-                    http_res,
-                )
-        elif 400 < http_res.status_code < 600:
-            raise errors.ClientError(
-                "API error occurred", http_res.status_code, http_res.text, http_res
-            )
-
-        return res
-
-    def _get_headers(
-        self, request: dataclasses.dataclass, req_content_type: Optional[str] = None
-    ) -> dict:
-        headers = utils.get_req_specific_headers(request)
-        headers["Accept"] = "application/json"
-        headers["Gf-Client"] = self.glassflow_client.glassflow_config.glassflow_client
-        headers["User-Agent"] = self.glassflow_client.glassflow_config.user_agent
-        headers["Gf-Python-Version"] = (
-            f"{sys.version_info.major}."
-            f"{sys.version_info.minor}."
-            f"{sys.version_info.micro}"
-        )
-
-        if req_content_type and req_content_type not in (
-            "multipart/form-data",
-            "multipart/mixed",
-        ):
-            headers["content-type"] = req_content_type
-
-        return headers
-
-    def _update_retry_delay(self, status_code: int):
-        if status_code == 200:
-            self._consume_retry_delay_current = self._consume_retry_delay_minimum
-        elif status_code == 204 or status_code == 429:
-            self._consume_retry_delay_current *= 2
-            self._consume_retry_delay_current = min(
-                self._consume_retry_delay_current, self._consume_retry_delay_max
-            )
-            self._consume_retry_delay_current += random.uniform(0, 0.1)
-
-    def _respect_retry_delay(self):
-        if self._consume_retry_delay_current > self._consume_retry_delay_minimum:
-            # sleep before making the request
-            time.sleep(self._consume_retry_delay_current)
diff --git a/src/glassflow/space.py b/src/glassflow/space.py
new file mode 100644
index 0000000..6ad088a
--- /dev/null
+++ b/src/glassflow/space.py
@@ -0,0 +1,116 @@
+from __future__ import annotations
+
+from .client import APIClient
+from .models import api, errors, operations
+
+
+class Space(APIClient):
+    def __init__(
+        self,
+        personal_access_token: str,
+        name: str | None = None,
+        id: str | None = None,
+        created_at: str | None = None,
+        organization_id: str | None = None,
+    ):
+        """Creates a new GlassFlow space object
+
+        Args:
+            personal_access_token: The personal access token to authenticate
+                against GlassFlow
+            name: Name of the space
+            id: ID of the GlassFlow Space
+            created_at: Timestamp when the space was created
+
+        Returns:
+            Space: Space object to interact with the GlassFlow API
+
+        Raises:
+        """
+        super().__init__()
+        self.name = name
+        self.id = id
+        self.created_at = created_at
+        self.organization_id = organization_id
+        self.personal_access_token = personal_access_token
+
+    def create(self) -> Space:
+        """
+        Creates a new GlassFlow space
+
+        Returns:
+            self: Space object
+
+        Raises:
+            ValueError: If name is not provided in the constructor
+
+        """
+        if self.name is None:
+            raise ValueError("Name must be provided in order to create the space")
+        create_space = api.CreateSpace(name=self.name)
+        request = operations.CreateSpaceRequest(
+            organization_id=self.organization_id,
+            personal_access_token=self.personal_access_token,
+            **create_space.__dict__,
+        )
+        base_res = self._request(method="POST", endpoint="/spaces", request=request)
+
+        res = operations.CreateSpaceResponse(
+
status_code=base_res.status_code, + content_type=base_res.content_type, + raw_response=base_res.raw_response, + **base_res.raw_response.json(), + ) + + self.id = res.id + self.created_at = res.created_at + self.name = res.name + return self + + def delete(self) -> None: + """ + Deletes a GlassFlow space + + Returns: + + Raises: + ValueError: If ID is not provided in the constructor + SpaceNotFoundError: If ID provided does not match any + existing space in GlassFlow + UnauthorizedError: If the Personal Access Token is not + provided or is invalid + """ + if self.id is None: + raise ValueError("Space id must be provided in the constructor") + + request = operations.DeleteSpaceRequest( + space_id=self.id, + organization_id=self.organization_id, + personal_access_token=self.personal_access_token, + ) + self._request( + method="DELETE", + endpoint=f"/spaces/{self.id}", + request=request, + ) + + def _request( + self, + method: str, + endpoint: str, + request: operations.BaseManagementRequest, + **kwargs, + ) -> operations.BaseResponse: + try: + return super()._request( + method=method, endpoint=endpoint, request=request, **kwargs + ) + except errors.ClientError as e: + if e.status_code == 404: + raise errors.SpaceNotFoundError(self.id, e.raw_response) from e + elif e.status_code == 401: + raise errors.UnauthorizedError(e.raw_response) from e + elif e.status_code == 409: + raise errors.SpaceIsNotEmptyError(e.raw_response) from e + else: + raise e diff --git a/src/glassflow/utils/__init__.py b/src/glassflow/utils/__init__.py index c7eca83..5f5b06d 100644 --- a/src/glassflow/utils/__init__.py +++ b/src/glassflow/utils/__init__.py @@ -1,6 +1,13 @@ -from .utils import (generate_url, get_field_name, get_query_params, - get_req_specific_headers, marshal_json, match_content_type, - serialize_request_body, unmarshal_json) +from .utils import ( + generate_url, + get_field_name, + get_query_params, + get_req_specific_headers, + marshal_json, + match_content_type, + serialize_request_body, + unmarshal_json, +) __all__ = [ "match_content_type", @@ -10,5 +17,5 @@ "get_query_params", "get_req_specific_headers", "get_field_name", - "marshal_json" -] \ No newline at end of file + "marshal_json", +] diff --git a/src/glassflow/utils/utils.py b/src/glassflow/utils/utils.py index 60e0586..08b2473 100644 --- a/src/glassflow/utils/utils.py +++ b/src/glassflow/utils/utils.py @@ -1,3 +1,5 @@ +# ruff: noqa: E501, SIM102 + import json import re from dataclasses import Field, dataclass, fields, is_dataclass, make_dataclass @@ -5,8 +7,7 @@ from decimal import Decimal from email.message import Message from enum import Enum -from typing import (Any, Callable, Dict, List, Tuple, Union, get_args, - get_origin) +from typing import Any, Callable, Dict, List, Tuple, Union, get_args, get_origin from xmlrpc.client import boolean from dataclasses_json import DataClassJsonMixin @@ -306,9 +307,8 @@ def serialize_request_body( serialization_method: str, encoder=None, ) -> Tuple[str, any, any]: - if request is None: - if not nullable and optional: - return None, None, None + if request is None and not nullable and optional: + return None, None, None if not is_dataclass(request) or not hasattr(request, request_field_name): return serialize_content_type( @@ -321,9 +321,8 @@ def serialize_request_body( request_val = getattr(request, request_field_name) - if request_val is None: - if not nullable and optional: - return None, None, None + if request_val is None and not nullable and optional: + return None, None, None request_fields: 
Tuple[Field, ...] = fields(request) request_metadata = None @@ -387,7 +386,7 @@ def serialize_multipart_form( file_name = "" field_name = "" - content = bytes() + content = b"" for file_field in file_fields: file_metadata = file_field.metadata.get("multipart_form") @@ -399,7 +398,7 @@ def serialize_multipart_form( else: field_name = file_metadata.get("field_name", file_field.name) file_name = getattr(val, file_field.name) - if field_name == "" or file_name == "" or content == bytes(): + if field_name == "" or file_name == "" or content == b"": raise Exception("invalid multipart/form-data file") form.append([field_name, [file_name, content]]) @@ -635,11 +634,7 @@ def match_content_type(content_type: str, pattern: str) -> boolean: return True parts = media_type.split("/") - if len(parts) == 2: - if pattern in (f"{parts[0]}/*", f"*/{parts[1]}"): - return True - - return False + return len(parts) == 2 and pattern in (f"{parts[0]}/*", f"*/{parts[1]}") def get_field_name(name): diff --git a/tests/data/transformation.py b/tests/data/transformation.py new file mode 100644 index 0000000..ff23e28 --- /dev/null +++ b/tests/data/transformation.py @@ -0,0 +1,3 @@ +def handler(data, log): + data["new_field"] = "new_value" + return data diff --git a/tests/data/transformation_2.py b/tests/data/transformation_2.py new file mode 100644 index 0000000..9f903c3 --- /dev/null +++ b/tests/data/transformation_2.py @@ -0,0 +1,4 @@ +def handler(data, log): + data["new_field"] = "new_value" + log.info(data) + return data diff --git a/tests/glassflow/conftest.py b/tests/glassflow/conftest.py index dc367f8..9932a4a 100644 --- a/tests/glassflow/conftest.py +++ b/tests/glassflow/conftest.py @@ -1,35 +1,4 @@ -import os -import uuid +from glassflow.api_client import APIClient -import pytest - -from glassflow.client import GlassFlowClient - - -@pytest.fixture -def client(): - return GlassFlowClient() - - -@pytest.fixture -def pipeline_credentials(): - return { - "pipeline_id": os.getenv("PIPELINE_ID"), - "pipeline_access_token": os.getenv("PIPELINE_ACCESS_TOKEN"), - } - - -@pytest.fixture -def pipeline_credentials_invalid_token(): - return { - "pipeline_id": os.getenv("PIPELINE_ID"), - "pipeline_access_token": "invalid-pipeline-access-token", - } - - -@pytest.fixture -def pipeline_credentials_invalid_id(): - return { - "pipeline_id": str(uuid.uuid4()), - "pipeline_access_token": os.getenv("PIPELINE_ACCESS_TOKEN"), - } +# Use staging api server +APIClient.glassflow_config.server_url = "https://staging.api.glassflow.dev/v1" diff --git a/tests/glassflow/integration_tests/client_test.py b/tests/glassflow/integration_tests/client_test.py new file mode 100644 index 0000000..7def0bc --- /dev/null +++ b/tests/glassflow/integration_tests/client_test.py @@ -0,0 +1,15 @@ +def test_get_pipeline_ok(client, creating_pipeline): + p = client.get_pipeline(pipeline_id=creating_pipeline.id) + + assert p.id == creating_pipeline.id + assert p.name == creating_pipeline.name + + +def test_list_pipelines_ok(client, creating_pipeline): + res = client.list_pipelines() + + assert res.status_code == 200 + assert res.content_type == "application/json" + assert res.total_amount >= 1 + assert res.pipelines[-1]["id"] == creating_pipeline.id + assert res.pipelines[-1]["name"] == creating_pipeline.name diff --git a/tests/glassflow/integration_tests/conftest.py b/tests/glassflow/integration_tests/conftest.py new file mode 100644 index 0000000..4ec92fb --- /dev/null +++ b/tests/glassflow/integration_tests/conftest.py @@ -0,0 +1,117 @@ +import os +import uuid 
+ +import pytest + +from glassflow import ( + GlassFlowClient, + Pipeline, + PipelineDataSink, + PipelineDataSource, + Space, +) + + +@pytest.fixture(scope="session") +def client(): + return GlassFlowClient(os.getenv("PERSONAL_ACCESS_TOKEN")) + + +@pytest.fixture +def space(client): + return Space( + name="integration-tests", personal_access_token=client.personal_access_token + ) + + +@pytest.fixture +def creating_space(space): + space.create() + yield space + space.delete() + + +@pytest.fixture +def space_with_random_id(client): + return Space( + id=str(uuid.uuid4()), + personal_access_token=client.personal_access_token, + ) + + +@pytest.fixture +def space_with_random_id_and_invalid_token(client): + return Space( + id=str(uuid.uuid4()), + personal_access_token="invalid-token", + ) + + +@pytest.fixture +def pipeline(client, creating_space): + return Pipeline( + name="test_pipeline", + space_id=creating_space.id, + transformation_file="tests/data/transformation.py", + personal_access_token=client.personal_access_token, + ) + + +@pytest.fixture +def pipeline_with_random_id(client): + return Pipeline( + id=str(uuid.uuid4()), + personal_access_token=client.personal_access_token, + ) + + +@pytest.fixture +def pipeline_with_random_id_and_invalid_token(): + return Pipeline( + id=str(uuid.uuid4()), + personal_access_token="invalid-token", + ) + + +@pytest.fixture +def creating_pipeline(pipeline): + pipeline.create() + yield pipeline + pipeline.delete() + + +@pytest.fixture +def source(creating_pipeline): + return PipelineDataSource( + pipeline_id=creating_pipeline.id, + pipeline_access_token=creating_pipeline.access_tokens[0]["token"], + ) + + +@pytest.fixture +def source_with_invalid_access_token(creating_pipeline): + return PipelineDataSource( + pipeline_id=creating_pipeline.id, pipeline_access_token="invalid-access-token" + ) + + +@pytest.fixture +def source_with_non_existing_id(creating_pipeline): + return PipelineDataSource( + pipeline_id=str(uuid.uuid4()), + pipeline_access_token=creating_pipeline.access_tokens[0]["token"], + ) + + +@pytest.fixture +def source_with_published_events(source): + source.publish({"test_field": "test_value"}) + yield source + + +@pytest.fixture +def sink(source_with_published_events): + return PipelineDataSink( + pipeline_id=source_with_published_events.pipeline_id, + pipeline_access_token=source_with_published_events.pipeline_access_token, + ) diff --git a/tests/glassflow/integration_tests/pipeline_data_test.py b/tests/glassflow/integration_tests/pipeline_data_test.py new file mode 100644 index 0000000..1dcf9d3 --- /dev/null +++ b/tests/glassflow/integration_tests/pipeline_data_test.py @@ -0,0 +1,53 @@ +import pytest + +from glassflow import errors + + +def test_using_staging_server(source, sink): + assert source.glassflow_config.server_url == "https://staging.api.glassflow.dev/v1" + assert sink.glassflow_config.server_url == "https://staging.api.glassflow.dev/v1" + + +def test_validate_credentials_from_pipeline_data_source_ok(source): + try: + source.validate_credentials() + except Exception as e: + pytest.fail(e) + + +def test_validate_credentials_from_pipeline_data_source_fail_with_invalid_access_token( + source_with_invalid_access_token, +): + with pytest.raises(errors.PipelineAccessTokenInvalidError): + source_with_invalid_access_token.validate_credentials() + + +def test_validate_credentials_from_pipeline_data_source_fail_with_id_not_found( + source_with_non_existing_id, +): + with pytest.raises(errors.PipelineNotFoundError): + 
source_with_non_existing_id.validate_credentials() + + +def test_publish_to_pipeline_data_source_ok(source): + publish_response = source.publish({"test_field": "test_value"}) + assert publish_response.status_code == 200 + + +def test_consume_from_pipeline_data_sink_ok(sink): + n_tries = 0 + max_tries = 10 + while True: + if n_tries == max_tries: + pytest.fail("Max tries exceeded") + + consume_response = sink.consume() + assert consume_response.status_code in (200, 204) + if consume_response.status_code == 200: + assert consume_response.json() == { + "test_field": "test_value", + "new_field": "new_value", + } + break + else: + n_tries += 1 diff --git a/tests/glassflow/integration_tests/pipeline_test.py b/tests/glassflow/integration_tests/pipeline_test.py index f525214..144fca8 100644 --- a/tests/glassflow/integration_tests/pipeline_test.py +++ b/tests/glassflow/integration_tests/pipeline_test.py @@ -1,66 +1,95 @@ import pytest -from glassflow.models.errors import ClientError - - -def test_pipeline_is_access_token_valid_ok(client, pipeline_credentials): - pipeline = client.pipeline_client(**pipeline_credentials) - - is_valid = pipeline.is_access_token_valid() - assert is_valid - - -def test_pipeline_is_access_token_valid_not_ok( - client, pipeline_credentials_invalid_token -): - pipeline = client.pipeline_client(**pipeline_credentials_invalid_token) - - is_valid = pipeline.is_access_token_valid() - assert not is_valid - - -def test_pipeline_is_access_token_valid_with_invalid_credentials( - client, pipeline_credentials_invalid_id -): - pipeline = client.pipeline_client(**pipeline_credentials_invalid_id) - - with pytest.raises(ClientError) as exc_info: - pipeline.is_access_token_valid() - exc = exc_info.value - assert exc.status_code == 404 - - -def test_pipeline_is_valid_with_invalid_pipeline_id( - client, pipeline_credentials_invalid_id -): - pipeline = client.pipeline_client(**pipeline_credentials_invalid_id) - - assert pipeline.is_valid() is False - - -def test_pipeline_is_valid_with_invalid_pipeline_token( - client, pipeline_credentials_invalid_token -): - pipeline = client.pipeline_client(**pipeline_credentials_invalid_token) - - assert pipeline.is_valid() is False - - -def test_pipeline_is_valid_ok( - client, pipeline_credentials -): - pipeline = client.pipeline_client(**pipeline_credentials) - - assert pipeline.is_valid() is True - - -def test_pipeline_publish_and_consume(client, pipeline_credentials): - pipeline = client.pipeline_client(**pipeline_credentials) - publish_response = pipeline.publish({"test-key": "test-value"}) - assert publish_response.status_code == 200 +from glassflow.models import api, errors + + +def test_create_pipeline_ok(creating_pipeline): + assert creating_pipeline.name == "test_pipeline" + assert creating_pipeline.id is not None + + +def test_fetch_pipeline_ok(creating_pipeline): + creating_pipeline.fetch() + assert creating_pipeline.name == "test_pipeline" + assert creating_pipeline.id is not None + assert creating_pipeline.created_at is not None + + +def test_fetch_pipeline_fail_with_404(pipeline_with_random_id): + with pytest.raises(errors.PipelineNotFoundError): + pipeline_with_random_id.fetch() + + +def test_fetch_pipeline_fail_with_401(pipeline_with_random_id_and_invalid_token): + with pytest.raises(errors.UnauthorizedError): + pipeline_with_random_id_and_invalid_token.fetch() + + +def test_update_pipeline_ok(creating_pipeline): + updated_pipeline = creating_pipeline.update( + name="new_name", + sink_kind="webhook", + state="paused", + sink_config={ + 
"url": "www.test-url.com", + "method": "GET", + "headers": [{"name": "header1", "value": "header1"}], + }, + transformation_file="tests/data/transformation_2.py", + requirements="requests,pandas", + env_vars=[ + {"name": "env1", "value": "env1"}, + {"name": "env2", "value": "env2"}, + ], + ) + assert updated_pipeline.name == "new_name" + assert updated_pipeline.sink_kind == "webhook" + assert updated_pipeline.sink_config == { + "url": "www.test-url.com", + "method": "GET", + "headers": [{"name": "header1", "value": "header1"}], + } + assert updated_pipeline.env_vars == [ + {"name": "env1", "value": "env1"}, + {"name": "env2", "value": "env2"}, + ] + with open("tests/data/transformation_2.py") as f: + assert updated_pipeline.transformation_code == f.read() + + assert updated_pipeline.source_kind == creating_pipeline.source_kind + assert updated_pipeline.source_config == creating_pipeline.source_config + assert updated_pipeline.state == api.PipelineState.paused + + +def test_delete_pipeline_fail_with_404(pipeline_with_random_id): + with pytest.raises(errors.PipelineNotFoundError): + pipeline_with_random_id.delete() + + +def test_delete_pipeline_fail_with_401(pipeline_with_random_id_and_invalid_token): + with pytest.raises(errors.UnauthorizedError): + pipeline_with_random_id_and_invalid_token.delete() + + +def test_get_logs_from_pipeline_ok(creating_pipeline): + import time + + n_tries = 0 + max_tries = 10 while True: - consume_response = pipeline.consume() - assert consume_response.status_code in (200, 204) - if consume_response.status_code == 200: - assert consume_response.json() == {"test-key": "test-value"} + if n_tries == max_tries: + pytest.fail("Max tries reached") + + logs = creating_pipeline.get_logs() + if len(logs.logs) >= 2: break + else: + n_tries += 1 + time.sleep(1) + + assert logs.status_code == 200 + assert logs.content_type == "application/json" + assert logs.logs[0].payload.message == "Function is uploaded" + assert logs.logs[0].level == "INFO" + assert logs.logs[1].payload.message == "Pipeline is created" + assert logs.logs[1].level == "INFO" diff --git a/tests/glassflow/integration_tests/space_test.py b/tests/glassflow/integration_tests/space_test.py new file mode 100644 index 0000000..e17278b --- /dev/null +++ b/tests/glassflow/integration_tests/space_test.py @@ -0,0 +1,18 @@ +import pytest + +from glassflow import errors + + +def test_create_space_ok(creating_space): + assert creating_space.name == "integration-tests" + assert creating_space.id is not None + + +def test_delete_space_fail_with_404(space_with_random_id): + with pytest.raises(errors.SpaceNotFoundError): + space_with_random_id.delete() + + +def test_delete_space_fail_with_401(space_with_random_id_and_invalid_token): + with pytest.raises(errors.UnauthorizedError): + space_with_random_id_and_invalid_token.delete() diff --git a/tests/glassflow/unit_tests/__init__.py b/tests/glassflow/unit_tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/glassflow/unit_tests/client_test.py b/tests/glassflow/unit_tests/client_test.py new file mode 100644 index 0000000..6363263 --- /dev/null +++ b/tests/glassflow/unit_tests/client_test.py @@ -0,0 +1,48 @@ +import pytest + +from glassflow.models import errors + + +@pytest.fixture +def list_pipelines_response(): + return { + "total_amount": 1, + "pipelines": [ + { + "name": "test-name", + "space_id": "test-space-id", + "metadata": {"additionalProp1": {}}, + "id": "test-id", + "created_at": "2024-09-25T13:52:17.910Z", + "state": "running", + 
"space_name": "test-space-name", + } + ], + } + + +def test_list_pipelines_ok(requests_mock, list_pipelines_response, client): + requests_mock.get( + client.glassflow_config.server_url + "/pipelines", + json=list_pipelines_response, + status_code=200, + headers={"Content-Type": "application/json"}, + ) + + res = client.list_pipelines() + + assert res.status_code == 200 + assert res.content_type == "application/json" + assert res.total_amount == list_pipelines_response["total_amount"] + assert res.pipelines == list_pipelines_response["pipelines"] + + +def test_list_pipelines_fail_with_401(requests_mock, client): + requests_mock.get( + client.glassflow_config.server_url + "/pipelines", + status_code=401, + headers={"Content-Type": "application/json"}, + ) + + with pytest.raises(errors.UnauthorizedError): + client.list_pipelines() diff --git a/tests/glassflow/unit_tests/conftest.py b/tests/glassflow/unit_tests/conftest.py new file mode 100644 index 0000000..ea936f0 --- /dev/null +++ b/tests/glassflow/unit_tests/conftest.py @@ -0,0 +1,170 @@ +import pytest + +from glassflow import GlassFlowClient + + +@pytest.fixture +def client(): + return GlassFlowClient() + + +@pytest.fixture +def get_pipeline_request_mock(client, requests_mock, fetch_pipeline_response): + return requests_mock.get( + client.glassflow_config.server_url + "/pipelines/test-id", + json=fetch_pipeline_response, + status_code=200, + headers={"Content-Type": "application/json"}, + ) + + +@pytest.fixture +def get_access_token_request_mock( + client, requests_mock, fetch_pipeline_response, access_tokens_response +): + return requests_mock.get( + client.glassflow_config.server_url + + f"/pipelines/{fetch_pipeline_response['id']}/access_tokens", + json=access_tokens_response, + status_code=200, + headers={"Content-Type": "application/json"}, + ) + + +@pytest.fixture +def get_pipeline_function_source_request_mock( + client, requests_mock, fetch_pipeline_response, function_source_response +): + return requests_mock.get( + client.glassflow_config.server_url + + f"/pipelines/{fetch_pipeline_response['id']}/functions/main/artifacts/latest", + json=function_source_response, + status_code=200, + headers={"Content-Type": "application/json"}, + ) + + +@pytest.fixture +def update_pipeline_request_mock( + client, requests_mock, fetch_pipeline_response, update_pipeline_response +): + return requests_mock.patch( + client.glassflow_config.server_url + + f"/pipelines/{fetch_pipeline_response['id']}", + json=update_pipeline_response, + status_code=200, + headers={"Content-Type": "application/json"}, + ) + + +@pytest.fixture +def fetch_pipeline_response(): + return { + "id": "test-id", + "name": "test-name", + "space_id": "test-space-id", + "metadata": {}, + "created_at": "2024-09-23T10:08:45.529Z", + "state": "running", + "space_name": "test-space-name", + "source_connector": { + "kind": "google_pubsub", + "config": { + "project_id": "test-project", + "subscription_id": "test-subscription", + "credentials_json": "credentials.json", + }, + }, + "sink_connector": { + "kind": "webhook", + "config": { + "url": "www.test-url.com", + "method": "GET", + "headers": [ + {"name": "header1", "value": "header1"}, + {"name": "header2", "value": "header2"}, + ], + }, + }, + "environments": [{"test-var": "test-var"}], + } + + +@pytest.fixture +def update_pipeline_response(fetch_pipeline_response): + fetch_pipeline_response["name"] = "updated name" + fetch_pipeline_response["source_connector"] = None + return fetch_pipeline_response + + +@pytest.fixture +def 
create_pipeline_response(): + return { + "name": "test-name", + "space_id": "string", + "metadata": {"additionalProp1": {}}, + "id": "test-id", + "created_at": "2024-09-23T10:08:45.529Z", + "state": "running", + "access_token": "string", + } + + +@pytest.fixture +def create_space_response(): + return { + "name": "test-space", + "id": "test-space-id", + "created_at": "2024-09-30T02:47:51.901Z", + } + + +@pytest.fixture +def access_tokens_response(): + return { + "total_amount": 2, + "access_tokens": [ + { + "name": "token1", + "id": "string", + "token": "string", + "created_at": "2024-09-25T10:46:18.468Z", + }, + { + "name": "token2", + "id": "string", + "token": "string", + "created_at": "2024-09-26T04:28:51.782Z", + }, + ], + } + + +@pytest.fixture +def function_source_response(): + return { + "files": [{"name": "string", "content": "string"}], + "transformation_function": "string", + "requirements_txt": "string", + } + + +@pytest.fixture +def get_logs_response(): + return { + "logs": [ + { + "level": "INFO", + "severity_code": 0, + "timestamp": "2024-09-30T16:04:08.211Z", + "payload": {"message": "Info Message Log", "additionalProp1": {}}, + }, + { + "level": "ERROR", + "severity_code": 500, + "timestamp": "2024-09-30T16:04:08.211Z", + "payload": {"message": "Error Message Log", "additionalProp1": {}}, + }, + ], + "next": "string", + } diff --git a/tests/glassflow/unit_tests/pipeline_data_test.py b/tests/glassflow/unit_tests/pipeline_data_test.py new file mode 100644 index 0000000..bc8bd79 --- /dev/null +++ b/tests/glassflow/unit_tests/pipeline_data_test.py @@ -0,0 +1,310 @@ +import pytest + +from glassflow import PipelineDataSink, PipelineDataSource +from glassflow.models import errors +from glassflow.pipeline_data import PipelineDataClient + + +@pytest.fixture +def consume_payload(): + return { + "req_id": "string", + "receive_time": "2024-09-23T07:28:27.958Z", + "payload": {}, + "event_context": { + "request_id": "string", + "external_id": "string", + "receive_time": "2024-09-23T07:28:27.958Z", + }, + "status": "string", + "response": {}, + } + + +def test_validate_credentials_ok(requests_mock): + data_client = PipelineDataClient( + pipeline_id="test-id", + pipeline_access_token="test-token", + ) + requests_mock.get( + data_client.glassflow_config.server_url + + "/pipelines/test-id/status/access_token", + status_code=200, + headers={ + "Content-Type": "application/json", + "X-pipeline-access-token": data_client.pipeline_access_token, + }, + ) + data_client.validate_credentials() + + +def test_push_to_pipeline_data_source_ok(requests_mock): + source = PipelineDataSource( + pipeline_id="test-id", + pipeline_access_token="test-access-token", + ) + requests_mock.post( + source.glassflow_config.server_url + "/pipelines/test-id/topics/input/events", + status_code=200, + headers={ + "Content-Type": "application/json", + "X-pipeline-access-token": "test-access-token", + }, + ) + + res = source.publish({"test": "test"}) + + assert res.status_code == 200 + assert res.content_type == "application/json" + + +def test_push_to_pipeline_data_source_fail_with_404(requests_mock): + source = PipelineDataSource( + pipeline_id="test-id", + pipeline_access_token="test-access-token", + ) + requests_mock.post( + source.glassflow_config.server_url + "/pipelines/test-id/topics/input/events", + status_code=404, + headers={ + "Content-Type": "application/json", + "X-pipeline-access-token": "test-access-token", + }, + ) + + with pytest.raises(errors.PipelineNotFoundError): + source.publish({"test": "test"}) + + 
+def test_push_to_pipeline_data_source_fail_with_401(requests_mock): + source = PipelineDataSource( + pipeline_id="test-id", + pipeline_access_token="test-access-token", + ) + requests_mock.post( + source.glassflow_config.server_url + "/pipelines/test-id/topics/input/events", + status_code=401, + headers={ + "Content-Type": "application/json", + "X-pipeline-access-token": "test-access-token", + }, + ) + + with pytest.raises(errors.PipelineAccessTokenInvalidError): + source.publish({"test": "test"}) + + +def test_consume_from_pipeline_data_sink_ok(requests_mock, consume_payload): + sink = PipelineDataSink( + pipeline_id="test-id", + pipeline_access_token="test-access-token", + ) + requests_mock.post( + sink.glassflow_config.server_url + + "/pipelines/test-id/topics/output/events/consume", + json=consume_payload, + status_code=200, + headers={ + "Content-Type": "application/json", + "X-pipeline-access-token": "test-access-token", + }, + ) + + res = sink.consume() + + assert res.status_code == 200 + assert res.content_type == "application/json" + assert res.body.req_id == consume_payload["req_id"] + + +def test_consume_from_pipeline_data_sink_fail_with_404(requests_mock): + sink = PipelineDataSink( + pipeline_id="test-id", + pipeline_access_token="test-access-token", + ) + requests_mock.post( + sink.glassflow_config.server_url + + "/pipelines/test-id/topics/output/events/consume", + json={"test-data": "test-data"}, + status_code=404, + headers={ + "Content-Type": "application/json", + "X-pipeline-access-token": "test-access-token", + }, + ) + + with pytest.raises(errors.PipelineNotFoundError): + sink.consume() + + +def test_consume_from_pipeline_data_sink_fail_with_401(requests_mock): + sink = PipelineDataSink( + pipeline_id="test-id", + pipeline_access_token="test-access-token", + ) + requests_mock.post( + sink.glassflow_config.server_url + + "/pipelines/test-id/topics/output/events/consume", + json={"test-data": "test-data"}, + status_code=401, + headers={ + "Content-Type": "application/json", + "X-pipeline-access-token": "test-access-token", + }, + ) + + with pytest.raises(errors.PipelineAccessTokenInvalidError): + sink.consume() + + +def test_consume_from_pipeline_data_sink_ok_with_empty_response(requests_mock): + sink = PipelineDataSink( + pipeline_id="test-id", + pipeline_access_token="test-access-token", + ) + requests_mock.post( + sink.glassflow_config.server_url + + "/pipelines/test-id/topics/output/events/consume", + status_code=204, + headers={ + "Content-Type": "application/json", + "X-pipeline-access-token": "test-access-token", + }, + ) + + res = sink.consume() + + assert res.status_code == 204 + assert res.content_type == "application/json" + assert res.body.event == {} + + +def test_consume_from_pipeline_data_sink_ok_with_too_many_requests(requests_mock): + sink = PipelineDataSink( + pipeline_id="test-id", + pipeline_access_token="test-access-token", + ) + requests_mock.post( + sink.glassflow_config.server_url + + "/pipelines/test-id/topics/output/events/consume", + status_code=429, + headers={ + "Content-Type": "application/json", + "X-pipeline-access-token": "test-access-token", + }, + ) + + res = sink.consume() + + assert res.status_code == 429 + assert res.content_type == "application/json" + assert res.body.event == {} + + +def test_consume_failed_from_pipeline_data_sink_ok(requests_mock, consume_payload): + sink = PipelineDataSink( + pipeline_id="test-id", + pipeline_access_token="test-access-token", + ) + requests_mock.post( + sink.glassflow_config.server_url + + 
"/pipelines/test-id/topics/failed/events/consume", + json=consume_payload, + status_code=200, + headers={ + "Content-Type": "application/json", + "X-pipeline-access-token": "test-access-token", + }, + ) + + res = sink.consume_failed() + + assert res.status_code == 200 + assert res.content_type == "application/json" + assert res.body.req_id == consume_payload["req_id"] + + +def test_consume_failed_from_pipeline_data_sink_ok_with_empty_response(requests_mock): + sink = PipelineDataSink( + pipeline_id="test-id", + pipeline_access_token="test-access-token", + ) + requests_mock.post( + sink.glassflow_config.server_url + + "/pipelines/test-id/topics/failed/events/consume", + status_code=204, + headers={ + "Content-Type": "application/json", + "X-pipeline-access-token": "test-access-token", + }, + ) + + res = sink.consume_failed() + + assert res.status_code == 204 + assert res.content_type == "application/json" + assert res.body.event == {} + + +def test_consume_failed_from_pipeline_data_sink_ok_with_too_many_requests( + requests_mock, +): + sink = PipelineDataSink( + pipeline_id="test-id", + pipeline_access_token="test-access-token", + ) + requests_mock.post( + sink.glassflow_config.server_url + + "/pipelines/test-id/topics/failed/events/consume", + status_code=429, + headers={ + "Content-Type": "application/json", + "X-pipeline-access-token": "test-access-token", + }, + ) + + res = sink.consume_failed() + + assert res.status_code == 429 + assert res.content_type == "application/json" + assert res.body.event == {} + + +def test_consume_failed_from_pipeline_data_sink_fail_with_404(requests_mock): + sink = PipelineDataSink( + pipeline_id="test-id", + pipeline_access_token="test-access-token", + ) + requests_mock.post( + sink.glassflow_config.server_url + + "/pipelines/test-id/topics/failed/events/consume", + json={"test-data": "test-data"}, + status_code=404, + headers={ + "Content-Type": "application/json", + "X-pipeline-access-token": "test-access-token", + }, + ) + + with pytest.raises(errors.PipelineNotFoundError): + sink.consume_failed() + + +def test_consume_failed_from_pipeline_data_sink_fail_with_401(requests_mock): + sink = PipelineDataSink( + pipeline_id="test-id", + pipeline_access_token="test-access-token", + ) + requests_mock.post( + sink.glassflow_config.server_url + + "/pipelines/test-id/topics/failed/events/consume", + json={"test-data": "test-data"}, + status_code=401, + headers={ + "Content-Type": "application/json", + "X-pipeline-access-token": "test-access-token", + }, + ) + + with pytest.raises(errors.PipelineAccessTokenInvalidError): + sink.consume_failed() diff --git a/tests/glassflow/unit_tests/pipeline_test.py b/tests/glassflow/unit_tests/pipeline_test.py new file mode 100644 index 0000000..4521d6d --- /dev/null +++ b/tests/glassflow/unit_tests/pipeline_test.py @@ -0,0 +1,272 @@ +import pytest + +from glassflow.models import errors +from glassflow.pipeline import Pipeline + + +def test_pipeline_with_transformation_file(): + try: + p = Pipeline( + transformation_file="tests/data/transformation.py", + personal_access_token="test-token", + ) + p._read_transformation_file() + assert p.transformation_code is not None + except Exception as e: + pytest.fail(e) + + +def test_pipeline_fail_with_file_not_found(): + with pytest.raises(FileNotFoundError): + p = Pipeline( + transformation_file="fake_file.py", personal_access_token="test-token" + ) + p._read_transformation_file() + + +def test_pipeline_fail_with_missing_sink_data(): + with pytest.raises(ValueError) as e: + Pipeline( + 
transformation_file="tests/data/transformation.py", + personal_access_token="test-token", + sink_kind="google_pubsub", + ) + assert str(e.value) == "Both sink_kind and sink_config must be provided" + + +def test_pipeline_fail_with_missing_source_data(): + with pytest.raises(ValueError) as e: + Pipeline( + transformation_file="tests/data/transformation.py", + personal_access_token="test-token", + source_kind="google_pubsub", + ) + assert str(e.value) == "Both source_kind and source_config must be provided" + + +def test_fetch_pipeline_ok( + get_pipeline_request_mock, + get_access_token_request_mock, + get_pipeline_function_source_request_mock, + fetch_pipeline_response, + function_source_response, +): + pipeline = Pipeline( + id=fetch_pipeline_response["id"], + personal_access_token="test-token", + ).fetch() + + assert pipeline.name == fetch_pipeline_response["name"] + assert len(pipeline.access_tokens) > 0 + assert ( + pipeline.transformation_code + == function_source_response["transformation_function"] + ) + assert pipeline.requirements == function_source_response["requirements_txt"] + + +def test_fetch_pipeline_fail_with_404(requests_mock, fetch_pipeline_response, client): + requests_mock.get( + client.glassflow_config.server_url + "/pipelines/test-id", + json=fetch_pipeline_response, + status_code=404, + headers={"Content-Type": "application/json"}, + ) + + with pytest.raises(errors.PipelineNotFoundError): + Pipeline( + id=fetch_pipeline_response["id"], + personal_access_token="test-token", + ).fetch() + + +def test_fetch_pipeline_fail_with_401(requests_mock, fetch_pipeline_response, client): + requests_mock.get( + client.glassflow_config.server_url + "/pipelines/test-id", + json=fetch_pipeline_response, + status_code=401, + headers={"Content-Type": "application/json"}, + ) + + with pytest.raises(errors.UnauthorizedError): + Pipeline( + id=fetch_pipeline_response["id"], + personal_access_token="test-token", + ).fetch() + + +def test_create_pipeline_ok( + requests_mock, fetch_pipeline_response, create_pipeline_response, client +): + requests_mock.post( + client.glassflow_config.server_url + "/pipelines", + json=create_pipeline_response, + status_code=200, + headers={"Content-Type": "application/json"}, + ) + pipeline = Pipeline( + name=fetch_pipeline_response["name"], + space_id=create_pipeline_response["space_id"], + transformation_file="tests/data/transformation.py", + personal_access_token="test-token", + ).create() + + assert pipeline.id == "test-id" + assert pipeline.name == "test-name" + + +def test_create_pipeline_fail_with_missing_name(client): + with pytest.raises(ValueError) as e: + Pipeline( + space_id="test-space-id", + transformation_file="tests/data/transformation.py", + personal_access_token="test-token", + ).create() + + assert e.value.__str__() == ( + "Name must be provided in order to " "create the pipeline" + ) + + +def test_create_pipeline_fail_with_missing_space_id(client): + with pytest.raises(ValueError) as e: + Pipeline( + name="test-name", + transformation_file="tests/data/transformation.py", + personal_access_token="test-token", + ).create() + + assert str(e.value) == ("Argument space_id must be provided in the constructor") + + +def test_create_pipeline_fail_with_missing_transformation(client): + with pytest.raises(ValueError) as e: + Pipeline( + name="test-name", + space_id="test-space-id", + personal_access_token="test-token", + ).create() + + assert str(e.value) == ( + "Argument transformation_file must be provided in the constructor" + ) + + +def 
test_update_pipeline_ok( + get_pipeline_request_mock, + get_access_token_request_mock, + get_pipeline_function_source_request_mock, + update_pipeline_request_mock, + fetch_pipeline_response, + update_pipeline_response, +): + pipeline = ( + Pipeline(personal_access_token="test-token") + ._fill_pipeline_details(fetch_pipeline_response) + .update() + ) + + assert pipeline.name == update_pipeline_response["name"] + assert pipeline.source_connector == update_pipeline_response["source_connector"] + + +def test_delete_pipeline_ok(requests_mock, client): + requests_mock.delete( + client.glassflow_config.server_url + "/pipelines/test-pipeline-id", + status_code=204, + headers={"Content-Type": "application/json"}, + ) + pipeline = Pipeline( + id="test-pipeline-id", + personal_access_token="test-token", + ) + pipeline.delete() + + +def test_delete_pipeline_fail_with_missing_pipeline_id(client): + pipeline = Pipeline( + personal_access_token="test-token", + ) + with pytest.raises(ValueError): + pipeline.delete() + + +def test_get_source_from_pipeline_ok( + client, + fetch_pipeline_response, + get_pipeline_request_mock, + get_access_token_request_mock, + get_pipeline_function_source_request_mock, + access_tokens_response, +): + p = client.get_pipeline(fetch_pipeline_response["id"]) + source = p.get_source() + source2 = p.get_source(pipeline_access_token_name="token2") + + assert source.pipeline_id == p.id + assert ( + source.pipeline_access_token + == access_tokens_response["access_tokens"][0]["token"] + ) + + assert source2.pipeline_id == p.id + assert ( + source2.pipeline_access_token + == access_tokens_response["access_tokens"][1]["token"] + ) + + +def test_get_source_from_pipeline_fail_with_missing_id(client): + pipeline = Pipeline(personal_access_token="test-token") + with pytest.raises(ValueError) as e: + pipeline.get_source() + + assert e.value.__str__() == "Pipeline id must be provided in the constructor" + + +def test_get_sink_from_pipeline_ok( + client, + fetch_pipeline_response, + get_pipeline_request_mock, + get_access_token_request_mock, + get_pipeline_function_source_request_mock, + access_tokens_response, +): + p = client.get_pipeline(fetch_pipeline_response["id"]) + sink = p.get_sink() + sink2 = p.get_sink(pipeline_access_token_name="token2") + + assert sink.pipeline_id == p.id + assert ( + sink.pipeline_access_token + == access_tokens_response["access_tokens"][0]["token"] + ) + + assert sink2.pipeline_id == p.id + assert ( + sink2.pipeline_access_token + == access_tokens_response["access_tokens"][1]["token"] + ) + + +def test_get_logs_from_pipeline_ok(client, requests_mock, get_logs_response): + pipeline_id = "test-pipeline-id" + requests_mock.get( + client.glassflow_config.server_url + + f"/pipelines/{pipeline_id}/functions/main/logs", + json=get_logs_response, + status_code=200, + headers={"Content-Type": "application/json"}, + ) + pipeline = Pipeline(id=pipeline_id, personal_access_token="test-token") + logs = pipeline.get_logs() + + assert logs.status_code == 200 + assert logs.content_type == "application/json" + assert logs.next == get_logs_response["next"] + for idx, log in enumerate(logs.logs): + assert log.level == get_logs_response["logs"][idx]["level"] + assert log.severity_code == get_logs_response["logs"][idx]["severity_code"] + assert ( + log.payload.message == get_logs_response["logs"][idx]["payload"]["message"] + ) diff --git a/tests/glassflow/unit_tests/space_test.py b/tests/glassflow/unit_tests/space_test.py new file mode 100644 index 0000000..db76510 --- /dev/null +++ 
b/tests/glassflow/unit_tests/space_test.py
@@ -0,0 +1,46 @@
+import pytest
+
+from glassflow import Space
+
+
+def test_create_space_ok(requests_mock, create_space_response, client):
+    requests_mock.post(
+        client.glassflow_config.server_url + "/spaces",
+        json=create_space_response,
+        status_code=200,
+        headers={"Content-Type": "application/json"},
+    )
+    space = Space(
+        name=create_space_response["name"], personal_access_token="test-token"
+    ).create()
+
+    assert space.name == create_space_response["name"]
+    assert space.id == create_space_response["id"]
+    assert space.created_at == create_space_response["created_at"]
+
+
+def test_create_space_fail_with_missing_name(client):
+    with pytest.raises(ValueError) as e:
+        Space(personal_access_token="test-token").create()
+
+    assert str(e.value) == "Name must be provided in order to create the space"
+
+
+def test_delete_space_ok(requests_mock, client):
+    requests_mock.delete(
+        client.glassflow_config.server_url + "/spaces/test-space-id",
+        status_code=204,
+        headers={"Content-Type": "application/json"},
+    )
+    space = Space(
+        id="test-space-id",
+        personal_access_token="test-token",
+    )
+    space.delete()
+
+
+def test_delete_space_fail_with_missing_id(client):
+    with pytest.raises(ValueError) as e:
+        Space(personal_access_token="test-token").delete()
+
+    assert str(e.value) == "Space id must be provided in the constructor"