diff --git a/.gitignore b/.gitignore index 249da53..f09f43d 100644 --- a/.gitignore +++ b/.gitignore @@ -12,4 +12,5 @@ dist/ build .env tests/reports -.coverage \ No newline at end of file +.coverage +.idea/ \ No newline at end of file diff --git a/docs/reference.md b/docs/reference.md index 5c87d68..575628c 100644 --- a/docs/reference.md +++ b/docs/reference.md @@ -1,7 +1,8 @@ ::: src.glassflow.client ::: src.glassflow.pipeline ::: src.glassflow.pipeline_data +::: src.glassflow.secret ::: src.glassflow.space ::: src.glassflow.config ::: src.glassflow.models.errors -::: src.glassflow.models.operations +::: src.glassflow.models.responses diff --git a/makefile b/makefile index 9efa0c0..aaa7953 100644 --- a/makefile +++ b/makefile @@ -9,16 +9,7 @@ add-noqa: generate-api-data-models echo "Add noqa comment ..." sed -i '' -e '1s/^/# ruff: noqa\n/' $(API_DATA_MODELS) - -add-dataclass-json-decorators: add-noqa - echo "Import dataclass_json ..." - sed -i '' -e '/^from __future__ import annotations/a\'$$'\n''from dataclasses_json import dataclass_json' $(API_DATA_MODELS) - - - echo "Add dataclass_json decorators ..." - sed -i '' -e '/@dataclass/ i\'$$'\n''@dataclass_json\''' $(API_DATA_MODELS) - -generate: add-dataclass-json-decorators +generate: add-noqa include .env export @@ -32,3 +23,14 @@ lint: formatter: ruff format --check . + +fix-format: + ruff format . + +fix-lint: + ruff check --fix . + +fix: fix-format fix-lint + +serve-docs-locally: + mkdocs serve diff --git a/mkdocs.yml b/mkdocs.yml index 13d6481..7880188 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -43,7 +43,11 @@ plugins: handlers: python: import: - - url: https://docs.python-requests.org/en/master/objects.inv + - url: https://docs.python.org/3/objects.inv # Add Python's objects.inv + domains: [ std, py ] + - url: https://docs.python-requests.org/en/master/objects.inv # Add requests objects.inv + domains: [ std, py ] + - url: https://docs.pydantic.dev/latest/objects.inv # Add Pydantic's objects.inv domains: [ std, py ] options: members_order: source diff --git a/pyproject.toml b/pyproject.toml index 5cd21c2..25f02f8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -40,11 +40,12 @@ convention = "google" field-constraints = true snake-case-field = true strip-default-none = false -target-python-version = "3.7" +target-python-version = "3.8" use-title-as-name = true disable-timestamp = true enable-version-header = true use-double-quotes = true use-subclass-enum=true +use-standard-collections=true input-file-type = "openapi" -output-model-type = "dataclasses.dataclass" \ No newline at end of file +output-model-type = "pydantic_v2.BaseModel" \ No newline at end of file diff --git a/setup.py b/setup.py index 799548a..8af30f6 100644 --- a/setup.py +++ b/setup.py @@ -8,7 +8,7 @@ setuptools.setup( name="glassflow", - version="2.0.8", + version="2.1.0", author="glassflow", description="GlassFlow Python Client SDK", url="https://www.glassflow.dev/docs", @@ -22,7 +22,7 @@ "urllib3==1.26.15", "certifi>=2023.7.22", "charset-normalizer>=3.2.0", - "dataclasses-json>=0.6.4", + "pydantic>=2.10.6", "idna>=3.4", "jsonpath-python>=1.0.6 ", "marshmallow>=3.19.0", @@ -34,19 +34,25 @@ "typing-inspect>=0.9.0", "typing_extensions>=4.7.1", "python-dotenv==1.0.1", + "eval_type_backport>=0.2.0", ], extras_require={ "dev": [ - "pylint==2.16.2", - "pytest==8.3.2", - "pytest-cov==5.0.0", - "datamodel-code-generator[http]==0.26.0", - "requests-mock==1.12.1", - "isort==5.13.2", - "ruff==0.6.3", + "pylint>=2.16.2", + "pytest>=8.3.2", + "pytest-cov>=5.0.0", + "datamodel-code-generator[http]>=0.27.0", + "requests-mock>=1.12.1", + "isort>=5.13.2", + "ruff>=0.9.0", ] }, package_dir={"": "src"}, python_requires=">=3.8", package_data={"glassflow": ["py.typed"]}, + entry_points={ + "console_scripts": [ + "glassflow = cli.cli:glassflow", + ], + }, ) diff --git a/src/cli/__init__.py b/src/cli/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/cli/cli.py b/src/cli/cli.py new file mode 100644 index 0000000..1dae9eb --- /dev/null +++ b/src/cli/cli.py @@ -0,0 +1,43 @@ +import click + +from .commands import get_started + + +@click.group() +def glassflow(): + """Glassflow CLI - Manage and control Glassflow SDK""" + pass + + +@click.command() +@click.argument("command", required=False) +def help(command): + """Displays help information about Glassflow CLI and its commands.""" + + commands = { + "get-started": "Initialize Glassflow with an access token.\nUsage: " + "glassflow get-started --token YOUR_TOKEN", + "help": "Shows help information.\nUsage: glassflow help [command]", + } + + if command: + if command in commands: + click.echo(f"ā„¹ļø Help for `{command}`:\n{commands[command]}") + else: + click.echo( + f"āŒ Unknown command: `{command}`. Run `glassflow help` for a " + f"list of commands." + ) + else: + click.echo("šŸ“– Glassflow CLI Help:") + for cmd, desc in commands.items(): + click.echo(f" āžœ {cmd}: {desc.splitlines()[0]}") + click.echo("\nRun `glassflow help ` for more details.") + + +# Add commands to CLI group +glassflow.add_command(get_started) +glassflow.add_command(help) + +if __name__ == "__main__": + glassflow() diff --git a/src/cli/commands/__init__.py b/src/cli/commands/__init__.py new file mode 100644 index 0000000..09e9bb2 --- /dev/null +++ b/src/cli/commands/__init__.py @@ -0,0 +1 @@ +from .get_started import get_started as get_started diff --git a/src/cli/commands/get_started.py b/src/cli/commands/get_started.py new file mode 100644 index 0000000..b43e1de --- /dev/null +++ b/src/cli/commands/get_started.py @@ -0,0 +1,107 @@ +import os + +import click +from dotenv import load_dotenv + + +@click.command() +@click.option( + "--personal-access-token", "-pat", default=None, help="Personal access token." +) +@click.option( + "--env-file", + "-e", + default=".env", + help="Path to the .env file (default: .env in current directory).", +) +def get_started(personal_access_token, env_file): + """Displays a welcome message and setup instructions.""" + + # Load token from .env if not provided in CLI + if personal_access_token is None: + if os.path.exists(env_file): + load_dotenv(env_file) # Load environment variables + personal_access_token = os.getenv("PERSONAL_ACCESS_TOKEN") + else: + click.echo("āš ļø No token provided and .env file not found!", err=True) + return + + if not personal_access_token: + click.echo("āŒ Error: Personal access token is required.", err=True) + return + + click.echo("šŸš€ Welcome to Glassflow! \n") + click.echo( + f"šŸ”‘ Using Personal Access Token: {personal_access_token[:4]}... " + f"(hidden for security)" + ) + click.echo("\nšŸ“ In this getting started guide, we will do the following:") + click.echo("1. Define a data transformation function in Python.\n") + click.echo("2. Create a pipeline with the function.\n") + click.echo("3. Send events to the pipeline.\n") + click.echo("4. Consume transformed events in real-time from the pipeline\n") + click.echo("5. Monitor the pipeline and view logs.\n") + + filename = create_transformation_function() + pipeline = create_space_pipeline(personal_access_token, filename) + send_consume_events(pipeline) + + click.echo( + "\nšŸŽ‰ Congratulations! You have successfully created a pipeline and sent" + " events to it.\n" + ) + click.echo( + "šŸ’» View the logs and monitor the Pipeline in the " + f"Glassflow Web App at https://app.glassflow.dev/pipelines/{pipeline.id}" + ) + + +def create_transformation_function(filename="transform_getting_started.py"): + file_content = """import json +import logging + +def handler(data: dict, log: logging.Logger): + log.info("Echo: " + json.dumps(data)) + data['transformed_by'] = "glassflow" + + return data +""" + with open(filename, "w") as f: + f.write(file_content) + click.echo(f"āœ… Transformation function created in {filename}") + click.echo("The transformation function is:\n") + click.echo(file_content) + click.echo("šŸ“ You can modify the transformation function in the file.") + return filename + + +def create_space_pipeline(personal_access_token, transform_filename): + import glassflow + + # create glassflow client to interact with GlassFlow + client = glassflow.GlassFlowClient(personal_access_token=personal_access_token) + example_space = client.create_space(name="getting-started") + pipeline = client.create_pipeline( + name="getting-started-pipeline", + transformation_file=transform_filename, + space_id=example_space.id, + ) + click.echo(f"āœ… Created a pipeline with pipeline_id {pipeline.id}") + return pipeline + + +def send_consume_events(pipeline): + click.echo("šŸ”„ Sending some generated events to pipeline .....") + data_source = pipeline.get_source() + for i in range(10): + event = {"data": f"hello GF {i}"} + res = data_source.publish(event) + if res.status_code == 200: + click.echo(f"Sent event: {event}") + + click.echo("šŸ“” Consuming transformed events from the pipeline") + data_sink = pipeline.get_sink() + for _ in range(10): + resp = data_sink.consume() + if resp.status_code == 200: + click.echo(f"Consumed event: {resp.event()} ") diff --git a/src/glassflow/__init__.py b/src/glassflow/__init__.py index e3b7ab2..45473e8 100644 --- a/src/glassflow/__init__.py +++ b/src/glassflow/__init__.py @@ -1,7 +1,10 @@ from .client import GlassFlowClient as GlassFlowClient from .config import GlassFlowConfig as GlassFlowConfig +from .models import api as internal # noqa: F401 from .models import errors as errors +from .models import responses as responses from .pipeline import Pipeline as Pipeline from .pipeline_data import PipelineDataSink as PipelineDataSink from .pipeline_data import PipelineDataSource as PipelineDataSource +from .secret import Secret as Secret from .space import Space as Space diff --git a/src/glassflow/api_client.py b/src/glassflow/api_client.py index 369362a..9188678 100644 --- a/src/glassflow/api_client.py +++ b/src/glassflow/api_client.py @@ -6,89 +6,56 @@ from .config import GlassFlowConfig from .models import errors -from .models.operations.base import BaseRequest, BaseResponse -from .utils import utils as utils class APIClient: glassflow_config = GlassFlowConfig() def __init__(self): + """API client constructor""" super().__init__() self.client = requests_http.Session() - def _get_headers( - self, request: BaseRequest, req_content_type: str | None = None - ) -> dict: - headers = utils.get_req_specific_headers(request) - headers["Accept"] = "application/json" - headers["Gf-Client"] = self.glassflow_config.glassflow_client - headers["User-Agent"] = self.glassflow_config.user_agent - headers["Gf-Python-Version"] = ( - f"{sys.version_info.major}." - f"{sys.version_info.minor}." - f"{sys.version_info.micro}" - ) - - if req_content_type and req_content_type not in ( - "multipart/form-data", - "multipart/mixed", - ): - headers["content-type"] = req_content_type - + def _get_core_headers(self) -> dict: + headers = { + "Accept": "application/json", + "Gf-Client": self.glassflow_config.glassflow_client, + "User-Agent": self.glassflow_config.user_agent, + "Gf-Python-Version": ( + f"{sys.version_info.major}." + f"{sys.version_info.minor}." + f"{sys.version_info.micro}" + ), + } return headers def _request( self, - method: str, - endpoint: str, - request: BaseRequest, - serialization_method: str = "json", - ) -> BaseResponse: - request_type = type(request) - - url = utils.generate_url( - request_type, - self.glassflow_config.server_url, - endpoint, - request, - ) - - req_content_type, data, form = utils.serialize_request_body( - request=request, - request_type=request_type, - request_field_name="request_body", - nullable=False, - optional=True, - serialization_method=serialization_method, - ) - if method == "GET": - data = None - - headers = self._get_headers(request, req_content_type) - query_params = utils.get_query_params(request_type, request) - - # make the request - http_res = self.client.request( - method, url=url, params=query_params, headers=headers, data=data, files=form - ) - content_type = http_res.headers.get("Content-Type") - - res = BaseResponse( - status_code=http_res.status_code, - content_type=content_type, - raw_response=http_res, - ) - - if http_res.status_code in [400, 500]: - out = utils.unmarshal_json(http_res.text, errors.Error) - out.raw_response = http_res - raise out - elif http_res.status_code == 429: - pass - elif 400 < http_res.status_code < 600: - raise errors.ClientError( - "API error occurred", http_res.status_code, http_res.text, http_res + method, + endpoint, + request_headers=None, + json=None, + request_query_params=None, + files=None, + data=None, + ): + headers = self._get_core_headers() + if request_headers: + headers.update(request_headers) + + url = self.glassflow_config.server_url + endpoint + + try: + http_res = self.client.request( + method, + url=url, + params=request_query_params, + headers=headers, + json=json, + files=files, + data=data, ) - - return res + http_res.raise_for_status() + except requests_http.HTTPError as http_err: + raise errors.UnknownError(http_err.response) from http_err + return http_res diff --git a/src/glassflow/client.py b/src/glassflow/client.py index 943d392..5f2e95b 100644 --- a/src/glassflow/client.py +++ b/src/glassflow/client.py @@ -1,11 +1,9 @@ """GlassFlow Python Client to interact with GlassFlow API""" -from __future__ import annotations - from .api_client import APIClient -from .models import errors, operations -from .models.api import PipelineState +from .models import errors, responses from .pipeline import Pipeline +from .secret import Secret from .space import Space @@ -15,9 +13,10 @@ class GlassFlowClient(APIClient): and other resources Attributes: - client: requests.Session object to make HTTP requests to GlassFlow API - glassflow_config: GlassFlowConfig object to store configuration - organization_id: Organization ID of the user. If not provided, + client (requests.Session): Session object to make HTTP requests to GlassFlow API + glassflow_config (GlassFlowConfig): GlassFlow config object to store + configuration + organization_id (str): Organization ID of the user. If not provided, the default organization will be used """ @@ -35,6 +34,8 @@ def __init__( super().__init__() self.personal_access_token = personal_access_token self.organization_id = organization_id + self.request_headers = {"Personal-Access-Token": self.personal_access_token} + self.request_query_params = {"organization_id": self.organization_id} def get_pipeline(self, pipeline_id: str) -> Pipeline: """Gets a Pipeline object from the GlassFlow API @@ -46,13 +47,15 @@ def get_pipeline(self, pipeline_id: str) -> Pipeline: Pipeline: Pipeline object from the GlassFlow API Raises: - PipelineNotFoundError: Pipeline does not exist - UnauthorizedError: User does not have permission to perform the - requested operation - ClientError: GlassFlow Client Error + errors.PipelineNotFoundError: Pipeline does not exist + errors.PipelineUnauthorizedError: User does not have permission to + perform the requested operation + errors.ClientError: GlassFlow Client Error """ return Pipeline( - personal_access_token=self.personal_access_token, id=pipeline_id + personal_access_token=self.personal_access_token, + id=pipeline_id, + organization_id=self.organization_id, ).fetch() def create_pipeline( @@ -66,7 +69,7 @@ def create_pipeline( sink_kind: str = None, sink_config: dict = None, env_vars: list[dict[str, str]] = None, - state: PipelineState = "running", + state: str = "running", metadata: dict = None, ) -> Pipeline: """Creates a new GlassFlow pipeline @@ -92,7 +95,7 @@ def create_pipeline( Pipeline: New pipeline Raises: - UnauthorizedError: User does not have permission to perform + errors.PipelineUnauthorizedError: User does not have permission to perform the requested operation """ return Pipeline( @@ -112,8 +115,8 @@ def create_pipeline( ).create() def list_pipelines( - self, space_ids: list[str] | None = None - ) -> operations.ListPipelinesResponse: + self, space_ids: list[str] = None + ) -> responses.ListPipelinesResponse: """ Lists all pipelines in the GlassFlow API @@ -122,73 +125,19 @@ def list_pipelines( If not specified, all the pipelines will be listed. Returns: - ListPipelinesResponse: Response object with the pipelines listed + responses.ListPipelinesResponse: Response object with the pipelines listed Raises: - UnauthorizedError: User does not have permission to perform the - requested operation + errors.PipelineUnauthorizedError: User does not have permission to + perform the requested operation """ - request = operations.ListPipelinesRequest( - space_id=space_ids, - organization_id=self.organization_id, - personal_access_token=self.personal_access_token, - ) - try: - res = self._request( - method="GET", - endpoint="/pipelines", - request=request, - ) - res_json = res.raw_response.json() - except errors.ClientError as e: - if e.status_code == 401: - raise errors.UnauthorizedError(e.raw_response) from e - else: - raise e - - return operations.ListPipelinesResponse( - content_type=res.content_type, - status_code=res.status_code, - raw_response=res.raw_response, - total_amount=res_json["total_amount"], - pipelines=res_json["pipelines"], - ) - def list_spaces(self) -> operations.ListSpacesResponse: - """ - Lists all GlassFlow spaces in the GlassFlow API - - Returns: - ListSpacesResponse: Response object with the spaces listed - - Raises: - UnauthorizedError: User does not have permission to perform the - requested operation - """ - request = operations.ListSpacesRequest( - organization_id=self.organization_id, - personal_access_token=self.personal_access_token, - ) - try: - res = self._request( - method="GET", - endpoint="/spaces", - request=request, - ) - res_json = res.raw_response.json() - except errors.ClientError as e: - if e.status_code == 401: - raise errors.UnauthorizedError(e.raw_response) from e - else: - raise e - - return operations.ListSpacesResponse( - content_type=res.content_type, - status_code=res.status_code, - raw_response=res.raw_response, - total_amount=res_json["total_amount"], - spaces=res_json["spaces"], + endpoint = "/pipelines" + query_params = {"space_id": space_ids} if space_ids else {} + http_res = self._request( + method="GET", endpoint=endpoint, request_query_params=query_params ) + return responses.ListPipelinesResponse(**http_res.json()) def create_space( self, @@ -203,7 +152,7 @@ def create_space( Space: New space Raises: - UnauthorizedError: User does not have permission to perform + errors.SpaceUnauthorizedError: User does not have permission to perform the requested operation """ return Space( @@ -211,3 +160,85 @@ def create_space( personal_access_token=self.personal_access_token, organization_id=self.organization_id, ).create() + + def list_spaces(self) -> responses.ListSpacesResponse: + """ + Lists all GlassFlow spaces in the GlassFlow API + + Returns: + response.ListSpacesResponse: Response object with the spaces listed + + Raises: + errors.SpaceUnauthorizedError: User does not have permission to perform the + requested operation + """ + + endpoint = "/spaces" + http_res = self._request(method="GET", endpoint=endpoint) + return responses.ListSpacesResponse(**http_res.json()) + + def create_secret(self, key: str, value: str) -> Secret: + """ + Creates a new secret + + Args: + key: Secret key (must be unique in your organization) + value: Secret value + + Returns: + Secret: New secret + + Raises: + errors.SecretUnauthorizedError: User does not have permission to perform the + requested operation + """ + return Secret( + key=key, + value=value, + personal_access_token=self.personal_access_token, + organization_id=self.organization_id, + ).create() + + def list_secrets(self) -> responses.ListSecretsResponse: + """ + Lists all GlassFlow secrets in the GlassFlow API + + Returns: + responses.ListSecretsResponse: Response object with the secrets listed + + Raises: + errors.SecretUnauthorizedError: User does not have permission to perform the + requested operation + """ + endpoint = "/secrets" + http_res = self._request(method="GET", endpoint=endpoint) + return responses.ListSecretsResponse(**http_res.json()) + + def _request( + self, + method, + endpoint, + request_headers=None, + json=None, + request_query_params=None, + files=None, + data=None, + ): + headers = {**self.request_headers, **(request_headers or {})} + query_params = {**self.request_query_params, **(request_query_params or {})} + + try: + http_res = super()._request( + method=method, + endpoint=endpoint, + request_headers=headers, + json=json, + request_query_params=query_params, + files=files, + data=data, + ) + return http_res + except errors.UnknownError as e: + if e.status_code == 401: + raise errors.UnauthorizedError(e.raw_response) from e + raise e diff --git a/src/glassflow/config.py b/src/glassflow/config.py index 206a2e8..8683fab 100644 --- a/src/glassflow/config.py +++ b/src/glassflow/config.py @@ -1,9 +1,9 @@ -from dataclasses import dataclass from importlib.metadata import version +from pydantic import BaseModel -@dataclass -class GlassFlowConfig: + +class GlassFlowConfig(BaseModel): """Configuration object for GlassFlowClient Attributes: diff --git a/src/glassflow/models/api/__init__.py b/src/glassflow/models/api/__init__.py index cf6e5a9..dc2a47f 100644 --- a/src/glassflow/models/api/__init__.py +++ b/src/glassflow/models/api/__init__.py @@ -1,35 +1,35 @@ from .api import ( ConsumeOutputEvent, CreatePipeline, + CreateSecret, CreateSpace, - EventContext, FunctionEnvironments, - FunctionLogEntry, - FunctionLogs, GetDetailedSpacePipeline, + ListAccessTokens, + ListPipelines, + ListSpaceScopes, + Pipeline, + PipelineFunctionOutput, PipelineState, - SeverityCodeInput, SinkConnector, SourceConnector, - SpacePipeline, - SpaceScope, - UpdatePipeline, + Space, ) __all__ = [ - "CreatePipeline", - "FunctionEnvironments", - "FunctionLogEntry", - "FunctionLogs", - "GetDetailedSpacePipeline", - "PipelineState", - "SeverityCodeInput", - "SinkConnector", - "SourceConnector", - "SpacePipeline", - "UpdatePipeline", - "SpaceScope", "CreateSpace", - "EventContext", + "Space", "ConsumeOutputEvent", + "CreateSecret", + "Pipeline", + "GetDetailedSpacePipeline", + "ListAccessTokens", + "SourceConnector", + "SinkConnector", + "CreatePipeline", + "PipelineState", + "PipelineFunctionOutput", + "ListSpaceScopes", + "ListPipelines", + "FunctionEnvironments", ] diff --git a/src/glassflow/models/api/api.py b/src/glassflow/models/api/api.py index 9801603..e0e876a 100644 --- a/src/glassflow/models/api/api.py +++ b/src/glassflow/models/api/api.py @@ -1,56 +1,64 @@ # ruff: noqa # generated by datamodel-codegen: # filename: https://api.glassflow.dev/v1/openapi.yaml -# version: 0.26.0 +# version: 0.27.2 from __future__ import annotations -from dataclasses_json import dataclass_json -from dataclasses import dataclass +from datetime import datetime from enum import Enum -from typing import Any, Dict, List, Optional, Union +from typing import Any, Literal, Optional, Union +from pydantic import BaseModel, ConfigDict, Field, RootModel -@dataclass_json -@dataclass -class Error: + +class Error(BaseModel): detail: str -@dataclass_json -@dataclass -class CreateOrganization: +class CreateOrganization(BaseModel): name: str -@dataclass_json -@dataclass class Organization(CreateOrganization): id: str -@dataclass_json -@dataclass class OrganizationScope(Organization): role: str -OrganizationScopes = List[OrganizationScope] +class OrganizationScopes(RootModel[list[OrganizationScope]]): + root: list[OrganizationScope] + + +class SecretKey(RootModel[str]): + root: str + + +class Type(str, Enum): + organization = "organization" -@dataclass_json -@dataclass -class SignUp: +class SecretRef(BaseModel): + type: Type + key: SecretKey + + +class CreateSecret(BaseModel): + key: SecretKey + value: str + + +class SignUp(BaseModel): access_token: str id_token: str -@dataclass_json -@dataclass -class BasePipeline: +class BasePipeline(BaseModel): name: str space_id: str - metadata: Dict[str, Any] + metadata: dict[str, Any] class PipelineState(str, Enum): @@ -58,82 +66,51 @@ class PipelineState(str, Enum): paused = "paused" -@dataclass_json -@dataclass -class FunctionEnvironment: +class FunctionEnvironment(BaseModel): name: str value: str -FunctionEnvironments = Optional[List[FunctionEnvironment]] +class FunctionEnvironments(RootModel[Optional[list[FunctionEnvironment]]]): + root: Optional[list[FunctionEnvironment]] = None class Kind(str, Enum): google_pubsub = "google_pubsub" -@dataclass_json -@dataclass -class Config: +class Config(BaseModel): project_id: str subscription_id: str credentials_json: str -@dataclass_json -@dataclass -class SourceConnector1: - kind: Kind - config: Config - - class Kind1(str, Enum): amazon_sqs = "amazon_sqs" -@dataclass_json -@dataclass -class Config1: +class Config1(BaseModel): queue_url: str aws_region: str aws_access_key: str aws_secret_key: str -@dataclass_json -@dataclass -class SourceConnector2: - kind: Kind1 - config: Config1 - - class Kind2(str, Enum): postgres = "postgres" -@dataclass_json -@dataclass -class Config2: +class Config2(BaseModel): db_host: str + db_port: Optional[str] = "5432" db_user: str db_pass: str db_name: str - replication_slot: str - db_port: Optional[str] = "5432" db_sslmode: Optional[str] = None + replication_slot: str publication: Optional[str] = None replication_output_plugin_name: Optional[str] = "wal2json" - replication_output_plugin_args: Optional[List[str]] = None - - -@dataclass_json -@dataclass -class SourceConnector3: - kind: Kind2 - config: Config2 - - -SourceConnector = Optional[Union[SourceConnector1, SourceConnector2, SourceConnector3]] + replication_output_plugin_args: Optional[list[str]] = None class Kind3(str, Enum): @@ -148,35 +125,33 @@ class Method(str, Enum): delete = "DELETE" -@dataclass_json -@dataclass -class Header: +class Header(BaseModel): name: str value: str -@dataclass_json -@dataclass -class Config3: +class Config3(BaseModel): url: str method: Method - headers: List[Header] + headers: list[Header] + + +class SinkConnectorWebhookConfigHeadersListItem(BaseModel): + name: str + value: str -@dataclass_json -@dataclass -class SinkConnector1: - kind: Kind3 - config: Config3 +class SinkConnectorWebhookConfigHeadersList( + RootModel[list[SinkConnectorWebhookConfigHeadersListItem]] +): + root: list[SinkConnectorWebhookConfigHeadersListItem] class Kind4(str, Enum): clickhouse = "clickhouse" -@dataclass_json -@dataclass -class Config4: +class Config4(BaseModel): addr: str database: str username: str @@ -184,20 +159,11 @@ class Config4: table: str -@dataclass_json -@dataclass -class SinkConnector2: - kind: Kind4 - config: Config4 - - class Kind5(str, Enum): amazon_s3 = "amazon_s3" -@dataclass_json -@dataclass -class Config5: +class Config5(BaseModel): s3_bucket: str s3_key: str aws_region: str @@ -205,20 +171,11 @@ class Config5: aws_secret_key: str -@dataclass_json -@dataclass -class SinkConnector3: - kind: Kind5 - config: Config5 - - class Kind6(str, Enum): snowflake_cdc_json = "snowflake_cdc_json" -@dataclass_json -@dataclass -class Config6: +class Config6(BaseModel): account: str warehouse: str db_user: str @@ -230,110 +187,75 @@ class Config6: db_role: Optional[str] = None -@dataclass_json -@dataclass -class SinkConnector4: - kind: Kind6 - config: Config6 - - class Kind7(str, Enum): pinecone_json = "pinecone_json" -@dataclass_json -@dataclass -class ClientHeader: +class ClientHeader(BaseModel): name: str value: str -@dataclass_json -@dataclass -class Config7: +class Config7(BaseModel): api_key: str api_host: str - index_host: str api_source_tag: Optional[str] = None - client_headers: Optional[List[ClientHeader]] = None + index_host: str + client_headers: Optional[list[ClientHeader]] = None -@dataclass_json -@dataclass -class SinkConnector5: - kind: Kind7 - config: Config7 +class Kind8(str, Enum): + mongodb_json = "mongodb_json" -SinkConnector = Optional[ - Union[ - SinkConnector1, SinkConnector2, SinkConnector3, SinkConnector4, SinkConnector5 - ] -] +class Config8(BaseModel): + connection_uri: str + db_name: str -@dataclass_json -@dataclass class Pipeline(BasePipeline): id: str - created_at: str + created_at: datetime state: PipelineState -@dataclass_json -@dataclass class SpacePipeline(Pipeline): space_name: str -@dataclass_json -@dataclass -class GetDetailedSpacePipeline(SpacePipeline): - source_connector: SourceConnector - sink_connector: SinkConnector +class PipelineFunctionOutput(BaseModel): environments: FunctionEnvironments -@dataclass_json -@dataclass -class PipelineFunctionOutput: - environments: FunctionEnvironments - +class SpacePipelines(RootModel[list[SpacePipeline]]): + root: list[SpacePipeline] -SpacePipelines = List[SpacePipeline] - -@dataclass_json -@dataclass -class CreateSpace: +class CreateSpace(BaseModel): name: str -@dataclass_json -@dataclass -class UpdateSpace: +class UpdateSpace(BaseModel): name: str -@dataclass_json -@dataclass class Space(CreateSpace): id: str - created_at: str + created_at: datetime -@dataclass_json -@dataclass class SpaceScope(Space): permission: str -SpaceScopes = List[SpaceScope] +class SpaceScopes(RootModel[list[SpaceScope]]): + root: list[SpaceScope] -@dataclass_json -@dataclass -class Payload: +class Payload(BaseModel): + model_config = ConfigDict( + extra="allow", + ) message: str @@ -344,59 +266,64 @@ class SeverityCodeInput(int, Enum): integer_500 = 500 -SeverityCode = int +class SeverityCode(RootModel[int]): + root: int -@dataclass_json -@dataclass -class CreateAccessToken: +class CreateAccessToken(BaseModel): name: str -@dataclass_json -@dataclass class AccessToken(CreateAccessToken): id: str token: str - created_at: str + created_at: datetime -AccessTokens = List[AccessToken] +class AccessTokens(RootModel[list[AccessToken]]): + root: list[AccessToken] -@dataclass_json -@dataclass -class PaginationResponse: +class PaginationResponse(BaseModel): total_amount: int -@dataclass_json -@dataclass -class SourceFile: +class SourceFile(BaseModel): name: str content: str -SourceFiles = List[SourceFile] +class SourceFiles(RootModel[list[SourceFile]]): + root: list[SourceFile] -@dataclass_json -@dataclass -class EventContext: +class EventContext(BaseModel): request_id: str - receive_time: str external_id: Optional[str] = None + receive_time: datetime -PersonalAccessToken = str +class PersonalAccessToken(RootModel[str]): + root: str -QueryRangeMatrix = Optional[Any] +class QueryRangeMatrix(RootModel[Optional[Any]]): + root: Optional[Any] = None -@dataclass_json -@dataclass -class Profile: +class ConnectorValueValue(BaseModel): + value: str + + +class ConnectorValueSecretRef(BaseModel): + secret_ref: SecretRef + + +class ConnectorValueList(RootModel[list[str]]): + root: list[str] + + +class Profile(BaseModel): id: str home_organization: Organization name: str @@ -406,97 +333,276 @@ class Profile: subscriber_id: str -@dataclass_json -@dataclass class ListOrganizationScopes(PaginationResponse): organizations: OrganizationScopes -@dataclass_json -@dataclass -class UpdatePipeline: - name: str - transformation_function: Optional[str] = None - transformation_requirements: Optional[List[str]] = None - requirements_txt: Optional[str] = None - metadata: Optional[Dict[str, Any]] = None - source_connector: Optional[SourceConnector] = None - sink_connector: Optional[SinkConnector] = None - environments: Optional[FunctionEnvironments] = None - - -@dataclass_json -@dataclass -class CreatePipeline(BasePipeline): - transformation_function: Optional[str] = None - transformation_requirements: Optional[List[str]] = None - requirements_txt: Optional[str] = None - source_connector: Optional[SourceConnector] = None - sink_connector: Optional[SinkConnector] = None - environments: Optional[FunctionEnvironments] = None - state: Optional[PipelineState] = None +class Secret(BaseModel): + key: SecretKey -@dataclass_json -@dataclass class ListPipelines(PaginationResponse): pipelines: SpacePipelines -@dataclass_json -@dataclass class ListSpaceScopes(PaginationResponse): spaces: SpaceScopes -@dataclass_json -@dataclass -class FunctionLogEntry: +class FunctionLogEntry(BaseModel): level: str severity_code: SeverityCode - timestamp: str + timestamp: datetime payload: Payload -@dataclass_json -@dataclass class ListAccessTokens(PaginationResponse): access_tokens: AccessTokens -@dataclass_json -@dataclass -class ConsumeInputEvent: +class ConsumeInputEvent(BaseModel): + req_id: Optional[str] = Field(None, description="DEPRECATED") + receive_time: Optional[datetime] = Field(None, description="DEPRECATED") payload: Any event_context: EventContext - req_id: Optional[str] = None - receive_time: Optional[str] = None -@dataclass_json -@dataclass -class ConsumeOutputEvent: +class ConsumeOutputEvent(BaseModel): + req_id: Optional[str] = Field(None, description="DEPRECATED") + receive_time: Optional[datetime] = Field(None, description="DEPRECATED") payload: Any event_context: EventContext status: str - req_id: Optional[str] = None - receive_time: Optional[str] = None response: Optional[Any] = None error_details: Optional[str] = None stack_trace: Optional[str] = None -@dataclass_json -@dataclass -class ListPersonalAccessTokens: - tokens: List[PersonalAccessToken] +class ListPersonalAccessTokens(BaseModel): + tokens: list[PersonalAccessToken] -@dataclass_json -@dataclass -class PipelineInputQueueRelativeLatencyMetricsResponse: +class PipelineInputQueueRelativeLatencyMetricsResponse(BaseModel): input_queue_total_push_events: QueryRangeMatrix input_queue_latency: QueryRangeMatrix -FunctionLogs = List[FunctionLogEntry] +class ConnectorValue(RootModel[Union[ConnectorValueValue, ConnectorValueSecretRef]]): + root: Union[ConnectorValueValue, ConnectorValueSecretRef] + + +class Secrets(RootModel[list[Secret]]): + root: list[Secret] + + +class Configuration(BaseModel): + project_id: ConnectorValue + subscription_id: ConnectorValue + credentials_json: ConnectorValue + + +class SourceConnectorGooglePubSub(BaseModel): + kind: Literal["google_pubsub"] + config: Optional[Config] = None + configuration: Optional[Configuration] = None + + +class Configuration1(BaseModel): + queue_url: ConnectorValue + aws_region: ConnectorValue + aws_access_key: ConnectorValue + aws_secret_key: ConnectorValue + + +class SourceConnectorAmazonSQS(BaseModel): + kind: Literal["amazon_sqs"] + config: Optional[Config1] = None + configuration: Optional[Configuration1] = None + + +class Configuration2(BaseModel): + db_host: ConnectorValue + db_port: Optional[ConnectorValue] = None + db_user: ConnectorValue + db_pass: ConnectorValue + db_name: ConnectorValue + db_sslmode: Optional[ConnectorValue] = None + replication_slot: ConnectorValue + publication: Optional[ConnectorValue] = None + replication_output_plugin_name: Optional[ConnectorValue] = None + replication_output_plugin_args: Optional[ConnectorValueList] = None + + +class SourceConnectorPostgres(BaseModel): + kind: Literal["postgres"] + config: Optional[Config2] = None + configuration: Optional[Configuration2] = None + + +class Configuration3(BaseModel): + url: ConnectorValue + method: ConnectorValue + headers: SinkConnectorWebhookConfigHeadersList + + +class SinkConnectorWebhook(BaseModel): + kind: Literal["webhook"] + config: Optional[Config3] = None + configuration: Optional[Configuration3] = None + + +class Configuration4(BaseModel): + addr: ConnectorValue + database: ConnectorValue + username: ConnectorValue + password: ConnectorValue + table: ConnectorValue + + +class SinkConnectorClickhouse(BaseModel): + kind: Literal["clickhouse"] + config: Optional[Config4] = None + configuration: Optional[Configuration4] = None + + +class Configuration5(BaseModel): + s3_bucket: ConnectorValue + s3_key: ConnectorValue + aws_region: ConnectorValue + aws_access_key: ConnectorValue + aws_secret_key: ConnectorValue + + +class SinkConnectorAmazonS3(BaseModel): + kind: Literal["amazon_s3"] + config: Optional[Config5] = None + configuration: Optional[Configuration5] = None + + +class Configuration6(BaseModel): + account: ConnectorValue + warehouse: ConnectorValue + db_user: ConnectorValue + db_pass: ConnectorValue + db_name: ConnectorValue + db_schema: ConnectorValue + db_host: Optional[ConnectorValue] = None + db_port: Optional[ConnectorValue] = None + db_role: Optional[ConnectorValue] = None + + +class SinkConnectorSnowflakeCDCJSON(BaseModel): + kind: Literal["snowflake_cdc_json"] + config: Optional[Config6] = None + configuration: Optional[Configuration6] = None + + +class ClientHeader1(BaseModel): + name: str + value: ConnectorValue + + +class Configuration7(BaseModel): + api_key: ConnectorValue + api_host: ConnectorValue + api_source_tag: Optional[ConnectorValue] = None + index_host: ConnectorValue + client_headers: Optional[list[ClientHeader1]] = None + + +class SinkConnectorPineconeJSON(BaseModel): + kind: Literal["pinecone_json"] + config: Optional[Config7] = None + configuration: Optional[Configuration7] = None + + +class Configuration8(BaseModel): + connection_uri: ConnectorValue + db_name: ConnectorValue + + +class SinkConnectorMongoDBJSON(BaseModel): + kind: Literal["mongodb_json"] + config: Optional[Config8] = None + configuration: Optional[Configuration8] = None + + +class FunctionLogs(RootModel[list[FunctionLogEntry]]): + root: list[FunctionLogEntry] + + +class ListOrganizationSecrets(PaginationResponse): + secrets: Secrets + + +class SourceConnector( + RootModel[ + Optional[ + Union[ + SourceConnectorGooglePubSub, + SourceConnectorAmazonSQS, + SourceConnectorPostgres, + ] + ] + ] +): + root: Optional[ + Union[ + SourceConnectorGooglePubSub, + SourceConnectorAmazonSQS, + SourceConnectorPostgres, + ] + ] = Field(None, discriminator="kind") + + +class SinkConnector( + RootModel[ + Optional[ + Union[ + SinkConnectorWebhook, + SinkConnectorClickhouse, + SinkConnectorAmazonS3, + SinkConnectorSnowflakeCDCJSON, + SinkConnectorPineconeJSON, + SinkConnectorMongoDBJSON, + ] + ] + ] +): + root: Optional[ + Union[ + SinkConnectorWebhook, + SinkConnectorClickhouse, + SinkConnectorAmazonS3, + SinkConnectorSnowflakeCDCJSON, + SinkConnectorPineconeJSON, + SinkConnectorMongoDBJSON, + ] + ] = Field(None, discriminator="kind") + + +class GetDetailedSpacePipeline(SpacePipeline): + source_connector: SourceConnector + sink_connector: SinkConnector + environments: FunctionEnvironments + + +class UpdatePipeline(BaseModel): + name: str + transformation_function: Optional[str] = None + transformation_requirements: Optional[list[str]] = None + requirements_txt: Optional[str] = None + metadata: Optional[dict[str, Any]] = None + source_connector: Optional[SourceConnector] = None + sink_connector: Optional[SinkConnector] = None + environments: Optional[FunctionEnvironments] = None + + +class CreatePipeline(BasePipeline): + transformation_function: Optional[str] = None + transformation_requirements: Optional[list[str]] = None + requirements_txt: Optional[str] = None + source_connector: Optional[SourceConnector] = None + sink_connector: Optional[SinkConnector] = None + environments: Optional[FunctionEnvironments] = None + state: Optional[PipelineState] = None diff --git a/src/glassflow/models/errors/__init__.py b/src/glassflow/models/errors/__init__.py index 6a8efd5..895edc2 100644 --- a/src/glassflow/models/errors/__init__.py +++ b/src/glassflow/models/errors/__init__.py @@ -1,21 +1,46 @@ from .clienterror import ( ClientError, + UnauthorizedError, + UnknownContentTypeError, + UnknownError, +) +from .error import Error +from .pipeline import ( + ConnectorConfigValueError, PipelineAccessTokenInvalidError, + PipelineArtifactStillInProgressError, PipelineNotFoundError, + PipelineTooManyRequestsError, + PipelineUnauthorizedError, +) +from .secret import ( + SecretInvalidKeyError, + SecretNotFoundError, + SecretUnauthorizedError, +) +from .space import ( SpaceIsNotEmptyError, SpaceNotFoundError, - UnauthorizedError, - UnknownContentTypeError, + SpaceUnauthorizedError, ) -from .error import Error __all__ = [ "Error", "ClientError", - "PipelineNotFoundError", - "PipelineAccessTokenInvalidError", - "SpaceNotFoundError", "UnknownContentTypeError", "UnauthorizedError", + "ConnectorConfigValueError", + "SecretInvalidKeyError", + "SecretNotFoundError", + "SecretUnauthorizedError", + "SpaceNotFoundError", "SpaceIsNotEmptyError", + "SpaceUnauthorizedError", + "PipelineArtifactStillInProgressError", + "PipelineNotFoundError", + "PipelineAccessTokenInvalidError", + "PipelineAccessTokenInvalidError", + "PipelineTooManyRequestsError", + "PipelineUnauthorizedError", + "UnknownError", ] diff --git a/src/glassflow/models/errors/clienterror.py b/src/glassflow/models/errors/clienterror.py index 5c1cd18..1a4f02b 100644 --- a/src/glassflow/models/errors/clienterror.py +++ b/src/glassflow/models/errors/clienterror.py @@ -51,30 +51,6 @@ def __str__(self) -> str: return f"{self.detail}: Status {self.status_code}{body}" -class PipelineNotFoundError(ClientError): - """Error caused by a pipeline ID not found.""" - - def __init__(self, pipeline_id: str, raw_response: requests_http.Response): - super().__init__( - detail=f"Pipeline ID {pipeline_id} does not exist", - status_code=404, - body=raw_response.text, - raw_response=raw_response, - ) - - -class SpaceNotFoundError(ClientError): - """Error caused by a pipeline ID not found.""" - - def __init__(self, space_id: str, raw_response: requests_http.Response): - super().__init__( - detail=f"Space ID {space_id} does not exist", - status_code=404, - body=raw_response.text, - raw_response=raw_response, - ) - - class UnauthorizedError(ClientError): """Error caused by a user not authorized.""" @@ -87,18 +63,6 @@ def __init__(self, raw_response: requests_http.Response): ) -class PipelineAccessTokenInvalidError(ClientError): - """Error caused by invalid access token.""" - - def __init__(self, raw_response: requests_http.Response): - super().__init__( - detail="The Pipeline Access Token used is invalid", - status_code=401, - body=raw_response.text, - raw_response=raw_response, - ) - - class UnknownContentTypeError(ClientError): """Error caused by an unknown content type response.""" @@ -112,13 +76,13 @@ def __init__(self, raw_response: requests_http.Response): ) -class SpaceIsNotEmptyError(ClientError): - """Error caused by trying to delete a space that is not empty.""" +class UnknownError(ClientError): + """Error caused by an unknown error.""" def __init__(self, raw_response: requests_http.Response): super().__init__( - detail=raw_response.json()["msg"], - status_code=409, + detail="Error in getting response from GlassFlow", + status_code=raw_response.status_code, body=raw_response.text, raw_response=raw_response, ) diff --git a/src/glassflow/models/errors/error.py b/src/glassflow/models/errors/error.py index 8633761..c38e600 100644 --- a/src/glassflow/models/errors/error.py +++ b/src/glassflow/models/errors/error.py @@ -1,25 +1,14 @@ -from __future__ import annotations +from pydantic import BaseModel -import dataclasses -from dataclasses_json import Undefined, dataclass_json - -from glassflow import utils - - -@dataclass_json(undefined=Undefined.EXCLUDE) -@dataclasses.dataclass -class Error(Exception): +class Error(BaseModel): """Bad request error response Attributes: - message: A message describing the error - + detail: A message describing the error """ - detail: str = dataclasses.field( - metadata={"dataclasses_json": {"letter_case": utils.get_field_name("detail")}} - ) + detail: str def __str__(self) -> str: - return utils.marshal_json(self, type(self)) + return self.model_dump_json() diff --git a/src/glassflow/models/errors/pipeline.py b/src/glassflow/models/errors/pipeline.py new file mode 100644 index 0000000..1f02c01 --- /dev/null +++ b/src/glassflow/models/errors/pipeline.py @@ -0,0 +1,73 @@ +from .clienterror import ClientError, requests_http + + +class ConnectorConfigValueError(Exception): + """Value error for missing connector settings.""" + + def __init__(self, connector_type: str): + super().__init__( + f"{connector_type}_kind and {connector_type}_config " + f"or {connector_type}_config_secret_refs must be provided" + ) + + +class PipelineNotFoundError(ClientError): + """Error caused by a pipeline ID not found.""" + + def __init__(self, pipeline_id: str, raw_response: requests_http.Response): + super().__init__( + detail=f"Pipeline ID {pipeline_id} does not exist", + status_code=raw_response.status_code, + body=raw_response.text, + raw_response=raw_response, + ) + + +class PipelineUnauthorizedError(ClientError): + """Pipeline operation not authorized, invalid Personal Access Token""" + + def __init__(self, pipeline_id: str, raw_response: requests_http.Response): + super().__init__( + detail=f"Unauthorized request on pipeline {pipeline_id}, " + f"Personal Access Token used is invalid", + status_code=raw_response.status_code, + body=raw_response.text, + raw_response=raw_response, + ) + + +class PipelineArtifactStillInProgressError(ClientError): + """Error returned when the pipeline artifact is still being processed.""" + + def __init__(self, pipeline_id: str, raw_response: requests_http.Response): + super().__init__( + detail=f"Artifact from pipeline {pipeline_id} " + f"is still in process, try again later.", + status_code=raw_response.status_code, + body=raw_response.text, + raw_response=raw_response, + ) + + +class PipelineTooManyRequestsError(ClientError): + """Error caused by too many requests to a pipeline.""" + + def __init__(self, raw_response: requests_http.Response): + super().__init__( + detail="Too many requests", + status_code=raw_response.status_code, + body=raw_response.text, + raw_response=raw_response, + ) + + +class PipelineAccessTokenInvalidError(ClientError): + """Error caused by invalid access token.""" + + def __init__(self, raw_response: requests_http.Response): + super().__init__( + detail="The Pipeline Access Token used is invalid", + status_code=raw_response.status_code, + body=raw_response.text, + raw_response=raw_response, + ) diff --git a/src/glassflow/models/errors/secret.py b/src/glassflow/models/errors/secret.py new file mode 100644 index 0000000..65371e8 --- /dev/null +++ b/src/glassflow/models/errors/secret.py @@ -0,0 +1,36 @@ +from .clienterror import ClientError, requests_http + + +class SecretNotFoundError(ClientError): + """Error caused by a Secret Key not found.""" + + def __init__(self, secret_key: str, raw_response: requests_http.Response): + super().__init__( + detail=f"Secret Key {secret_key} does not exist", + status_code=404, + body=raw_response.text, + raw_response=raw_response, + ) + + +class SecretUnauthorizedError(ClientError): + """Secret operation not authorized, invalid Personal Access Token""" + + def __init__(self, secret_key: str, raw_response: requests_http.Response): + super().__init__( + detail=f"Unauthorized request on Secret {secret_key}, " + f"Personal Access Token used is invalid", + status_code=raw_response.status_code, + body=raw_response.text, + raw_response=raw_response, + ) + + +class SecretInvalidKeyError(Exception): + """Error caused by a Secret Key has invalid format.""" + + def __init__(self, secret_key: str): + super().__init__( + f"Secret key {secret_key} has invalid format, it must start with a letter, " + f"and it can only contain characters in a-zA-Z0-9_" + ) diff --git a/src/glassflow/models/errors/space.py b/src/glassflow/models/errors/space.py new file mode 100644 index 0000000..c5519f0 --- /dev/null +++ b/src/glassflow/models/errors/space.py @@ -0,0 +1,38 @@ +from .clienterror import ClientError, requests_http + + +class SpaceNotFoundError(ClientError): + """Error caused by a space ID not found.""" + + def __init__(self, space_id: str, raw_response: requests_http.Response): + super().__init__( + detail=f"Space ID {space_id} does not exist", + status_code=404, + body=raw_response.text, + raw_response=raw_response, + ) + + +class SpaceIsNotEmptyError(ClientError): + """Error caused by trying to delete a space that is not empty.""" + + def __init__(self, raw_response: requests_http.Response): + super().__init__( + detail=raw_response.json()["msg"], + status_code=409, + body=raw_response.text, + raw_response=raw_response, + ) + + +class SpaceUnauthorizedError(ClientError): + """Space operation not authorized, invalid Personal Access Token""" + + def __init__(self, space_id: str, raw_response: requests_http.Response): + super().__init__( + detail=f"Unauthorized request on Space {space_id}, " + f"Personal Access Token used is invalid", + status_code=raw_response.status_code, + body=raw_response.text, + raw_response=raw_response, + ) diff --git a/src/glassflow/models/operations/__init__.py b/src/glassflow/models/operations/__init__.py index a0c7916..49cf330 100644 --- a/src/glassflow/models/operations/__init__.py +++ b/src/glassflow/models/operations/__init__.py @@ -1,96 +1,9 @@ -from .access_token import ListAccessTokensRequest, StatusAccessTokenRequest -from .artifact import ( - GetArtifactRequest, - PostArtifactRequest, -) -from .base import ( - BaseManagementRequest, - BasePipelineManagementRequest, - BaseRequest, - BaseResponse, - BaseSpaceManagementDataRequest, -) -from .consumeevent import ( - ConsumeEventRequest, - ConsumeEventResponse, - ConsumeEventResponseBody, -) -from .consumefailed import ( - ConsumeFailedRequest, - ConsumeFailedResponse, - ConsumeFailedResponseBody, -) -from .function import ( - FetchFunctionRequest, - GetFunctionLogsRequest, - GetFunctionLogsResponse, - TestFunctionRequest, - TestFunctionResponse, - UpdateFunctionRequest, -) -from .pipeline import ( - CreatePipelineRequest, - CreatePipelineResponse, - DeletePipelineRequest, - GetPipelineRequest, - GetPipelineResponse, - ListPipelinesRequest, - ListPipelinesResponse, +from glassflow.models.operations.pipeline import ( + CreatePipeline, UpdatePipelineRequest, - UpdatePipelineResponse, -) -from .publishevent import ( - PublishEventRequest, - PublishEventRequestBody, - PublishEventResponse, - PublishEventResponseBody, -) -from .space import ( - CreateSpaceRequest, - CreateSpaceResponse, - DeleteSpaceRequest, - ListSpacesRequest, - ListSpacesResponse, ) __all__ = [ - "BaseManagementRequest", - "BasePipelineManagementRequest", - "BaseRequest", - "BaseResponse", - "BaseSpaceManagementDataRequest", - "ConsumeEventRequest", - "ConsumeEventResponse", - "ConsumeEventResponseBody", - "ConsumeFailedRequest", - "ConsumeFailedResponse", - "ConsumeFailedResponseBody", - "CreatePipelineRequest", - "CreatePipelineResponse", - "DeletePipelineRequest", - "DeleteSpaceRequest", - "GetPipelineRequest", - "GetPipelineResponse", - "ListPipelinesRequest", - "ListPipelinesResponse", - "ListAccessTokensRequest", - "PublishEventRequest", - "PublishEventRequestBody", - "PublishEventResponse", - "PublishEventResponseBody", - "GetArtifactRequest", - "GetFunctionLogsRequest", - "GetFunctionLogsResponse", - "StatusAccessTokenRequest", - "ListSpacesResponse", - "ListSpacesRequest", - "CreateSpaceRequest", - "CreateSpaceResponse", + "CreatePipeline", "UpdatePipelineRequest", - "UpdatePipelineResponse", - "UpdateFunctionRequest", - "FetchFunctionRequest", - "PostArtifactRequest", - "TestFunctionRequest", - "TestFunctionResponse", ] diff --git a/src/glassflow/models/operations/access_token.py b/src/glassflow/models/operations/access_token.py deleted file mode 100644 index 1832fc4..0000000 --- a/src/glassflow/models/operations/access_token.py +++ /dev/null @@ -1,25 +0,0 @@ -from __future__ import annotations - -import dataclasses - -from .base import BasePipelineDataRequest, BasePipelineManagementRequest - - -@dataclasses.dataclass -class StatusAccessTokenRequest(BasePipelineDataRequest): - """Request check the status of an access token - - Attributes: - pipeline_id: The id of the pipeline - organization_id: The id of the organization - x_pipeline_access_token: The access token of the pipeline - - """ - - pass - - -@dataclasses.dataclass -class ListAccessTokensRequest(BasePipelineManagementRequest): - page_size: int = 50 - page: int = 1 diff --git a/src/glassflow/models/operations/artifact.py b/src/glassflow/models/operations/artifact.py deleted file mode 100644 index daad296..0000000 --- a/src/glassflow/models/operations/artifact.py +++ /dev/null @@ -1,23 +0,0 @@ -from __future__ import annotations - -import dataclasses - -from .base import BasePipelineManagementRequest - - -@dataclasses.dataclass -class GetArtifactRequest(BasePipelineManagementRequest): - pass - - -@dataclasses.dataclass -class PostArtifactRequest(BasePipelineManagementRequest): - file: str | None = dataclasses.field( - default=None, - metadata={ - "multipart_form": { - "field_name": "file", - } - }, - ) - requirementsTxt: str | None = dataclasses.field(default=None) diff --git a/src/glassflow/models/operations/base.py b/src/glassflow/models/operations/base.py deleted file mode 100644 index e6fce2f..0000000 --- a/src/glassflow/models/operations/base.py +++ /dev/null @@ -1,82 +0,0 @@ -import dataclasses -from typing import Optional - -from requests import Response - -from ...utils import generate_metadata_for_query_parameters - - -@dataclasses.dataclass() -class BaseRequest: - pass - - -@dataclasses.dataclass() -class BaseManagementRequest(BaseRequest): - organization_id: Optional[str] = dataclasses.field( - default=None, - metadata=generate_metadata_for_query_parameters("organization_id"), - ) - personal_access_token: str = dataclasses.field( - default=None, - metadata={ - "header": { - "field_name": "Personal-Access-Token", - "style": "simple", - "explode": False, - } - }, - ) - - -@dataclasses.dataclass -class BasePipelineRequest(BaseRequest): - pipeline_id: str = dataclasses.field( - metadata={ - "path_param": { - "field_name": "pipeline_id", - "style": "simple", - "explode": False, - } - } - ) - organization_id: Optional[str] = dataclasses.field( - default=None, - metadata=generate_metadata_for_query_parameters("organization_id"), - ) - - -@dataclasses.dataclass -class BasePipelineDataRequest(BasePipelineRequest): - x_pipeline_access_token: str = dataclasses.field( - default=None, - metadata={ - "header": { - "field_name": "X-PIPELINE-ACCESS-TOKEN", - "style": "simple", - "explode": False, - } - }, - ) - - -@dataclasses.dataclass -class BaseSpaceRequest(BaseRequest): - space_id: str - - -@dataclasses.dataclass -class BasePipelineManagementRequest(BaseManagementRequest, BasePipelineRequest): - pass - - -@dataclasses.dataclass -class BaseSpaceManagementDataRequest(BaseManagementRequest, BaseSpaceRequest): - pass - - -@dataclasses.dataclass -class BaseResponse: - content_type: str = dataclasses.field() - status_code: int = dataclasses.field() - raw_response: Response = dataclasses.field() diff --git a/src/glassflow/models/operations/consumeevent.py b/src/glassflow/models/operations/consumeevent.py deleted file mode 100644 index 4e6b2c2..0000000 --- a/src/glassflow/models/operations/consumeevent.py +++ /dev/null @@ -1,64 +0,0 @@ -"""Dataclasses for the consume event operation""" - -from __future__ import annotations - -import dataclasses - -from dataclasses_json import config, dataclass_json - -from .base import BasePipelineDataRequest, BaseResponse - - -@dataclasses.dataclass -class ConsumeEventRequest(BasePipelineDataRequest): - """Request to consume an event from a pipeline topic - - Attributes: - pipeline_id: The id of the pipeline - organization_id: The id of the organization - x_pipeline_access_token: The access token of the pipeline - - """ - - pass - - -@dataclass_json -@dataclasses.dataclass -class ConsumeEventResponseBody: - """Event response body after transformation - - Attributes: - req_id: The request id - receive_time: The time when the event was received - event: The event received - - """ - - req_id: str = dataclasses.field() - receive_time: str = dataclasses.field() - event: dict = dataclasses.field(metadata=config(field_name="response")) - - -@dataclasses.dataclass -class ConsumeEventResponse(BaseResponse): - """Response to consume an event from a pipeline topic - - Attributes: - content_type: HTTP response content type for this operation - status_code: HTTP response status code for this operation - raw_response: Raw HTTP response; suitable for custom response parsing - body: the response body from the api call - - """ - - body: ConsumeEventResponseBody | None = dataclasses.field(default=None) - - def json(self) -> dict: - """Return the response body as a JSON object. - This method is to have compatibility with the requests.Response.json() method - - Returns: - dict: The transformed event as a JSON object - """ - return self.body.event diff --git a/src/glassflow/models/operations/consumefailed.py b/src/glassflow/models/operations/consumefailed.py deleted file mode 100644 index 0e561ec..0000000 --- a/src/glassflow/models/operations/consumefailed.py +++ /dev/null @@ -1,64 +0,0 @@ -"""Dataclasses for the consume event operation""" - -from __future__ import annotations - -import dataclasses - -from dataclasses_json import config, dataclass_json - -from .base import BasePipelineDataRequest, BaseResponse - - -@dataclasses.dataclass -class ConsumeFailedRequest(BasePipelineDataRequest): - """Request to consume failed events from a pipeline - - Attributes: - pipeline_id: The id of the pipeline - organization_id: The id of the organization - x_pipeline_access_token: The access token of the pipeline - - """ - - pass - - -@dataclass_json -@dataclasses.dataclass -class ConsumeFailedResponseBody: - """Event response body after transformation - - Attributes: - req_id: The request id - receive_time: The time when the event was received - event: The event received - - """ - - req_id: str = dataclasses.field() - receive_time: str = dataclasses.field() - event: dict = dataclasses.field(metadata=config(field_name="payload")) - - -@dataclasses.dataclass -class ConsumeFailedResponse(BaseResponse): - """Response to consume a failed event from a pipeline - - Attributes: - content_type: HTTP response content type for this operation - status_code: HTTP response status code for this operation - raw_response: Raw HTTP response; suitable for custom response parsing - body: the response body from the api call - - """ - - body: ConsumeFailedResponseBody | None = dataclasses.field(default=None) - - def json(self) -> dict: - """Return the response body as a JSON object. - This method is to have compatibility with the requests.Response.json() method - - Returns: - dict: The transformed event as a JSON object - """ - return self.body.event diff --git a/src/glassflow/models/operations/function.py b/src/glassflow/models/operations/function.py deleted file mode 100644 index 657c894..0000000 --- a/src/glassflow/models/operations/function.py +++ /dev/null @@ -1,64 +0,0 @@ -from __future__ import annotations - -import dataclasses - -from ...utils import generate_metadata_for_query_parameters -from ..api import ( - ConsumeOutputEvent, - FunctionEnvironments, - FunctionLogs, - SeverityCodeInput, -) -from .base import BasePipelineManagementRequest, BaseResponse - - -@dataclasses.dataclass -class GetFunctionLogsRequest(BasePipelineManagementRequest): - page_size: int = dataclasses.field( - default=50, - metadata=generate_metadata_for_query_parameters("page_size"), - ) - page_token: str = dataclasses.field( - default=None, - metadata=generate_metadata_for_query_parameters("page_token"), - ) - severity_code: SeverityCodeInput | None = dataclasses.field( - default=None, - metadata=generate_metadata_for_query_parameters("severity_code"), - ) - start_time: str | None = dataclasses.field( - default=None, - metadata=generate_metadata_for_query_parameters("start_time"), - ) - end_time: str | None = dataclasses.field( - default=None, - metadata=generate_metadata_for_query_parameters("end_time"), - ) - - -@dataclasses.dataclass -class GetFunctionLogsResponse(BaseResponse): - logs: FunctionLogs - next: str - - -@dataclasses.dataclass -class FetchFunctionRequest(BasePipelineManagementRequest): - pass - - -@dataclasses.dataclass -class UpdateFunctionRequest(BasePipelineManagementRequest): - environments: FunctionEnvironments | None = dataclasses.field(default=None) - - -@dataclasses.dataclass -class TestFunctionRequest(BasePipelineManagementRequest): - request_body: dict = dataclasses.field( - default=None, metadata={"request": {"media_type": "application/json"}} - ) - - -@dataclasses.dataclass -class TestFunctionResponse(ConsumeOutputEvent, BaseResponse): - pass diff --git a/src/glassflow/models/operations/pipeline.py b/src/glassflow/models/operations/pipeline.py index 5a72926..2bde155 100644 --- a/src/glassflow/models/operations/pipeline.py +++ b/src/glassflow/models/operations/pipeline.py @@ -1,91 +1,27 @@ from __future__ import annotations -import dataclasses -from enum import Enum +from pydantic import AwareDatetime, BaseModel -from ...utils import generate_metadata_for_query_parameters -from ..api import ( - CreatePipeline, - GetDetailedSpacePipeline, +from glassflow.models.api import ( PipelineState, SinkConnector, SourceConnector, - SpacePipeline, ) -from .base import BaseManagementRequest, BasePipelineManagementRequest, BaseResponse -@dataclasses.dataclass -class GetPipelineRequest(BasePipelineManagementRequest): - pass - - -@dataclasses.dataclass -class GetPipelineResponse(BaseResponse): - pipeline: GetDetailedSpacePipeline | None = dataclasses.field(default=None) - - -@dataclasses.dataclass -class CreatePipelineRequest(BaseManagementRequest, CreatePipeline): - pass - - -@dataclasses.dataclass -class UpdatePipelineRequest(BaseManagementRequest): - name: str | None = dataclasses.field(default=None) - state: PipelineState | None = dataclasses.field(default=None) - metadata: dict | None = dataclasses.field(default=None) - source_connector: SourceConnector | None = dataclasses.field(default=None) - sink_connector: SinkConnector | None = dataclasses.field(default=None) - - -@dataclasses.dataclass -class UpdatePipelineResponse(BaseResponse): - pipeline: GetDetailedSpacePipeline | None = dataclasses.field(default=None) - - -@dataclasses.dataclass -class CreatePipelineResponse(BaseResponse): +class CreatePipeline(BaseModel): name: str space_id: str + metadata: dict | None = None id: str - created_at: str + created_at: AwareDatetime state: PipelineState access_token: str - metadata: dict | None = dataclasses.field(default=None) - - -@dataclasses.dataclass -class DeletePipelineRequest(BasePipelineManagementRequest): - pass - - -class Order(str, Enum): - asc = "asc" - desc = "desc" - - -@dataclasses.dataclass -class ListPipelinesRequest(BaseManagementRequest): - space_id: list[str] | None = dataclasses.field( - default=None, - metadata=generate_metadata_for_query_parameters("space_id"), - ) - page_size: int = dataclasses.field( - default=50, - metadata=generate_metadata_for_query_parameters("page_size"), - ) - page: int = dataclasses.field( - default=1, - metadata=generate_metadata_for_query_parameters("page"), - ) - order_by: Order = dataclasses.field( - default=Order.asc, - metadata=generate_metadata_for_query_parameters("order_by"), - ) -@dataclasses.dataclass -class ListPipelinesResponse(BaseResponse): - total_amount: int - pipelines: list[SpacePipeline] +class UpdatePipelineRequest(BaseModel): + name: str | None = None + state: str | None = None + metadata: dict | None = None + source_connector: SourceConnector | None = None + sink_connector: SinkConnector | None = None diff --git a/src/glassflow/models/operations/publishevent.py b/src/glassflow/models/operations/publishevent.py deleted file mode 100644 index f5e78ba..0000000 --- a/src/glassflow/models/operations/publishevent.py +++ /dev/null @@ -1,48 +0,0 @@ -"""Dataclasses for publish event operation""" - -from __future__ import annotations - -import dataclasses - -from .base import BasePipelineDataRequest, BaseResponse - - -@dataclasses.dataclass -class PublishEventRequestBody: - pass - - -@dataclasses.dataclass -class PublishEventRequest(BasePipelineDataRequest): - """Request to publish an event to a pipeline topic - - Attributes: - pipeline_id: The id of the pipeline - organization_id: The id of the organization - x_pipeline_access_token: The access token of the pipeline - request_body: The request body / event that should be published to the pipeline - """ - - request_body: dict = dataclasses.field( - default=None, metadata={"request": {"media_type": "application/json"}} - ) - - -@dataclasses.dataclass -class PublishEventResponseBody: - """Message pushed to the pipeline""" - - -@dataclasses.dataclass -class PublishEventResponse(BaseResponse): - """Response object for publish event operation - - Attributes: - content_type: HTTP response content type for this operation - status_code: HTTP response status code for this operation - raw_response: Raw HTTP response; suitable for custom response parsing - object: Response to the publish operation - - """ - - object: PublishEventResponseBody | None = dataclasses.field(default=None) diff --git a/src/glassflow/models/operations/space.py b/src/glassflow/models/operations/space.py deleted file mode 100644 index 10df6a2..0000000 --- a/src/glassflow/models/operations/space.py +++ /dev/null @@ -1,52 +0,0 @@ -from __future__ import annotations - -import dataclasses -from enum import Enum - -from ...utils import generate_metadata_for_query_parameters -from ..api import CreateSpace, SpaceScope -from .base import BaseManagementRequest, BaseResponse, BaseSpaceManagementDataRequest - - -@dataclasses.dataclass -class ListSpacesResponse(BaseResponse): - total_amount: int - spaces: list[SpaceScope] - - -class Order(str, Enum): - asc = "asc" - desc = "desc" - - -@dataclasses.dataclass -class ListSpacesRequest(BaseManagementRequest): - page_size: int = dataclasses.field( - default=50, - metadata=generate_metadata_for_query_parameters("page_size"), - ) - page: int = dataclasses.field( - default=1, - metadata=generate_metadata_for_query_parameters("page"), - ) - order_by: Order = dataclasses.field( - default=Order.asc, - metadata=generate_metadata_for_query_parameters("order_by"), - ) - - -@dataclasses.dataclass -class CreateSpaceRequest(BaseManagementRequest, CreateSpace): - pass - - -@dataclasses.dataclass -class CreateSpaceResponse(BaseResponse): - name: str - id: str - created_at: str - - -@dataclasses.dataclass -class DeleteSpaceRequest(BaseSpaceManagementDataRequest): - pass diff --git a/src/glassflow/models/responses/__init__.py b/src/glassflow/models/responses/__init__.py new file mode 100644 index 0000000..1a8d9a7 --- /dev/null +++ b/src/glassflow/models/responses/__init__.py @@ -0,0 +1,31 @@ +from .pipeline import ( + AccessToken, + ConsumeEventResponse, + ConsumeFailedResponse, + ConsumeOutputEvent, + FunctionLogEntry, + FunctionLogsResponse, + ListAccessTokensResponse, + ListPipelinesResponse, + PublishEventResponse, + TestFunctionResponse, +) +from .secret import ListSecretsResponse, Secret +from .space import ListSpacesResponse, Space + +__all__ = [ + "ListSpacesResponse", + "Space", + "FunctionLogsResponse", + "FunctionLogEntry", + "TestFunctionResponse", + "ListPipelinesResponse", + "ConsumeEventResponse", + "ConsumeOutputEvent", + "PublishEventResponse", + "ConsumeFailedResponse", + "ListAccessTokensResponse", + "AccessToken", + "Secret", + "ListSecretsResponse", +] diff --git a/src/glassflow/models/responses/pipeline.py b/src/glassflow/models/responses/pipeline.py new file mode 100644 index 0000000..10c289f --- /dev/null +++ b/src/glassflow/models/responses/pipeline.py @@ -0,0 +1,240 @@ +from __future__ import annotations + +from enum import Enum +from typing import Any + +from pydantic import AwareDatetime, BaseModel, ConfigDict + + +class Payload(BaseModel): + """ + Logs payload response object. + + Attributes: + message (str): log message + """ + + model_config = ConfigDict( + extra="allow", + ) + message: str + + +class FunctionLogEntry(BaseModel): + """ + Logs entry response object. + + Attributes: + level (int): Log level. + severity_code (int): Log severity code. + timestamp (AwareDatetime): Log timestamp. + payload (Payload): Log payload. + """ + + level: str + severity_code: int + timestamp: AwareDatetime + payload: Payload + + +class FunctionLogsResponse(BaseModel): + """ + Response for a function's logs endpoint. + + Attributes: + logs (list[FunctionLogEntry]): list of logs + next (str): ID used to retrieve next page of logs + """ + + logs: list[FunctionLogEntry] + next: str + + +class EventContext(BaseModel): + """ + Event context response object. + + Attributes: + request_id (str): Request ID. + external_id (str): External ID. + receive_time (AwareDatetime): Receive time. + """ + + request_id: str + external_id: str | None = None + receive_time: AwareDatetime + + +class ConsumeOutputEvent(BaseModel): + """ + Consume output event + + Attributes: + payload (Any): Payload + event_context (EventContext): Event context + status (str): Status + response (Any): request response + error_details (str): Error details + stack_trace (str): Error Stack trace + """ + + payload: Any + event_context: EventContext + status: str + response: Any | None = None + error_details: str | None = None + stack_trace: str | None = None + + +class TestFunctionResponse(ConsumeOutputEvent): + """Response for Test function endpoint.""" + + pass + + +class BasePipeline(BaseModel): + """ + Base pipeline response object. + + Attributes: + name (str): Pipeline name. + space_id (str): Space ID. + metadata (dict[str, Any]): Pipeline metadata. + """ + + name: str + space_id: str + metadata: dict[str, Any] + + +class PipelineState(str, Enum): + """ + Pipeline state + """ + + running = "running" + paused = "paused" + + +class Pipeline(BasePipeline): + """ + Pipeline response object. + + Attributes: + id (str): Pipeline id + created_at (AwareDatetime): Pipeline creation time + state (PipelineState): Pipeline state + """ + + id: str + created_at: AwareDatetime + state: PipelineState + + +class SpacePipeline(Pipeline): + """ + Pipeline with space response object. + + Attributes: + space_name (str): Space name + """ + + space_name: str + + +class ListPipelinesResponse(BaseModel): + """ + Response for list pipelines endpoint + + Attributes: + total_amount (int): Total amount of pipelines. + pipelines (list[SpacePipeline]): List of pipelines. + """ + + total_amount: int + pipelines: list[SpacePipeline] + + +class ConsumeEventResponse(BaseModel): + """ + Response from consume event + + Attributes: + status_code (int): HTTP status code + body (ConsumeOutputEvent): Body of the response + """ + + body: ConsumeOutputEvent | None = None + status_code: int | None = None + + def event(self) -> Any: + """Return event response.""" + if self.body: + return self.body.response + return None + + +class PublishEventResponseBody(BaseModel): + """Message pushed to the pipeline.""" + + pass + + +class PublishEventResponse(BaseModel): + """ + Response from publishing event + + Attributes: + status_code (int): HTTP status code + """ + + status_code: int | None = None + + +class ConsumeFailedResponse(BaseModel): + """ + Response from consuming failed event + + Attributes: + status_code (int): HTTP status code + body (ConsumeOutputEvent | None): ConsumeOutputEvent + """ + + body: ConsumeOutputEvent | None = None + status_code: int | None = None + + def event(self) -> Any: + """Return failed event response.""" + if self.body: + return self.body.response + return None + + +class AccessToken(BaseModel): + """ + Access Token response object. + + Attributes: + id (str): The access token id. + name (str): The access token name. + token (str): The access token string. + created_at (AwareDatetime): The access token creation date. + """ + + id: str + name: str + token: str + created_at: AwareDatetime + + +class ListAccessTokensResponse(BaseModel): + """ + Response for listing access tokens endpoint. + + Attributes: + total_amount (int): Total amount of access tokens. + access_tokens (list[AccessToken]): List of access tokens. + """ + + access_tokens: list[AccessToken] + total_amount: int diff --git a/src/glassflow/models/responses/secret.py b/src/glassflow/models/responses/secret.py new file mode 100644 index 0000000..cbecc8c --- /dev/null +++ b/src/glassflow/models/responses/secret.py @@ -0,0 +1,25 @@ +from pydantic import BaseModel + + +class Secret(BaseModel): + """ + Secret response object + + Attributes: + key (str): Secret key + """ + + key: str + + +class ListSecretsResponse(BaseModel): + """ + Response from the list secrets endpoint. + + Attributes: + total_amount (int): Total amount of the secrets. + secrets (list[Secret]): List of secrets. + """ + + total_amount: int + secrets: list[Secret] diff --git a/src/glassflow/models/responses/space.py b/src/glassflow/models/responses/space.py new file mode 100644 index 0000000..3a556ae --- /dev/null +++ b/src/glassflow/models/responses/space.py @@ -0,0 +1,33 @@ +from datetime import datetime + +from pydantic import BaseModel + + +class Space(BaseModel): + """ + Space response object. + + Attributes: + name (str): Space name. + id (int): Space id. + created_at (datetime): Space creation date. + permission (str): Space permission. + """ + + name: str + id: str + created_at: datetime + permission: str + + +class ListSpacesResponse(BaseModel): + """ + Response from list spaces endpoint. + + Attributes: + total_amount (int): Total amount of spaces. + spaces (list[Space]): List of spaces. + """ + + total_amount: int + spaces: list[Space] diff --git a/src/glassflow/pipeline.py b/src/glassflow/pipeline.py index 585f9f8..23f7b52 100644 --- a/src/glassflow/pipeline.py +++ b/src/glassflow/pipeline.py @@ -1,7 +1,8 @@ from __future__ import annotations from .client import APIClient -from .models import api, errors, operations +from .models import api, errors, operations, responses +from .models.responses.pipeline import AccessToken from .pipeline_data import PipelineDataSink, PipelineDataSource @@ -19,7 +20,7 @@ def __init__( requirements: str | None = None, transformation_file: str | None = None, env_vars: list[dict[str, str]] | None = None, - state: api.PipelineState = "running", + state: str = "running", organization_id: str | None = None, metadata: dict | None = None, created_at: str | None = None, @@ -48,7 +49,7 @@ def __init__( created_at: Timestamp when the pipeline was created Raises: - FailNotFoundError: If the transformation file is provided and + FileNotFoundError: If the transformation file is provided and does not exist """ super().__init__() @@ -68,43 +69,34 @@ def __init__( self.organization_id = organization_id self.metadata = metadata if metadata is not None else {} self.created_at = created_at - self.access_tokens = [] + self.access_tokens: list[AccessToken] = [] + self.headers = {"Personal-Access-Token": self.personal_access_token} + self.query_params = {"organization_id": self.organization_id} if self.transformation_file is not None: self._read_transformation_file() - if source_kind is not None and self.source_config is not None: - self.source_connector = dict( - kind=self.source_kind, - config=self.source_config, - ) - elif self.source_kind is None and self.source_config is None: - self.source_connector = None - else: - raise ValueError("Both source_kind and source_config must be provided") - - if self.sink_kind is not None and self.sink_config is not None: - self.sink_connector = dict( - kind=sink_kind, - config=sink_config, - ) - elif self.sink_kind is None and self.sink_config is None: - self.sink_connector = None - else: - raise ValueError("Both sink_kind and sink_config must be provided") + self.source_connector = self._fill_connector( + "source", + self.source_kind, + self.source_config, + ) + self.sink_connector = self._fill_connector( + "sink", self.sink_kind, self.sink_config + ) def fetch(self) -> Pipeline: """ Fetches pipeline information from the GlassFlow API Returns: - self: Pipeline object + Pipeline object Raises: ValueError: If ID is not provided in the constructor - PipelineNotFoundError: If ID provided does not match any + errors.PipelineNotFoundError: If ID provided does not match any existing pipeline in GlassFlow - UnauthorizedError: If the Personal Access Token is not + errors.PipelineUnauthorizedError: If the Personal Access Token is not provider or is invalid """ if self.id is None: @@ -112,25 +104,14 @@ def fetch(self) -> Pipeline: "Pipeline id must be provided in order to fetch it's details" ) - request = operations.GetPipelineRequest( - pipeline_id=self.id, - organization_id=self.organization_id, - personal_access_token=self.personal_access_token, - ) - - base_res = self._request( - method="GET", - endpoint=f"/pipelines/{self.id}", - request=request, - ) - self._fill_pipeline_details(base_res.raw_response.json()) - + endpoint = f"/pipelines/{self.id}" + http_res = self._request(method="GET", endpoint=endpoint) + fetched_pipeline = api.GetDetailedSpacePipeline(**http_res.json()) + self._fill_pipeline_details(fetched_pipeline) # Fetch Pipeline Access Tokens self._list_access_tokens() - # Fetch function source self._get_function_artifact() - return self def create(self) -> Pipeline: @@ -138,7 +119,7 @@ def create(self) -> Pipeline: Creates a new GlassFlow pipeline Returns: - self: Pipeline object + Pipeline object Raises: ValueError: If name is not provided in the constructor @@ -146,17 +127,7 @@ def create(self) -> Pipeline: ValueError: If transformation_file is not provided in the constructor """ - create_pipeline = api.CreatePipeline( - name=self.name, - space_id=self.space_id, - transformation_function=self.transformation_code, - requirements_txt=self.requirements, - source_connector=self.source_connector, - sink_connector=self.sink_connector, - environments=self.env_vars, - state=self.state, - metadata=self.metadata, - ) + if self.name is None: raise ValueError("Name must be provided in order to create the pipeline") if self.space_id is None: @@ -168,29 +139,45 @@ def create(self) -> Pipeline: else: self._read_transformation_file() - request = operations.CreatePipelineRequest( - organization_id=self.organization_id, - personal_access_token=self.personal_access_token, - **create_pipeline.__dict__, + create_pipeline = api.CreatePipeline( + name=self.name, + space_id=self.space_id, + transformation_function=self.transformation_code, + requirements_txt=self.requirements, + source_connector=self.source_connector, + sink_connector=self.sink_connector, + environments=self.env_vars, + state=api.PipelineState(self.state), + metadata=self.metadata, ) - - base_res = self._request(method="POST", endpoint="/pipelines", request=request) - res = operations.CreatePipelineResponse( - status_code=base_res.status_code, - content_type=base_res.content_type, - raw_response=base_res.raw_response, - **base_res.raw_response.json(), + endpoint = "/pipelines" + http_res = self._request( + method="POST", + endpoint=endpoint, + json=create_pipeline.model_dump(exclude_none=True), + ) + res_json = http_res.json() + # using custom operations model because api model does not exist + res = operations.CreatePipeline( + **res_json, ) - self.id = res.id self.created_at = res.created_at - self.access_tokens.append({"name": "default", "token": res.access_token}) + self.space_id = res.space_id + self.access_tokens.append( + AccessToken( + name="default", + token=res.access_token, + id="default", + created_at=res.created_at, + ) + ) return self def update( self, name: str | None = None, - state: api.PipelineState | None = None, + state: str | None = None, transformation_file: str | None = None, requirements: str | None = None, metadata: dict | None = None, @@ -204,7 +191,6 @@ def update( Updates a GlassFlow pipeline Args: - name: Name of the pipeline state: State of the pipeline after creation. It can be either "running" or "paused" @@ -221,13 +207,10 @@ def update( metadata: Metadata of the pipeline Returns: - self: Updated pipeline + Updated pipeline """ - - # Fetch current pipeline data self.fetch() - if transformation_file is not None or requirements is not None: if transformation_file is not None: with open(transformation_file) as f: @@ -243,27 +226,24 @@ def update( self.transformation_code = file if source_kind is not None: - source_connector = dict( - kind=source_kind, - config=source_config, + source_connector = self._fill_connector( + "source", + source_kind, + source_config, ) else: source_connector = self.source_connector if sink_kind is not None: - sink_connector = dict( - kind=sink_kind, - config=sink_config, - ) + sink_connector = self._fill_connector("sink", sink_kind, sink_config) else: sink_connector = self.sink_connector if env_vars is not None: self._update_function(env_vars) - request = operations.UpdatePipelineRequest( - organization_id=self.organization_id, - personal_access_token=self.personal_access_token, + # using custom model because api model does not exist + pipeline_req = operations.UpdatePipelineRequest( name=name if name is not None else self.name, state=state if state is not None else self.state, metadata=metadata if metadata is not None else self.metadata, @@ -271,173 +251,71 @@ def update( sink_connector=sink_connector, ) - base_res = self._request( - method="PATCH", endpoint=f"/pipelines/{self.id}", request=request - ) - self._fill_pipeline_details(base_res.raw_response.json()) + endpoint = f"/pipelines/{self.id}" + body = pipeline_req.model_dump_json(exclude_none=True) + http_res = self._request(method="PATCH", endpoint=endpoint, data=body) + # Fetch updated pipeline details and validate + updated_pipeline = api.GetDetailedSpacePipeline(**http_res.json()) + self._fill_pipeline_details(updated_pipeline) return self def delete(self) -> None: """ Deletes a GlassFlow pipeline - Returns: - Raises: ValueError: If ID is not provided in the constructor - PipelineNotFoundError: If ID provided does not match any + error.PipelineNotFoundError: If ID provided does not match any existing pipeline in GlassFlow - UnauthorizedError: If the Personal Access Token is not + errors.PipelineUnauthorizedError: If the Personal Access Token is not provided or is invalid """ if self.id is None: raise ValueError("Pipeline id must be provided") - request = operations.DeletePipelineRequest( - pipeline_id=self.id, - organization_id=self.organization_id, - personal_access_token=self.personal_access_token, - ) - self._request( - method="DELETE", - endpoint=f"/pipelines/{self.id}", - request=request, - ) + endpoint = f"/pipelines/{self.id}" + self._request(method="DELETE", endpoint=endpoint) def get_logs( self, page_size: int = 50, page_token: str | None = None, - severity_code: api.SeverityCodeInput = api.SeverityCodeInput.integer_100, + severity_code: int = 100, start_time: str | None = None, end_time: str | None = None, - ) -> operations.GetFunctionLogsResponse: + ) -> responses.FunctionLogsResponse: """ Get the pipeline's logs Args: page_size: Pagination size page_token: Page token filter (use for pagination) - severity_code: Severity code filter + severity_code: Severity code filter (100, 200, 300, 400, 500) start_time: Start time filter end_time: End time filter Returns: - PipelineFunctionsGetLogsResponse: Response with the logs + Response with the logs """ - request = operations.GetFunctionLogsRequest( - organization_id=self.organization_id, - personal_access_token=self.personal_access_token, - pipeline_id=self.id, - page_size=page_size, - page_token=page_token, - severity_code=severity_code, - start_time=start_time, - end_time=end_time, - ) - base_res = self._request( - method="GET", - endpoint=f"/pipelines/{self.id}/functions/main/logs", - request=request, + + query_params = { + "page_size": page_size, + "page_token": page_token, + "severity_code": severity_code, + "start_time": start_time, + "end_time": end_time, + } + endpoint = f"/pipelines/{self.id}/functions/main/logs" + http_res = self._request( + method="GET", endpoint=endpoint, request_query_params=query_params ) - base_res_json = base_res.raw_response.json() - logs = [ - api.FunctionLogEntry.from_dict(entry) for entry in base_res_json["logs"] - ] - return operations.GetFunctionLogsResponse( - status_code=base_res.status_code, - content_type=base_res.content_type, - raw_response=base_res.raw_response, + base_res_json = http_res.json() + logs = [responses.FunctionLogEntry(**entry) for entry in base_res_json["logs"]] + return responses.FunctionLogsResponse( logs=logs, next=base_res_json["next"], ) - def _list_access_tokens(self) -> Pipeline: - request = operations.ListAccessTokensRequest( - organization_id=self.organization_id, - personal_access_token=self.personal_access_token, - pipeline_id=self.id, - ) - base_res = self._request( - method="GET", - endpoint=f"/pipelines/{self.id}/access_tokens", - request=request, - ) - res_json = base_res.raw_response.json() - self.access_tokens = res_json["access_tokens"] - return self - - def _get_function_artifact(self) -> Pipeline: - """ - Fetch pipeline function source - - Returns: - self: Pipeline with function source details - """ - request = operations.GetArtifactRequest( - organization_id=self.organization_id, - personal_access_token=self.personal_access_token, - pipeline_id=self.id, - ) - base_res = self._request( - method="GET", - endpoint=f"/pipelines/{self.id}/functions/main/artifacts/latest", - request=request, - ) - res_json = base_res.raw_response.json() - self.transformation_code = res_json["transformation_function"] - - if "requirements_txt" in res_json: - self.requirements = res_json["requirements_txt"] - return self - - def _upload_function_artifact(self, file: str, requirements: str) -> None: - request = operations.PostArtifactRequest( - pipeline_id=self.id, - organization_id=self.organization_id, - personal_access_token=self.personal_access_token, - file=file, - requirementsTxt=requirements, - ) - try: - self._request( - method="POST", - endpoint=f"/pipelines/{self.id}/functions/main/artifacts", - request=request, - serialization_method="multipart", - ) - except errors.ClientError as e: - if e.status_code == 425: - # TODO: Figure out appropriate behaviour - print("Update still in progress") - else: - raise e - - def _update_function(self, env_vars): - """ - Patch pipeline function - - Args: - env_vars: Environment variables to update - - Returns: - self: Pipeline with updated function - """ - request = operations.UpdateFunctionRequest( - organization_id=self.organization_id, - personal_access_token=self.personal_access_token, - pipeline_id=self.id, - environments=env_vars, - ) - base_res = self._request( - method="PATCH", - endpoint=f"/pipelines/{self.id}/functions/main", - request=request, - ) - res_json = base_res.raw_response.json() - self.env_vars = res_json["environments"] - return self - def get_source( self, pipeline_access_token_name: str | None = None ) -> PipelineDataSource: @@ -450,7 +328,7 @@ def get_source( will be used Returns: - PipelineDataSource: Source client to publish data to the pipeline + Source client to publish data to the pipeline Raises: ValueError: If pipeline id is not provided in the constructor @@ -469,13 +347,133 @@ def get_sink( will be used Returns: - PipelineDataSink: Sink client to consume data from the pipeline + Sink client to consume data from the pipeline Raises: ValueError: If pipeline id is not provided in the constructor """ return self._get_data_client("sink", pipeline_access_token_name) + def test(self, data: dict) -> responses.TestFunctionResponse: + """ + Test a pipeline's function with a sample input JSON + + Args: + data: Input JSON + + Returns: + Test function response + """ + endpoint = f"/pipelines/{self.id}/functions/main/test" + request_body = data + http_res = self._request(method="POST", endpoint=endpoint, json=request_body) + base_res_json = http_res.json() + return responses.TestFunctionResponse( + **base_res_json, + ) + + def _request( + self, + method, + endpoint, + request_headers=None, + json=None, + request_query_params=None, + files=None, + data=None, + ): + headers = {**self.headers, **(request_headers or {})} + query_params = {**self.query_params, **(request_query_params or {})} + try: + return super()._request( + method=method, + endpoint=endpoint, + request_headers=headers, + json=json, + request_query_params=query_params, + files=files, + data=data, + ) + except errors.UnknownError as e: + if e.status_code == 401: + raise errors.PipelineUnauthorizedError(self.id, e.raw_response) from e + if e.status_code == 404: + raise errors.PipelineNotFoundError(self.id, e.raw_response) from e + if e.status_code == 425: + raise errors.PipelineArtifactStillInProgressError( + self.id, e.raw_response + ) from e + raise e + + @staticmethod + def _fill_connector( + connector_type: str, kind: str, config: dict + ) -> api.SourceConnector | api.SinkConnector: + """Format connector input""" + if not kind and not config: + connector = None + elif kind and config: + connector = dict(kind=kind, config=config) + else: + raise errors.ConnectorConfigValueError(connector_type) + + if connector_type == "source": + return api.SourceConnector(root=connector) + elif connector_type == "sink": + return api.SinkConnector(root=connector) + else: + raise ValueError("connector_type must be 'source' or 'sink'") + + def _list_access_tokens(self) -> Pipeline: + endpoint = f"/pipelines/{self.id}/access_tokens" + http_res = self._request(method="GET", endpoint=endpoint) + tokens = responses.ListAccessTokensResponse(**http_res.json()) + self.access_tokens = tokens.access_tokens + return self + + def _get_function_artifact(self) -> Pipeline: + """ + Fetch pipeline function source + + Returns: + Pipeline with function source details + """ + endpoint = f"/pipelines/{self.id}/functions/main/artifacts/latest" + http_res = self._request(method="GET", endpoint=endpoint) + res_json = http_res.json() + self.transformation_code = res_json["transformation_function"] + # you would never know what else was changed + + if "requirements_txt" in res_json: + self.requirements = res_json["requirements_txt"] + return self + + def _upload_function_artifact(self, file: str, requirements: str) -> None: + files = {"file": file} + data = { + "requirementsTxt": requirements, + } + endpoint = f"/pipelines/{self.id}/functions/main/artifacts" + self._request(method="POST", endpoint=endpoint, files=files, data=data) + + def _update_function(self, env_vars): + """ + Patch pipeline function + + Args: + env_vars: Environment variables to update + + Returns: + Pipeline with updated function + """ + endpoint = f"/pipelines/{self.id}/functions/main" + body = api.PipelineFunctionOutput(environments=env_vars) + http_res = self._request( + method="PATCH", endpoint=endpoint, json=body.model_dump() + ) + self.env_vars = http_res.json()["environments"] + return self + def _get_data_client( self, client_type: str, pipeline_access_token_name: str | None = None ) -> PipelineDataSource | PipelineDataSink: @@ -486,15 +484,15 @@ def _get_data_client( if pipeline_access_token_name is not None: for t in self.access_tokens: - if t["name"] == pipeline_access_token_name: - token = t["token"] + if t.name == pipeline_access_token_name: + token = t.token break else: raise ValueError( - f"Token with name {pipeline_access_token_name} " f"was not found" + f"Token with name {pipeline_access_token_name} was not found" ) else: - token = self.access_tokens[0]["token"] + token = self.access_tokens[0].token if client_type == "source": client = PipelineDataSource( pipeline_id=self.id, @@ -509,25 +507,6 @@ def _get_data_client( raise ValueError("client_type must be either source or sink") return client - def _request( - self, - method: str, - endpoint: str, - request: operations.BaseManagementRequest, - **kwargs, - ) -> operations.BaseResponse: - try: - return super()._request( - method=method, endpoint=endpoint, request=request, **kwargs - ) - except errors.ClientError as e: - if e.status_code == 404: - raise errors.PipelineNotFoundError(self.id, e.raw_response) from e - elif e.status_code == 401: - raise errors.UnauthorizedError(e.raw_response) from e - else: - raise e - def _read_transformation_file(self): try: with open(self.transformation_file) as f: @@ -535,51 +514,22 @@ def _read_transformation_file(self): except FileNotFoundError: raise - def _fill_pipeline_details(self, pipeline_details: dict) -> Pipeline: - self.id = pipeline_details["id"] - self.name = pipeline_details["name"] - self.space_id = pipeline_details["space_id"] - self.state = pipeline_details["state"] - if pipeline_details["source_connector"]: - self.source_kind = pipeline_details["source_connector"]["kind"] - self.source_config = pipeline_details["source_connector"]["config"] - if pipeline_details["sink_connector"]: - self.sink_kind = pipeline_details["sink_connector"]["kind"] - self.sink_config = pipeline_details["sink_connector"]["config"] - self.created_at = pipeline_details["created_at"] - self.env_vars = pipeline_details["environments"] + def _fill_pipeline_details( + self, pipeline_details: api.GetDetailedSpacePipeline + ) -> Pipeline: + self.id = pipeline_details.id + self.name = pipeline_details.name + self.space_id = pipeline_details.space_id + self.state = pipeline_details.state + source_connector = pipeline_details.source_connector + if source_connector.root: + self.source_kind = source_connector.root.kind + self.source_config = source_connector.root.config + sink_connector = pipeline_details.sink_connector + if sink_connector.root: + self.sink_kind = sink_connector.root.kind + self.sink_config = sink_connector.root.config + self.created_at = pipeline_details.created_at + self.env_vars = pipeline_details.environments return self - - def test(self, data: dict) -> operations.TestFunctionResponse: - """ - Test a pipeline's function with a sample input JSON - - Args: - data: Input JSON - - Returns: - TestFunctionResponse: Test function response - """ - request = operations.TestFunctionRequest( - pipeline_id=self.id, - organization_id=self.organization_id, - personal_access_token=self.personal_access_token, - request_body=data, - ) - - base_res = self._request( - method="POST", - endpoint=f"/pipelines/{self.id}/functions/main/test", - request=request, - ) - base_res_json = base_res.raw_response.json() - base_res_json["event_context"] = api.EventContext( - **base_res_json["event_context"] - ) - return operations.TestFunctionResponse( - status_code=base_res.status_code, - content_type=base_res.content_type, - raw_response=base_res.raw_response, - **base_res_json, - ) diff --git a/src/glassflow/pipeline_data.py b/src/glassflow/pipeline_data.py index 52d0380..cd4dafa 100644 --- a/src/glassflow/pipeline_data.py +++ b/src/glassflow/pipeline_data.py @@ -1,87 +1,86 @@ import random import time -from typing import Optional from .api_client import APIClient -from .models import errors, operations -from .models.operations.base import BasePipelineDataRequest, BaseResponse -from .utils import utils +from .models import errors, responses class PipelineDataClient(APIClient): """Base Client object to publish and consume events from the given pipeline. Attributes: - glassflow_config: GlassFlowConfig object to interact with GlassFlow API - pipeline_id: The pipeline id to interact with - pipeline_access_token: The access token to access the pipeline + glassflow_config (GlassFlowConfig): GlassFlowConfig object to interact + with GlassFlow API + pipeline_id (str): The pipeline id to interact with + pipeline_access_token (str): The access token to access the pipeline """ def __init__(self, pipeline_id: str, pipeline_access_token: str): super().__init__() self.pipeline_id = pipeline_id self.pipeline_access_token = pipeline_access_token + self.headers = {"X-PIPELINE-ACCESS-TOKEN": self.pipeline_access_token} + self.query_params = {} def validate_credentials(self) -> None: """ Check if the pipeline credentials are valid and raise an error if not """ - request = operations.StatusAccessTokenRequest( - pipeline_id=self.pipeline_id, - x_pipeline_access_token=self.pipeline_access_token, - ) - self._request( - method="GET", - endpoint="/pipelines/{pipeline_id}/status/access_token", - request=request, - ) + + endpoint = f"/pipelines/{self.pipeline_id}/status/access_token" + return self._request(method="GET", endpoint=endpoint) def _request( - self, method: str, endpoint: str, request: BasePipelineDataRequest, **kwargs - ) -> BaseResponse: + self, + method, + endpoint, + request_headers=None, + json=None, + request_query_params=None, + files=None, + data=None, + ): + headers = {**self.headers, **(request_headers or {})} + query_params = {**self.query_params, **(request_query_params or {})} try: - res = super()._request(method, endpoint, request, **kwargs) - except errors.ClientError as e: + return super()._request( + method=method, + endpoint=endpoint, + request_headers=headers, + json=json, + request_query_params=query_params, + files=files, + data=data, + ) + except errors.UnknownError as e: if e.status_code == 401: raise errors.PipelineAccessTokenInvalidError(e.raw_response) from e - elif e.status_code == 404: + if e.status_code == 404: raise errors.PipelineNotFoundError( self.pipeline_id, e.raw_response ) from e - else: - raise e - return res + if e.status_code == 429: + return errors.PipelineTooManyRequestsError(e.raw_response) + raise e class PipelineDataSource(PipelineDataClient): - def publish(self, request_body: dict) -> operations.PublishEventResponse: + def publish(self, request_body: dict) -> responses.PublishEventResponse: """Push a new message into the pipeline Args: request_body: The message to be published into the pipeline Returns: - PublishEventResponse: Response object containing the status - code and the raw response + Response object containing the status code and the raw response Raises: - ClientError: If an error occurred while publishing the event + errors.ClientError: If an error occurred while publishing the event """ - request = operations.PublishEventRequest( - pipeline_id=self.pipeline_id, - x_pipeline_access_token=self.pipeline_access_token, - request_body=request_body, - ) - base_res = self._request( - method="POST", - endpoint="/pipelines/{pipeline_id}/topics/input/events", - request=request, - ) - - return operations.PublishEventResponse( - status_code=base_res.status_code, - content_type=base_res.content_type, - raw_response=base_res.raw_response, + endpoint = f"/pipelines/{self.pipeline_id}/topics/input/events" + http_res = self._request(method="POST", endpoint=endpoint, json=request_body) + return responses.PublishEventResponse( + status_code=http_res.status_code, ) @@ -94,110 +93,55 @@ def __init__(self, pipeline_id: str, pipeline_access_token: str): self._consume_retry_delay_current = 1 self._consume_retry_delay_max = 60 - def consume(self) -> operations.ConsumeEventResponse: + def consume(self) -> responses.ConsumeEventResponse: """Consume the last message from the pipeline Returns: - ConsumeEventResponse: Response object containing the status - code and the raw response + Response object containing the status code and the raw response Raises: - ClientError: If an error occurred while consuming the event + errors.ClientError: If an error occurred while consuming the event """ - request = operations.ConsumeEventRequest( - pipeline_id=self.pipeline_id, - x_pipeline_access_token=self.pipeline_access_token, - ) + endpoint = f"/pipelines/{self.pipeline_id}/topics/output/events/consume" self._respect_retry_delay() - base_res = self._request( - method="POST", - endpoint="/pipelines/{pipeline_id}/topics/output/events/consume", - request=request, - ) + http_res = self._request(method="POST", endpoint=endpoint) + self._update_retry_delay(http_res.status_code) - res = operations.ConsumeEventResponse( - status_code=base_res.status_code, - content_type=base_res.content_type, - raw_response=base_res.raw_response, - ) + body = None + if http_res.status_code == 200: + body = http_res.json() + self._consume_retry_delay_current = self._consume_retry_delay_minimum - self._update_retry_delay(base_res.status_code) - if res.status_code == 200: - if not utils.match_content_type(res.content_type, "application/json"): - raise errors.UnknownContentTypeError(res.raw_response) + return responses.ConsumeEventResponse( + status_code=http_res.status_code, body=body + ) - self._consume_retry_delay_current = self._consume_retry_delay_minimum - body = utils.unmarshal_json( - res.raw_response.text, Optional[operations.ConsumeEventResponseBody] - ) - res.body = body - elif res.status_code == 204: - # No messages to be consumed. - # update the retry delay - # Return an empty response body - body = operations.ConsumeEventResponseBody("", "", {}) - res.body = body - elif res.status_code == 429: - # update the retry delay - body = operations.ConsumeEventResponseBody("", "", {}) - res.body = body - elif not utils.match_content_type(res.content_type, "application/json"): - raise errors.UnknownContentTypeError(res.raw_response) - - return res - - def consume_failed(self) -> operations.ConsumeFailedResponse: + def consume_failed(self) -> responses.ConsumeFailedResponse: """Consume the failed message from the pipeline Returns: - ConsumeFailedResponse: Response object containing the status - code and the raw response + Response object containing the status code and the raw response Raises: - ClientError: If an error occurred while consuming the event + errors.ClientError: If an error occurred while consuming the event """ - request = operations.ConsumeFailedRequest( - pipeline_id=self.pipeline_id, - x_pipeline_access_token=self.pipeline_access_token, - ) self._respect_retry_delay() - base_res = self._request( - method="POST", - endpoint="/pipelines/{pipeline_id}/topics/failed/events/consume", - request=request, - ) - - res = operations.ConsumeFailedResponse( - status_code=base_res.status_code, - content_type=base_res.content_type, - raw_response=base_res.raw_response, - ) - - self._update_retry_delay(res.status_code) - if res.status_code == 200: - if not utils.match_content_type(res.content_type, "application/json"): - raise errors.UnknownContentTypeError(res.raw_response) + endpoint = f"/pipelines/{self.pipeline_id}/topics/failed/events/consume" + http_res = self._request(method="POST", endpoint=endpoint) + self._update_retry_delay(http_res.status_code) + body = None + if http_res.status_code == 200: + body = http_res.json() self._consume_retry_delay_current = self._consume_retry_delay_minimum - body = utils.unmarshal_json( - res.raw_response.text, Optional[operations.ConsumeFailedResponseBody] - ) - res.body = body - elif res.status_code == 204: - # No messages to be consumed. Return an empty response body - body = operations.ConsumeFailedResponseBody("", "", {}) - res.body = body - elif res.status_code == 429: - # update the retry delay - body = operations.ConsumeEventResponseBody("", "", {}) - res.body = body - elif not utils.match_content_type(res.content_type, "application/json"): - raise errors.UnknownContentTypeError(res.raw_response) - return res + + return responses.ConsumeFailedResponse( + status_code=http_res.status_code, body=body + ) def _update_retry_delay(self, status_code: int): if status_code == 200: diff --git a/src/glassflow/secret.py b/src/glassflow/secret.py new file mode 100644 index 0000000..1961778 --- /dev/null +++ b/src/glassflow/secret.py @@ -0,0 +1,117 @@ +from __future__ import annotations + +import re + +from .api_client import APIClient +from .models import api, errors + + +class Secret(APIClient): + def __init__( + self, + personal_access_token: str, + key: str | None = None, + value: str | None = None, + organization_id: str | None = None, + ): + """ + Creates a new Glassflow Secret object + + Args: + personal_access_token: The personal access token to authenticate + against GlassFlow + key: Name of the secret. It must start with a letter, + and it can only contain characters in a-zA-Z0-9_ + value: Value of the secret to store + + Raises: + errors.SecretInvalidKeyError: If secret key is invalid + """ + super().__init__() + self.personal_access_token = personal_access_token + self.organization_id = organization_id + self.key = key + self.value = value + self.headers = {"Personal-Access-Token": self.personal_access_token} + self.query_params = {"organization_id": self.organization_id} + + if self.key and not self._is_key_valid(self.key): + raise errors.SecretInvalidKeyError(self.key) + + def create(self) -> Secret: + """ + Creates a new Glassflow Secret + + Returns: + Secret object + + Raises: + ValueError: If secret key or value are not set in the constructor + errors.SecretUnauthorizedError: If personal access token is invalid + """ + if self.key is None: + raise ValueError("Secret key is required in the constructor") + if self.value is None: + raise ValueError("Secret value is required in the constructor") + + secret_api_obj = api.CreateSecret( + **{ + "key": self.key, + "value": self.value, + } + ) + + endpoint = "/secrets" + self._request( + method="POST", endpoint=endpoint, json=secret_api_obj.model_dump() + ) + return self + + def delete(self) -> None: + """ + Deletes a Glassflow Secret. + + Raises: + errors.SecretUnauthorizedError: If personal access token is invalid + errors.SecretNotFoundError: If secret key does not exist + ValueError: If secret key is not set in the constructor + """ + if self.key is None: + raise ValueError("Secret key is required in the constructor") + + endpoint = f"/secrets/{self.key}" + self._request(method="DELETE", endpoint=endpoint) + + @staticmethod + def _is_key_valid(key: str) -> bool: + search = re.compile(r"[^a-zA-Z0-9_]").search + return not bool(search(key)) + + def _request( + self, + method, + endpoint, + request_headers=None, + json=None, + request_query_params=None, + files=None, + data=None, + ): + headers = {**self.headers, **(request_headers or {})} + query_params = {**self.query_params, **(request_query_params or {})} + try: + return super()._request( + method=method, + endpoint=endpoint, + request_headers=headers, + json=json, + request_query_params=query_params, + files=files, + data=data, + ) + except errors.UnknownError as e: + if e.status_code == 401: + raise errors.SecretUnauthorizedError(self.key, e.raw_response) from e + if e.status_code == 404: + raise errors.SecretNotFoundError(self.key, e.raw_response) from e + raise e diff --git a/src/glassflow/space.py b/src/glassflow/space.py index ec96308..312e3e9 100644 --- a/src/glassflow/space.py +++ b/src/glassflow/space.py @@ -1,7 +1,9 @@ from __future__ import annotations +import datetime + from .client import APIClient -from .models import api, errors, operations +from .models import api, errors class Space(APIClient): @@ -10,10 +12,10 @@ def __init__( personal_access_token: str, name: str | None = None, id: str | None = None, - created_at: str | None = None, + created_at: datetime.datetime | None = None, organization_id: str | None = None, ): - """Creates a new GlassFlow pipeline object + """Creates a new GlassFlow space object Args: personal_access_token: The personal access token to authenticate @@ -29,84 +31,78 @@ def __init__( self.created_at = created_at self.organization_id = organization_id self.personal_access_token = personal_access_token + self.headers = {"Personal-Access-Token": self.personal_access_token} + self.query_params = {"organization_id": self.organization_id} def create(self) -> Space: """ Creates a new GlassFlow space Returns: - self: Space object + Space object Raises: ValueError: If name is not provided in the constructor """ - if self.name is None: - raise ValueError("Name must be provided in order to create the space") - create_space = api.CreateSpace(name=self.name) - request = operations.CreateSpaceRequest( - organization_id=self.organization_id, - personal_access_token=self.personal_access_token, - **create_space.__dict__, - ) - base_res = self._request(method="POST", endpoint="/spaces", request=request) + space_api_obj = api.CreateSpace(name=self.name) - res = operations.CreateSpaceResponse( - status_code=base_res.status_code, - content_type=base_res.content_type, - raw_response=base_res.raw_response, - **base_res.raw_response.json(), + endpoint = "/spaces" + http_res = self._request( + method="POST", endpoint=endpoint, json=space_api_obj.model_dump() ) - self.id = res.id - self.created_at = res.created_at - self.name = res.name + space_created = api.Space(**http_res.json()) + self.id = space_created.id + self.created_at = space_created.created_at + self.name = space_created.name return self def delete(self) -> None: """ Deletes a GlassFlow space - Returns: - Raises: ValueError: If ID is not provided in the constructor - SpaceNotFoundError: If ID provided does not match any + errors.SpaceNotFoundError: If ID provided does not match any existing space in GlassFlow - UnauthorizedError: If the Personal Access Token is not + errors.SpaceUnauthorizedError: If the Personal Access Token is not provided or is invalid + errors.SpaceIsNotEmptyError: If the Space is not empty """ if self.id is None: raise ValueError("Space id must be provided in the constructor") - request = operations.DeleteSpaceRequest( - space_id=self.id, - organization_id=self.organization_id, - personal_access_token=self.personal_access_token, - ) - self._request( - method="DELETE", - endpoint=f"/spaces/{self.id}", - request=request, - ) + endpoint = f"/spaces/{self.id}" + self._request(method="DELETE", endpoint=endpoint) def _request( self, - method: str, - endpoint: str, - request: operations.BaseManagementRequest, - **kwargs, - ) -> operations.BaseResponse: + method, + endpoint, + request_headers=None, + json=None, + request_query_params=None, + files=None, + data=None, + ): + headers = {**self.headers, **(request_headers or {})} + query_params = {**self.query_params, **(request_query_params or {})} try: return super()._request( - method=method, endpoint=endpoint, request=request, **kwargs + method=method, + endpoint=endpoint, + request_headers=headers, + json=json, + request_query_params=query_params, + files=files, + data=data, ) - except errors.ClientError as e: + except errors.UnknownError as e: + if e.status_code == 401: + raise errors.SpaceUnauthorizedError(self.id, e.raw_response) from e if e.status_code == 404: raise errors.SpaceNotFoundError(self.id, e.raw_response) from e - elif e.status_code == 401: - raise errors.UnauthorizedError(e.raw_response) from e - elif e.status_code == 409: + if e.status_code == 409: raise errors.SpaceIsNotEmptyError(e.raw_response) from e - else: - raise e + raise e diff --git a/src/glassflow/utils/__init__.py b/src/glassflow/utils/__init__.py deleted file mode 100644 index 771f331..0000000 --- a/src/glassflow/utils/__init__.py +++ /dev/null @@ -1,23 +0,0 @@ -from .utils import ( - generate_metadata_for_query_parameters, - generate_url, - get_field_name, - get_query_params, - get_req_specific_headers, - marshal_json, - match_content_type, - serialize_request_body, - unmarshal_json, -) - -__all__ = [ - "match_content_type", - "unmarshal_json", - "generate_url", - "serialize_request_body", - "get_query_params", - "get_req_specific_headers", - "get_field_name", - "marshal_json", - "generate_metadata_for_query_parameters", -] diff --git a/src/glassflow/utils/utils.py b/src/glassflow/utils/utils.py deleted file mode 100644 index 806f06d..0000000 --- a/src/glassflow/utils/utils.py +++ /dev/null @@ -1,688 +0,0 @@ -# ruff: noqa: E501, SIM102 - -import json -import re -from dataclasses import Field, dataclass, fields, is_dataclass, make_dataclass -from datetime import datetime -from decimal import Decimal -from email.message import Message -from enum import Enum -from typing import Any, Callable, Dict, List, Tuple, Union, get_args, get_origin -from xmlrpc.client import boolean - -from dataclasses_json import DataClassJsonMixin -from typing_inspect import is_optional_type - - -def generate_metadata_for_query_parameters(param_name): - return { - "query_param": { - "field_name": param_name, - "style": "form", - "explode": True, - } - } - - -def generate_url( - clazz: type, - server_url: str, - path: str, - path_params: dataclass, - gbls: Dict[str, Dict[str, Dict[str, Any]]] = None, -) -> str: - path_param_fields: Tuple[Field, ...] = fields(clazz) - for field in path_param_fields: - request_metadata = field.metadata.get("request") - if request_metadata is not None: - continue - - param_metadata = field.metadata.get("path_param") - if param_metadata is None: - continue - - param = getattr(path_params, field.name) if path_params is not None else None - param = _populate_from_globals(field.name, param, "pathParam", gbls) - - if param is None: - continue - - f_name = param_metadata.get("field_name", field.name) - serialization = param_metadata.get("serialization", "") - if serialization != "": - serialized_params = _get_serialized_params( - param_metadata, field.type, f_name, param - ) - for key, value in serialized_params.items(): - path = path.replace("{" + key + "}", value, 1) - else: - if param_metadata.get("style", "simple") == "simple": - if isinstance(param, List): - pp_vals: List[str] = [] - for pp_val in param: - if pp_val is None: - continue - pp_vals.append(_val_to_string(pp_val)) - path = path.replace( - "{" + param_metadata.get("field_name", field.name) + "}", - ",".join(pp_vals), - 1, - ) - elif isinstance(param, Dict): - pp_vals: List[str] = [] - for pp_key in param: - if param[pp_key] is None: - continue - if param_metadata.get("explode"): - pp_vals.append(f"{pp_key}={_val_to_string(param[pp_key])}") - else: - pp_vals.append(f"{pp_key},{_val_to_string(param[pp_key])}") - path = path.replace( - "{" + param_metadata.get("field_name", field.name) + "}", - ",".join(pp_vals), - 1, - ) - elif not isinstance(param, (str, int, float, complex, bool, Decimal)): - pp_vals: List[str] = [] - param_fields: Tuple[Field, ...] = fields(param) - for param_field in param_fields: - param_value_metadata = param_field.metadata.get("path_param") - if not param_value_metadata: - continue - - parm_name = param_value_metadata.get("field_name", field.name) - - param_field_val = getattr(param, param_field.name) - if param_field_val is None: - continue - if param_metadata.get("explode"): - pp_vals.append( - f"{parm_name}={_val_to_string(param_field_val)}" - ) - else: - pp_vals.append( - f"{parm_name},{_val_to_string(param_field_val)}" - ) - path = path.replace( - "{" + param_metadata.get("field_name", field.name) + "}", - ",".join(pp_vals), - 1, - ) - else: - path = path.replace( - "{" + param_metadata.get("field_name", field.name) + "}", - _val_to_string(param), - 1, - ) - - return remove_suffix(server_url, "/") + path - - -def is_optional(field): - return get_origin(field) is Union and type(None) in get_args(field) - - -def get_query_params( - clazz: type, - query_params: dataclass, - gbls: Dict[str, Dict[str, Dict[str, Any]]] = None, -) -> Dict[str, List[str]]: - params: Dict[str, List[str]] = {} - - param_fields: Tuple[Field, ...] = fields(clazz) - for field in param_fields: - request_metadata = field.metadata.get("request") - if request_metadata is not None: - continue - - metadata = field.metadata.get("query_param") - if not metadata: - continue - - param_name = field.name - value = getattr(query_params, param_name) if query_params is not None else None - - value = _populate_from_globals(param_name, value, "queryParam", gbls) - - f_name = metadata.get("field_name") - serialization = metadata.get("serialization", "") - if serialization != "": - serialized_parms = _get_serialized_params( - metadata, field.type, f_name, value - ) - for key, value in serialized_parms.items(): - if key in params: - params[key].extend(value) - else: - params[key] = [value] - else: - style = metadata.get("style", "form") - if style == "deepObject": - params = { - **params, - **_get_deep_object_query_params(metadata, f_name, value), - } - elif style == "form": - params = { - **params, - **_get_delimited_query_params(metadata, f_name, value, ","), - } - elif style == "pipeDelimited": - params = { - **params, - **_get_delimited_query_params(metadata, f_name, value, "|"), - } - else: - raise Exception("not yet implemented") - return params - - -def get_req_specific_headers(headers_params: dataclass) -> Dict[str, str]: - if headers_params is None: - return {} - - headers: Dict[str, str] = {} - - param_fields: Tuple[Field, ...] = fields(headers_params) - for field in param_fields: - metadata = field.metadata.get("header") - if not metadata: - continue - - value = _serialize_header( - metadata.get("explode", False), getattr(headers_params, field.name) - ) - - if value != "": - headers[metadata.get("field_name", field.name)] = value - - return headers - - -def _get_serialized_params( - metadata: Dict, field_type: type, field_name: str, obj: any -) -> Dict[str, str]: - params: Dict[str, str] = {} - - serialization = metadata.get("serialization", "") - if serialization == "json": - params[metadata.get("field_name", field_name)] = marshal_json(obj, field_type) - - return params - - -def _get_deep_object_query_params( - metadata: Dict, field_name: str, obj: any -) -> Dict[str, List[str]]: - params: Dict[str, List[str]] = {} - - if obj is None: - return params - - if is_dataclass(obj): - obj_fields: Tuple[Field, ...] = fields(obj) - for obj_field in obj_fields: - obj_param_metadata = obj_field.metadata.get("query_param") - if not obj_param_metadata: - continue - - obj_val = getattr(obj, obj_field.name) - if obj_val is None: - continue - - if isinstance(obj_val, List): - for val in obj_val: - if val is None: - continue - - if ( - params.get( - f'{metadata.get("field_name", field_name)}[{obj_param_metadata.get("field_name", obj_field.name)}]' - ) - is None - ): - params[ - f'{metadata.get("field_name", field_name)}[{obj_param_metadata.get("field_name", obj_field.name)}]' - ] = [] - - params[ - f'{metadata.get("field_name", field_name)}[{obj_param_metadata.get("field_name", obj_field.name)}]' - ].append(_val_to_string(val)) - else: - params[ - f'{metadata.get("field_name", field_name)}[{obj_param_metadata.get("field_name", obj_field.name)}]' - ] = [_val_to_string(obj_val)] - elif isinstance(obj, Dict): - for key, value in obj.items(): - if value is None: - continue - - if isinstance(value, List): - for val in value: - if val is None: - continue - - if ( - params.get(f'{metadata.get("field_name", field_name)}[{key}]') - is None - ): - params[f'{metadata.get("field_name", field_name)}[{key}]'] = [] - - params[f'{metadata.get("field_name", field_name)}[{key}]'].append( - _val_to_string(val) - ) - else: - params[f'{metadata.get("field_name", field_name)}[{key}]'] = [ - _val_to_string(value) - ] - return params - - -def _get_query_param_field_name(obj_field: Field) -> str: - obj_param_metadata = obj_field.metadata.get("query_param") - - if not obj_param_metadata: - return "" - - return obj_param_metadata.get("field_name", obj_field.name) - - -def _get_delimited_query_params( - metadata: Dict, field_name: str, obj: any, delimiter: str -) -> Dict[str, List[str]]: - return _populate_form( - field_name, - metadata.get("explode", True), - obj, - _get_query_param_field_name, - delimiter, - ) - - -SERIALIZATION_METHOD_TO_CONTENT_TYPE = { - "json": "application/json", - "form": "application/x-www-form-urlencoded", - "multipart": "multipart/form-data", - "raw": "application/octet-stream", - "string": "text/plain", -} - - -def serialize_request_body( - request: dataclass, - request_type: type, - request_field_name: str, - nullable: bool, - optional: bool, - serialization_method: str, - encoder=None, -) -> Tuple[str, any, any]: - if request is None and not nullable and optional: - return None, None, None - - if not is_dataclass(request) or not hasattr(request, request_field_name): - return serialize_content_type( - request_field_name, - request_type, - SERIALIZATION_METHOD_TO_CONTENT_TYPE[serialization_method], - request, - encoder, - ) - - request_val = getattr(request, request_field_name) - - if request_val is None and not nullable and optional: - return None, None, None - - request_fields: Tuple[Field, ...] = fields(request) - request_metadata = None - - for field in request_fields: - if field.name == request_field_name: - request_metadata = field.metadata.get("request") - break - - if request_metadata is None: - raise Exception("invalid request type") - - return serialize_content_type( - request_field_name, - request_type, - request_metadata.get("media_type", "application/octet-stream"), - request_val, - ) - - -def serialize_content_type( - field_name: str, - request_type: any, - media_type: str, - request: dataclass, - encoder=None, -) -> Tuple[str, any, List[List[any]]]: - if re.match(r"(application|text)\/.*?\+*json.*", media_type) is not None: - return media_type, marshal_json(request, request_type, encoder), None - if re.match(r"multipart\/.*", media_type) is not None: - return serialize_multipart_form(media_type, request) - if re.match(r"application\/x-www-form-urlencoded.*", media_type) is not None: - return media_type, serialize_form_data(field_name, request), None - if isinstance(request, (bytes, bytearray)): - return media_type, request, None - if isinstance(request, str): - return media_type, request, None - - raise Exception( - f"invalid request body type {type(request)} for mediaType {media_type}" - ) - - -def serialize_multipart_form( - media_type: str, request: dataclass -) -> Tuple[str, any, List[List[any]]]: - form: List[List[any]] = [] - request_fields = fields(request) - - for field in request_fields: - val = getattr(request, field.name) - if val is None: - continue - - field_metadata = field.metadata.get("multipart_form") - if not field_metadata: - continue - - if field_metadata.get("file") is True: - file_fields = fields(val) - - file_name = "" - field_name = "" - content = b"" - - for file_field in file_fields: - file_metadata = file_field.metadata.get("multipart_form") - if file_metadata is None: - continue - - if file_metadata.get("content") is True: - content = getattr(val, file_field.name) - else: - field_name = file_metadata.get("field_name", file_field.name) - file_name = getattr(val, file_field.name) - if field_name == "" or file_name == "" or content == b"": - raise Exception("invalid multipart/form-data file") - - form.append([field_name, [file_name, content]]) - elif field_metadata.get("json") is True: - to_append = [ - field_metadata.get("field_name", field.name), - [None, marshal_json(val, field.type), "application/json"], - ] - form.append(to_append) - else: - field_name = field_metadata.get("field_name", field.name) - if isinstance(val, List): - for value in val: - if value is None: - continue - form.append([field_name + "[]", [None, _val_to_string(value)]]) - else: - form.append([field_name, [None, _val_to_string(val)]]) - return media_type, None, form - - -def serialize_form_data(field_name: str, data: dataclass) -> Dict[str, any]: - form: Dict[str, List[str]] = {} - - if is_dataclass(data): - for field in fields(data): - val = getattr(data, field.name) - if val is None: - continue - - metadata = field.metadata.get("form") - if metadata is None: - continue - - field_name = metadata.get("field_name", field.name) - - if metadata.get("json"): - form[field_name] = [marshal_json(val, field.type)] - else: - if metadata.get("style", "form") == "form": - form = { - **form, - **_populate_form( - field_name, - metadata.get("explode", True), - val, - _get_form_field_name, - ",", - ), - } - else: - raise Exception(f"Invalid form style for field {field.name}") - elif isinstance(data, Dict): - for key, value in data.items(): - form[key] = [_val_to_string(value)] - else: - raise Exception(f"Invalid request body type for field {field_name}") - - return form - - -def _get_form_field_name(obj_field: Field) -> str: - obj_param_metadata = obj_field.metadata.get("form") - - if not obj_param_metadata: - return "" - - return obj_param_metadata.get("field_name", obj_field.name) - - -def _populate_form( - field_name: str, - explode: boolean, - obj: any, - get_field_name_func: Callable, - delimiter: str, -) -> Dict[str, List[str]]: - params: Dict[str, List[str]] = {} - - if obj is None: - return params - - if is_dataclass(obj): - items = [] - - obj_fields: Tuple[Field, ...] = fields(obj) - for obj_field in obj_fields: - obj_field_name = get_field_name_func(obj_field) - if obj_field_name == "": - continue - - val = getattr(obj, obj_field.name) - if val is None: - continue - - if explode: - params[obj_field_name] = [_val_to_string(val)] - else: - items.append(f"{obj_field_name}{delimiter}{_val_to_string(val)}") - - if len(items) > 0: - params[field_name] = [delimiter.join(items)] - elif isinstance(obj, Dict): - items = [] - for key, value in obj.items(): - if value is None: - continue - - if explode: - params[key] = _val_to_string(value) - else: - items.append(f"{key}{delimiter}{_val_to_string(value)}") - - if len(items) > 0: - params[field_name] = [delimiter.join(items)] - elif isinstance(obj, List): - items = [] - - for value in obj: - if value is None: - continue - - if explode: - if field_name not in params: - params[field_name] = [] - params[field_name].append(_val_to_string(value)) - else: - items.append(_val_to_string(value)) - - if len(items) > 0: - params[field_name] = [delimiter.join([str(item) for item in items])] - else: - params[field_name] = [_val_to_string(obj)] - - return params - - -def _serialize_header(explode: bool, obj: any) -> str: - if obj is None: - return "" - - if is_dataclass(obj): - items = [] - obj_fields: Tuple[Field, ...] = fields(obj) - for obj_field in obj_fields: - obj_param_metadata = obj_field.metadata.get("header") - - if not obj_param_metadata: - continue - - obj_field_name = obj_param_metadata.get("field_name", obj_field.name) - if obj_field_name == "": - continue - - val = getattr(obj, obj_field.name) - if val is None: - continue - - if explode: - items.append(f"{obj_field_name}={_val_to_string(val)}") - else: - items.append(obj_field_name) - items.append(_val_to_string(val)) - - if len(items) > 0: - return ",".join(items) - elif isinstance(obj, Dict): - items = [] - - for key, value in obj.items(): - if value is None: - continue - - if explode: - items.append(f"{key}={_val_to_string(value)}") - else: - items.append(key) - items.append(_val_to_string(value)) - - if len(items) > 0: - return ",".join([str(item) for item in items]) - elif isinstance(obj, List): - items = [] - - for value in obj: - if value is None: - continue - - items.append(_val_to_string(value)) - - if len(items) > 0: - return ",".join(items) - else: - return f"{_val_to_string(obj)}" - - return "" - - -def unmarshal_json(data, typ, decoder=None): - unmarshal = make_dataclass("Unmarshal", [("res", typ)], bases=(DataClassJsonMixin,)) - json_dict = json.loads(data) - try: - out = unmarshal.from_dict({"res": json_dict}) - except AttributeError as attr_err: - raise AttributeError( - f"unable to unmarshal {data} as {typ} - {attr_err}" - ) from attr_err - - return out.res if decoder is None else decoder(out.res) - - -def marshal_json(val, typ, encoder=None): - if not is_optional_type(typ) and val is None: - raise ValueError(f"Could not marshal None into non-optional type: {typ}") - - marshal = make_dataclass("Marshal", [("res", typ)], bases=(DataClassJsonMixin,)) - marshaller = marshal(res=val) - json_dict = marshaller.to_dict() - val = json_dict["res"] if encoder is None else encoder(json_dict["res"]) - - return json.dumps(val, separators=(",", ":"), sort_keys=True) - - -def match_content_type(content_type: str, pattern: str) -> boolean: - if pattern in (content_type, "*", "*/*"): - return True - - msg = Message() - msg["content-type"] = content_type - media_type = msg.get_content_type() - - if media_type == pattern: - return True - - parts = media_type.split("/") - return len(parts) == 2 and pattern in (f"{parts[0]}/*", f"*/{parts[1]}") - - -def get_field_name(name): - def override(_, _field_name=name): - return _field_name - - return override - - -def _val_to_string(val): - if isinstance(val, bool): - return str(val).lower() - if isinstance(val, datetime): - return val.isoformat().replace("+00:00", "Z") - if isinstance(val, Enum): - return str(val.value) - - return str(val) - - -def _populate_from_globals( - param_name: str, - value: any, - param_type: str, - gbls: Dict[str, Dict[str, Dict[str, Any]]], -): - if value is None and gbls is not None: - if "parameters" in gbls: - if param_type in gbls["parameters"]: - if param_name in gbls["parameters"][param_type]: - global_value = gbls["parameters"][param_type][param_name] - if global_value is not None: - value = global_value - - return value - - -def remove_suffix(input_string, suffix): - if suffix and input_string.endswith(suffix): - return input_string[: -len(suffix)] - return input_string diff --git a/tests/glassflow/conftest.py b/tests/glassflow/conftest.py index 9932a4a..6b49766 100644 --- a/tests/glassflow/conftest.py +++ b/tests/glassflow/conftest.py @@ -1,4 +1,4 @@ from glassflow.api_client import APIClient -# Use staging api server +# Use staging v2 server APIClient.glassflow_config.server_url = "https://staging.api.glassflow.dev/v1" diff --git a/tests/glassflow/integration_tests/client_test.py b/tests/glassflow/integration_tests/client_test.py index 7def0bc..43bb6ee 100644 --- a/tests/glassflow/integration_tests/client_test.py +++ b/tests/glassflow/integration_tests/client_test.py @@ -8,8 +8,21 @@ def test_get_pipeline_ok(client, creating_pipeline): def test_list_pipelines_ok(client, creating_pipeline): res = client.list_pipelines() - assert res.status_code == 200 - assert res.content_type == "application/json" assert res.total_amount >= 1 - assert res.pipelines[-1]["id"] == creating_pipeline.id - assert res.pipelines[-1]["name"] == creating_pipeline.name + assert res.pipelines[-1].id == creating_pipeline.id + assert res.pipelines[-1].name == creating_pipeline.name + + +def test_list_spaces_ok(client, creating_space): + res = client.list_spaces() + + assert res.total_amount >= 1 + assert res.spaces[-1].id == creating_space.id + assert res.spaces[-1].name == creating_space.name + + +def test_list_secrets_ok(client, creating_secret): + res = client.list_secrets() + + assert res.total_amount >= 1 + assert res.secrets[-1].key == creating_secret.key diff --git a/tests/glassflow/integration_tests/conftest.py b/tests/glassflow/integration_tests/conftest.py index 4ec92fb..627227a 100644 --- a/tests/glassflow/integration_tests/conftest.py +++ b/tests/glassflow/integration_tests/conftest.py @@ -8,6 +8,7 @@ Pipeline, PipelineDataSink, PipelineDataSource, + Secret, Space, ) @@ -40,7 +41,7 @@ def space_with_random_id(client): @pytest.fixture -def space_with_random_id_and_invalid_token(client): +def space_with_random_id_and_invalid_token(): return Space( id=str(uuid.uuid4()), personal_access_token="invalid-token", @@ -54,6 +55,13 @@ def pipeline(client, creating_space): space_id=creating_space.id, transformation_file="tests/data/transformation.py", personal_access_token=client.personal_access_token, + metadata={"view_only": True}, + source_kind="google_pubsub", + source_config={ + "project_id": "my-project-id", + "subscription_id": "my-subscription-id", + "credentials_json": "my-credentials.json", + }, ) @@ -84,7 +92,7 @@ def creating_pipeline(pipeline): def source(creating_pipeline): return PipelineDataSource( pipeline_id=creating_pipeline.id, - pipeline_access_token=creating_pipeline.access_tokens[0]["token"], + pipeline_access_token=creating_pipeline.access_tokens[0].token, ) @@ -99,7 +107,7 @@ def source_with_invalid_access_token(creating_pipeline): def source_with_non_existing_id(creating_pipeline): return PipelineDataSource( pipeline_id=str(uuid.uuid4()), - pipeline_access_token=creating_pipeline.access_tokens[0]["token"], + pipeline_access_token=creating_pipeline.access_tokens[0].token, ) @@ -115,3 +123,34 @@ def sink(source_with_published_events): pipeline_id=source_with_published_events.pipeline_id, pipeline_access_token=source_with_published_events.pipeline_access_token, ) + + +@pytest.fixture +def secret(client): + return Secret( + key="SecretKey", + value="SecretValue", + personal_access_token=client.personal_access_token, + ) + + +@pytest.fixture +def creating_secret(secret): + secret.create() + yield secret + secret.delete() + + +@pytest.fixture +def secret_with_invalid_key_and_token(): + return Secret( + key="InvalidSecretKey", + personal_access_token="invalid-token", + ) + + +@pytest.fixture +def secret_with_invalid_key(client): + return Secret( + key="InvalidSecretKey", personal_access_token=client.personal_access_token + ) diff --git a/tests/glassflow/integration_tests/pipeline_data_test.py b/tests/glassflow/integration_tests/pipeline_data_test.py index 1dcf9d3..5fcb5fa 100644 --- a/tests/glassflow/integration_tests/pipeline_data_test.py +++ b/tests/glassflow/integration_tests/pipeline_data_test.py @@ -44,7 +44,7 @@ def test_consume_from_pipeline_data_sink_ok(sink): consume_response = sink.consume() assert consume_response.status_code in (200, 204) if consume_response.status_code == 200: - assert consume_response.json() == { + assert consume_response.event() == { "test_field": "test_value", "new_field": "new_value", } diff --git a/tests/glassflow/integration_tests/pipeline_test.py b/tests/glassflow/integration_tests/pipeline_test.py index 2ac0e6b..12f3766 100644 --- a/tests/glassflow/integration_tests/pipeline_test.py +++ b/tests/glassflow/integration_tests/pipeline_test.py @@ -21,11 +21,14 @@ def test_fetch_pipeline_fail_with_404(pipeline_with_random_id): def test_fetch_pipeline_fail_with_401(pipeline_with_random_id_and_invalid_token): - with pytest.raises(errors.UnauthorizedError): + with pytest.raises(errors.PipelineUnauthorizedError): pipeline_with_random_id_and_invalid_token.fetch() def test_update_pipeline_ok(creating_pipeline): + import time + + time.sleep(1) updated_pipeline = creating_pipeline.update( name="new_name", sink_kind="webhook", @@ -44,12 +47,12 @@ def test_update_pipeline_ok(creating_pipeline): ) assert updated_pipeline.name == "new_name" assert updated_pipeline.sink_kind == "webhook" - assert updated_pipeline.sink_config == { + assert updated_pipeline.sink_config.model_dump(mode="json") == { "url": "www.test-url.com", "method": "GET", "headers": [{"name": "header1", "value": "header1"}], } - assert updated_pipeline.env_vars == [ + assert updated_pipeline.env_vars.model_dump(mode="json") == [ {"name": "env1", "value": "env1"}, {"name": "env2", "value": "env2"}, ] @@ -67,7 +70,7 @@ def test_delete_pipeline_fail_with_404(pipeline_with_random_id): def test_delete_pipeline_fail_with_401(pipeline_with_random_id_and_invalid_token): - with pytest.raises(errors.UnauthorizedError): + with pytest.raises(errors.PipelineUnauthorizedError): pipeline_with_random_id_and_invalid_token.delete() @@ -75,7 +78,7 @@ def test_get_logs_from_pipeline_ok(creating_pipeline): import time n_tries = 0 - max_tries = 10 + max_tries = 20 while True: if n_tries == max_tries: pytest.fail("Max tries reached") @@ -86,19 +89,13 @@ def test_get_logs_from_pipeline_ok(creating_pipeline): else: n_tries += 1 time.sleep(1) - - assert logs.status_code == 200 - assert logs.content_type == "application/json" - assert logs.logs[0].payload.message == "Function is uploaded" - assert logs.logs[0].level == "INFO" - assert logs.logs[1].payload.message == "Pipeline is created" - assert logs.logs[1].level == "INFO" + log_records = [log for log in logs.logs if log.level == "INFO"] + assert log_records[0].payload.message == "Function is uploaded" + assert log_records[1].payload.message == "Pipeline is created" def test_test_pipeline_ok(creating_pipeline): test_message = {"message": "test"} response = creating_pipeline.test(test_message) - assert response.status_code == 200 - assert response.content_type == "application/json" assert response.payload == test_message diff --git a/tests/glassflow/integration_tests/secret_test.py b/tests/glassflow/integration_tests/secret_test.py new file mode 100644 index 0000000..e3e6fd6 --- /dev/null +++ b/tests/glassflow/integration_tests/secret_test.py @@ -0,0 +1,18 @@ +import pytest + +from glassflow import errors + + +def test_create_secret_ok(creating_secret): + assert creating_secret.key == "SecretKey" + assert creating_secret.value == "SecretValue" + + +def test_delete_secret_fail_with_401(secret_with_invalid_key_and_token): + with pytest.raises(errors.SecretUnauthorizedError): + secret_with_invalid_key_and_token.delete() + + +def test_delete_secret_fail_with_404(secret_with_invalid_key): + with pytest.raises(errors.SecretNotFoundError): + secret_with_invalid_key.delete() diff --git a/tests/glassflow/integration_tests/space_test.py b/tests/glassflow/integration_tests/space_test.py index e17278b..27d3fac 100644 --- a/tests/glassflow/integration_tests/space_test.py +++ b/tests/glassflow/integration_tests/space_test.py @@ -14,5 +14,5 @@ def test_delete_space_fail_with_404(space_with_random_id): def test_delete_space_fail_with_401(space_with_random_id_and_invalid_token): - with pytest.raises(errors.UnauthorizedError): + with pytest.raises(errors.SpaceUnauthorizedError): space_with_random_id_and_invalid_token.delete() diff --git a/tests/glassflow/unit_tests/client_test.py b/tests/glassflow/unit_tests/client_test.py index 6363263..e62eb3b 100644 --- a/tests/glassflow/unit_tests/client_test.py +++ b/tests/glassflow/unit_tests/client_test.py @@ -31,10 +31,8 @@ def test_list_pipelines_ok(requests_mock, list_pipelines_response, client): res = client.list_pipelines() - assert res.status_code == 200 - assert res.content_type == "application/json" assert res.total_amount == list_pipelines_response["total_amount"] - assert res.pipelines == list_pipelines_response["pipelines"] + assert res.pipelines[0].name == list_pipelines_response["pipelines"][0]["name"] def test_list_pipelines_fail_with_401(requests_mock, client): diff --git a/tests/glassflow/unit_tests/conftest.py b/tests/glassflow/unit_tests/conftest.py index a24d3cf..c00b6cd 100644 --- a/tests/glassflow/unit_tests/conftest.py +++ b/tests/glassflow/unit_tests/conftest.py @@ -1,6 +1,7 @@ import pytest from glassflow import GlassFlowClient +from glassflow.models import api @pytest.fixture @@ -12,7 +13,7 @@ def client(): def get_pipeline_request_mock(client, requests_mock, fetch_pipeline_response): return requests_mock.get( client.glassflow_config.server_url + "/pipelines/test-id", - json=fetch_pipeline_response, + json=fetch_pipeline_response.model_dump(mode="json"), status_code=200, headers={"Content-Type": "application/json"}, ) @@ -24,7 +25,7 @@ def get_access_token_request_mock( ): return requests_mock.get( client.glassflow_config.server_url - + f"/pipelines/{fetch_pipeline_response['id']}/access_tokens", + + f"/pipelines/{fetch_pipeline_response.id}/access_tokens", json=access_tokens_response, status_code=200, headers={"Content-Type": "application/json"}, @@ -37,7 +38,7 @@ def get_pipeline_function_source_request_mock( ): return requests_mock.get( client.glassflow_config.server_url - + f"/pipelines/{fetch_pipeline_response['id']}/functions/main/artifacts/latest", + + f"/pipelines/{fetch_pipeline_response.id}/functions/main/artifacts/latest", json=function_source_response, status_code=200, headers={"Content-Type": "application/json"}, @@ -49,9 +50,8 @@ def update_pipeline_request_mock( client, requests_mock, fetch_pipeline_response, update_pipeline_response ): return requests_mock.patch( - client.glassflow_config.server_url - + f"/pipelines/{fetch_pipeline_response['id']}", - json=update_pipeline_response, + client.glassflow_config.server_url + f"/pipelines/{fetch_pipeline_response.id}", + json=update_pipeline_response.model_dump(mode="json"), status_code=200, headers={"Content-Type": "application/json"}, ) @@ -59,41 +59,43 @@ def update_pipeline_request_mock( @pytest.fixture def fetch_pipeline_response(): - return { - "id": "test-id", - "name": "test-name", - "space_id": "test-space-id", - "metadata": {}, - "created_at": "2024-09-23T10:08:45.529Z", - "state": "running", - "space_name": "test-space-name", - "source_connector": { - "kind": "google_pubsub", - "config": { - "project_id": "test-project", - "subscription_id": "test-subscription", - "credentials_json": "credentials.json", + return api.GetDetailedSpacePipeline( + **{ + "id": "test-id", + "name": "test-name", + "space_id": "test-space-id", + "metadata": {}, + "created_at": "2024-09-23T10:08:45.529Z", + "state": "running", + "space_name": "test-space-name", + "source_connector": { + "kind": "google_pubsub", + "config": { + "project_id": "test-project", + "subscription_id": "test-subscription", + "credentials_json": "credentials.json", + }, }, - }, - "sink_connector": { - "kind": "webhook", - "config": { - "url": "www.test-url.com", - "method": "GET", - "headers": [ - {"name": "header1", "value": "header1"}, - {"name": "header2", "value": "header2"}, - ], + "sink_connector": { + "kind": "webhook", + "config": { + "url": "www.test-url.com", + "method": "GET", + "headers": [ + {"name": "header1", "value": "header1"}, + {"name": "header2", "value": "header2"}, + ], + }, }, - }, - "environments": [{"test-var": "test-var"}], - } + "environments": [{"name": "test-var", "value": "test-var"}], + } + ) @pytest.fixture def update_pipeline_response(fetch_pipeline_response): - fetch_pipeline_response["name"] = "updated name" - fetch_pipeline_response["source_connector"] = None + fetch_pipeline_response.name = "updated name" + fetch_pipeline_response.source_connector = api.SourceConnector(root=None) return fetch_pipeline_response @@ -186,3 +188,10 @@ def test_pipeline_response(): "error_details": "Error message", "stack_trace": "Error Stack trace", } + + +@pytest.fixture +def create_secret_response(): + return { + "name": "test-name", + } diff --git a/tests/glassflow/unit_tests/pipeline_data_test.py b/tests/glassflow/unit_tests/pipeline_data_test.py index bc8bd79..8e3ae83 100644 --- a/tests/glassflow/unit_tests/pipeline_data_test.py +++ b/tests/glassflow/unit_tests/pipeline_data_test.py @@ -55,7 +55,6 @@ def test_push_to_pipeline_data_source_ok(requests_mock): res = source.publish({"test": "test"}) assert res.status_code == 200 - assert res.content_type == "application/json" def test_push_to_pipeline_data_source_fail_with_404(requests_mock): @@ -113,8 +112,7 @@ def test_consume_from_pipeline_data_sink_ok(requests_mock, consume_payload): res = sink.consume() assert res.status_code == 200 - assert res.content_type == "application/json" - assert res.body.req_id == consume_payload["req_id"] + assert res.body.event_context.request_id == consume_payload["req_id"] def test_consume_from_pipeline_data_sink_fail_with_404(requests_mock): @@ -175,8 +173,7 @@ def test_consume_from_pipeline_data_sink_ok_with_empty_response(requests_mock): res = sink.consume() assert res.status_code == 204 - assert res.content_type == "application/json" - assert res.body.event == {} + assert res.body is None def test_consume_from_pipeline_data_sink_ok_with_too_many_requests(requests_mock): @@ -197,8 +194,7 @@ def test_consume_from_pipeline_data_sink_ok_with_too_many_requests(requests_mock res = sink.consume() assert res.status_code == 429 - assert res.content_type == "application/json" - assert res.body.event == {} + assert res.body is None def test_consume_failed_from_pipeline_data_sink_ok(requests_mock, consume_payload): @@ -220,8 +216,7 @@ def test_consume_failed_from_pipeline_data_sink_ok(requests_mock, consume_payloa res = sink.consume_failed() assert res.status_code == 200 - assert res.content_type == "application/json" - assert res.body.req_id == consume_payload["req_id"] + assert res.body.event_context.request_id == consume_payload["req_id"] def test_consume_failed_from_pipeline_data_sink_ok_with_empty_response(requests_mock): @@ -242,8 +237,7 @@ def test_consume_failed_from_pipeline_data_sink_ok_with_empty_response(requests_ res = sink.consume_failed() assert res.status_code == 204 - assert res.content_type == "application/json" - assert res.body.event == {} + assert res.body is None def test_consume_failed_from_pipeline_data_sink_ok_with_too_many_requests( @@ -266,8 +260,7 @@ def test_consume_failed_from_pipeline_data_sink_ok_with_too_many_requests( res = sink.consume_failed() assert res.status_code == 429 - assert res.content_type == "application/json" - assert res.body.event == {} + assert res.body is None def test_consume_failed_from_pipeline_data_sink_fail_with_404(requests_mock): diff --git a/tests/glassflow/unit_tests/pipeline_test.py b/tests/glassflow/unit_tests/pipeline_test.py index 2b95072..78cb410 100644 --- a/tests/glassflow/unit_tests/pipeline_test.py +++ b/tests/glassflow/unit_tests/pipeline_test.py @@ -24,24 +24,27 @@ def test_pipeline_fail_with_file_not_found(): p._read_transformation_file() -def test_pipeline_fail_with_missing_sink_data(): - with pytest.raises(ValueError) as e: +def test_pipeline_fail_with_connection_config_value_error(): + with pytest.raises(errors.ConnectorConfigValueError): Pipeline( transformation_file="tests/data/transformation.py", personal_access_token="test-token", - sink_kind="google_pubsub", + sink_kind="webhook", ) - assert str(e.value) == "Both sink_kind and sink_config must be provided" - -def test_pipeline_fail_with_missing_source_data(): - with pytest.raises(ValueError) as e: + with pytest.raises(errors.ConnectorConfigValueError): Pipeline( transformation_file="tests/data/transformation.py", personal_access_token="test-token", source_kind="google_pubsub", ) - assert str(e.value) == "Both source_kind and source_config must be provided" + + with pytest.raises(errors.ConnectorConfigValueError): + Pipeline( + transformation_file="tests/data/transformation.py", + personal_access_token="test-token", + source_config={"url": "test-url"}, + ) def test_fetch_pipeline_ok( @@ -52,11 +55,11 @@ def test_fetch_pipeline_ok( function_source_response, ): pipeline = Pipeline( - id=fetch_pipeline_response["id"], + id=fetch_pipeline_response.id, personal_access_token="test-token", ).fetch() - assert pipeline.name == fetch_pipeline_response["name"] + assert pipeline.name == fetch_pipeline_response.name assert len(pipeline.access_tokens) > 0 assert ( pipeline.transformation_code @@ -68,14 +71,14 @@ def test_fetch_pipeline_ok( def test_fetch_pipeline_fail_with_404(requests_mock, fetch_pipeline_response, client): requests_mock.get( client.glassflow_config.server_url + "/pipelines/test-id", - json=fetch_pipeline_response, + json=fetch_pipeline_response.model_dump(mode="json"), status_code=404, headers={"Content-Type": "application/json"}, ) with pytest.raises(errors.PipelineNotFoundError): Pipeline( - id=fetch_pipeline_response["id"], + id=fetch_pipeline_response.id, personal_access_token="test-token", ).fetch() @@ -83,14 +86,14 @@ def test_fetch_pipeline_fail_with_404(requests_mock, fetch_pipeline_response, cl def test_fetch_pipeline_fail_with_401(requests_mock, fetch_pipeline_response, client): requests_mock.get( client.glassflow_config.server_url + "/pipelines/test-id", - json=fetch_pipeline_response, + json=fetch_pipeline_response.model_dump(mode="json"), status_code=401, headers={"Content-Type": "application/json"}, ) - with pytest.raises(errors.UnauthorizedError): + with pytest.raises(errors.PipelineUnauthorizedError): Pipeline( - id=fetch_pipeline_response["id"], + id=fetch_pipeline_response.id, personal_access_token="test-token", ).fetch() @@ -105,7 +108,7 @@ def test_create_pipeline_ok( headers={"Content-Type": "application/json"}, ) pipeline = Pipeline( - name=fetch_pipeline_response["name"], + name=fetch_pipeline_response.name, space_id=create_pipeline_response["space_id"], transformation_file="tests/data/transformation.py", personal_access_token="test-token", @@ -124,7 +127,7 @@ def test_create_pipeline_fail_with_missing_name(client): ).create() assert e.value.__str__() == ( - "Name must be provided in order to " "create the pipeline" + "Name must be provided in order to create the pipeline" ) @@ -166,8 +169,8 @@ def test_update_pipeline_ok( .update() ) - assert pipeline.name == update_pipeline_response["name"] - assert pipeline.source_connector == update_pipeline_response["source_connector"] + assert pipeline.name == update_pipeline_response.name + assert pipeline.source_connector == update_pipeline_response.source_connector def test_delete_pipeline_ok(requests_mock, client): @@ -199,7 +202,7 @@ def test_get_source_from_pipeline_ok( get_pipeline_function_source_request_mock, access_tokens_response, ): - p = client.get_pipeline(fetch_pipeline_response["id"]) + p = client.get_pipeline(fetch_pipeline_response.id) source = p.get_source() source2 = p.get_source(pipeline_access_token_name="token2") @@ -232,7 +235,7 @@ def test_get_sink_from_pipeline_ok( get_pipeline_function_source_request_mock, access_tokens_response, ): - p = client.get_pipeline(fetch_pipeline_response["id"]) + p = client.get_pipeline(fetch_pipeline_response.id) sink = p.get_sink() sink2 = p.get_sink(pipeline_access_token_name="token2") @@ -261,8 +264,6 @@ def test_get_logs_from_pipeline_ok(client, requests_mock, get_logs_response): pipeline = Pipeline(id=pipeline_id, personal_access_token="test-token") logs = pipeline.get_logs(page_size=50, severity_code=100) - assert logs.status_code == 200 - assert logs.content_type == "application/json" assert logs.next == get_logs_response["next"] for idx, log in enumerate(logs.logs): assert log.level == get_logs_response["logs"][idx]["level"] @@ -284,8 +285,9 @@ def test_test_pipeline_ok(client, requests_mock, test_pipeline_response): pipeline = Pipeline(id=pipeline_id, personal_access_token="test-token") response = pipeline.test(test_pipeline_response["payload"]) - assert response.status_code == 200 - assert response.content_type == "application/json" - assert response.event_context.to_dict() == test_pipeline_response["event_context"] + assert ( + response.event_context.external_id + == test_pipeline_response["event_context"]["external_id"] + ) assert response.status == test_pipeline_response["status"] assert response.response == test_pipeline_response["response"] diff --git a/tests/glassflow/unit_tests/secret_test.py b/tests/glassflow/unit_tests/secret_test.py new file mode 100644 index 0000000..d428e07 --- /dev/null +++ b/tests/glassflow/unit_tests/secret_test.py @@ -0,0 +1,41 @@ +import pytest + +from glassflow import Secret, errors + + +def test_create_secret_ok(requests_mock, client): + requests_mock.post( + client.glassflow_config.server_url + "/secrets", + status_code=201, + headers={"Content-Type": "application/json"}, + ) + Secret( + key="SecretKey", value="SecretValue", personal_access_token="test-token" + ).create() + + +def test_create_secret_fail_with_invalid_key_error(client): + with pytest.raises(errors.SecretInvalidKeyError): + Secret( + key="secret-key", value="secret-value", personal_access_token="test-token" + ) + + +def test_create_secret_fail_with_value_error(client): + with pytest.raises(ValueError): + Secret(personal_access_token="test-token").create() + + +def test_delete_secret_ok(requests_mock, client): + secret_key = "SecretKey" + requests_mock.delete( + client.glassflow_config.server_url + f"/secrets/{secret_key}", + status_code=204, + headers={"Content-Type": "application/json"}, + ) + Secret(key=secret_key, personal_access_token="test-token").delete() + + +def test_delete_secret_fail_with_value_error(client): + with pytest.raises(ValueError): + Secret(personal_access_token="test-token").delete() diff --git a/tests/glassflow/unit_tests/space_test.py b/tests/glassflow/unit_tests/space_test.py index db76510..b4e5470 100644 --- a/tests/glassflow/unit_tests/space_test.py +++ b/tests/glassflow/unit_tests/space_test.py @@ -1,3 +1,5 @@ +from datetime import datetime + import pytest from glassflow import Space @@ -16,14 +18,11 @@ def test_create_space_ok(requests_mock, create_space_response, client): assert space.name == create_space_response["name"] assert space.id == create_space_response["id"] - assert space.created_at == create_space_response["created_at"] - -def test_create_space_fail_with_missing_name(client): - with pytest.raises(ValueError) as e: - Space(personal_access_token="test-token").create() - - assert str(e.value) == ("Name must be provided in order to create the space") + parsed_response_space_created_at = datetime.strptime( + create_space_response["created_at"], "%Y-%m-%dT%H:%M:%S.%fZ" + ) + assert space.created_at.replace(tzinfo=None) == parsed_response_space_created_at def test_delete_space_ok(requests_mock, client):