Skip to content

Connector secrets on pipelines #42

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ def glassflow():
pass


@click.command()
@click.command(name="help")
@click.argument("command", required=False)
def help(command):
def help_command(command):
"""Displays help information about Glassflow CLI and its commands."""

commands = {
Expand All @@ -37,7 +37,7 @@ def help(command):

# Add commands to CLI group
glassflow.add_command(get_started)
glassflow.add_command(help)
glassflow.add_command(help_command)

if __name__ == "__main__":
glassflow()
6 changes: 6 additions & 0 deletions src/glassflow/api_client.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import sys
from typing import Any

import requests as requests_http

Expand Down Expand Up @@ -59,3 +60,8 @@ def _request(
except requests_http.HTTPError as http_err:
raise errors.UnknownError(http_err.response) from http_err
return http_res

def __eq__(self, other: Any) -> bool:
vars_self = {k: v for k, v in vars(self).items() if k != "client"}
vars_other = {k: v for k, v in vars(other).items() if k != "client"}
return (type(self), vars_self) == (type(other), vars_other)
6 changes: 3 additions & 3 deletions src/glassflow/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from .api_client import APIClient
from .models import errors, responses
from .pipeline import Pipeline
from .pipeline import ConnectorConfiguration, Pipeline
from .secret import Secret
from .space import Space

Expand Down Expand Up @@ -65,9 +65,9 @@ def create_pipeline(
transformation_file: str = None,
requirements: str = None,
source_kind: str = None,
source_config: dict = None,
source_config: ConnectorConfiguration = None,
sink_kind: str = None,
sink_config: dict = None,
sink_config: ConnectorConfiguration = None,
env_vars: list[dict[str, str]] = None,
state: str = "running",
metadata: dict = None,
Expand Down
4 changes: 4 additions & 0 deletions src/glassflow/models/api/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from .api import (
ConnectorValueSecretRef,
ConnectorValueValue,
ConsumeOutputEvent,
CreatePipeline,
CreateSecret,
Expand Down Expand Up @@ -32,4 +34,6 @@
"ListSpaceScopes",
"ListPipelines",
"FunctionEnvironments",
"ConnectorValueSecretRef",
"ConnectorValueValue",
]
49 changes: 44 additions & 5 deletions src/glassflow/models/api/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,13 @@ class CreateOrganization(BaseModel):
name: str


class PatchOrganization(BaseModel):
name: str


class Organization(CreateOrganization):
id: str
created_at: datetime


class OrganizationScope(Organization):
Expand Down Expand Up @@ -50,6 +55,28 @@ class CreateSecret(BaseModel):
value: str


class CreateInvite(BaseModel):
email: str


class Invite(CreateInvite):
id: str


class Invites(RootModel[list[Invite]]):
root: list[Invite]


class OrganizationMember(BaseModel):
id: str
name: str
email: str


class OrganizationMembers(RootModel[list[OrganizationMember]]):
root: list[OrganizationMember]


class SignUp(BaseModel):
access_token: str
id_token: str
Expand Down Expand Up @@ -341,6 +368,23 @@ class Secret(BaseModel):
key: SecretKey


class CreateInvites(BaseModel):
invites: list[CreateInvite]


class ListOrganizationInvites(PaginationResponse):
items: Invites


class ListOrganizationMembers(PaginationResponse):
items: OrganizationMembers


class ClientHeader1(BaseModel):
name: str
value: ConnectorValueValue


class ListPipelines(PaginationResponse):
pipelines: SpacePipelines

Expand Down Expand Up @@ -497,11 +541,6 @@ class SinkConnectorSnowflakeCDCJSON(BaseModel):
configuration: Optional[Configuration6] = None


class ClientHeader1(BaseModel):
name: str
value: ConnectorValue


class Configuration7(BaseModel):
api_key: ConnectorValue
api_host: ConnectorValue
Expand Down
40 changes: 33 additions & 7 deletions src/glassflow/pipeline.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
from __future__ import annotations

from typing import Union

from typing_extensions import TypeAlias

from .client import APIClient
from .models import api, errors, operations, responses
from .models.responses.pipeline import AccessToken
from .pipeline_data import PipelineDataSink, PipelineDataSource
from .secret import Secret

ConnectorConfiguration: TypeAlias = dict[str, Union[str, list, Secret]]


class Pipeline(APIClient):
Expand All @@ -14,9 +21,9 @@ def __init__(
space_id: str | None = None,
id: str | None = None,
source_kind: str | None = None,
source_config: dict | None = None,
source_config: ConnectorConfiguration | None = None,
sink_kind: str | None = None,
sink_config: dict | None = None,
sink_config: ConnectorConfiguration | None = None,
requirements: str | None = None,
transformation_file: str | None = None,
env_vars: list[dict[str, str]] | None = None,
Expand Down Expand Up @@ -182,9 +189,9 @@ def update(
requirements: str | None = None,
metadata: dict | None = None,
source_kind: str | None = None,
source_config: dict | None = None,
source_config: ConnectorConfiguration | None = None,
sink_kind: str | None = None,
sink_config: dict | None = None,
sink_config: ConnectorConfiguration | None = None,
env_vars: list[dict[str, str]] | None = None,
) -> Pipeline:
"""
Expand Down Expand Up @@ -405,15 +412,19 @@ def _request(
) from e
raise e

@staticmethod
def _fill_connector(
connector_type: str, kind: str, config: dict
self, connector_type: str, kind: str, config: dict
) -> api.SourceConnector | api.SinkConnector:
"""Format connector input"""
if not kind and not config:
connector = None
elif kind and config:
connector = dict(kind=kind, config=config)
connector = dict(
kind=kind,
configuration={
k: self._format_connector_config_value(v) for k, v in config.items()
},
)
else:
raise errors.ConnectorConfigValueError(connector_type)

Expand All @@ -424,6 +435,21 @@ def _fill_connector(
else:
raise ValueError("connector_type must be 'source' or 'sink'")

@staticmethod
def _format_connector_config_value(value: str | list | Secret):
"""Formats configuration values to match API expectations"""
if isinstance(value, Secret):
config_value = api.ConnectorValueSecretRef(
**{"secret_ref": {"type": "organization", "key": value.key}}
)
elif isinstance(value, list):
config_value = value
else:
config_value = api.ConnectorValueValue(
value=value,
)
return config_value

def _list_access_tokens(self) -> Pipeline:
endpoint = f"/pipelines/{self.id}/access_tokens"
http_res = self._request(method="GET", endpoint=endpoint)
Expand Down
66 changes: 33 additions & 33 deletions tests/glassflow/integration_tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,38 @@ def space_with_random_id_and_invalid_token():


@pytest.fixture
def pipeline(client, creating_space):
def secret(client):
return Secret(
key="SecretKey",
value="SecretValue",
personal_access_token=client.personal_access_token,
)


@pytest.fixture
def creating_secret(secret):
secret.create()
yield secret
secret.delete()


@pytest.fixture
def secret_with_invalid_key_and_token():
return Secret(
key="InvalidSecretKey",
personal_access_token="invalid-token",
)


@pytest.fixture
def secret_with_invalid_key(client):
return Secret(
key="InvalidSecretKey", personal_access_token=client.personal_access_token
)


@pytest.fixture
def pipeline(client, creating_space, creating_secret):
return Pipeline(
name="test_pipeline",
space_id=creating_space.id,
Expand All @@ -60,7 +91,7 @@ def pipeline(client, creating_space):
source_config={
"project_id": "my-project-id",
"subscription_id": "my-subscription-id",
"credentials_json": "my-credentials.json",
"credentials_json": creating_secret,
},
)

Expand Down Expand Up @@ -123,34 +154,3 @@ def sink(source_with_published_events):
pipeline_id=source_with_published_events.pipeline_id,
pipeline_access_token=source_with_published_events.pipeline_access_token,
)


@pytest.fixture
def secret(client):
return Secret(
key="SecretKey",
value="SecretValue",
personal_access_token=client.personal_access_token,
)


@pytest.fixture
def creating_secret(secret):
secret.create()
yield secret
secret.delete()


@pytest.fixture
def secret_with_invalid_key_and_token():
return Secret(
key="InvalidSecretKey",
personal_access_token="invalid-token",
)


@pytest.fixture
def secret_with_invalid_key(client):
return Secret(
key="InvalidSecretKey", personal_access_token=client.personal_access_token
)
68 changes: 67 additions & 1 deletion tests/glassflow/unit_tests/client_test.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import pytest

from glassflow.models import errors
from glassflow import Pipeline, Secret, Space, errors


@pytest.fixture
Expand Down Expand Up @@ -44,3 +44,69 @@ def test_list_pipelines_fail_with_401(requests_mock, client):

with pytest.raises(errors.UnauthorizedError):
client.list_pipelines()


def test_create_pipeline_ok(requests_mock, client, create_pipeline_response):
requests_mock.post(
client.glassflow_config.server_url + "/pipelines",
json=create_pipeline_response,
status_code=200,
headers={"Content-Type": "application/json"},
)

pipeline_args = dict(
name=create_pipeline_response["name"],
space_id=create_pipeline_response["space_id"],
transformation_file="tests/data/transformation.py",
source_kind="google_pubsub",
source_config={
"project_id": "test-project-id",
"subscription_id": "test-subscription-id",
"credentials_json": Secret(
key="testCredentials",
personal_access_token=client.personal_access_token,
),
},
)
p1 = client.create_pipeline(**pipeline_args)
p2 = Pipeline(
personal_access_token=client.personal_access_token, **pipeline_args
).create()

assert p1 == p2


def test_create_space_ok(requests_mock, client, create_space_response):
requests_mock.post(
client.glassflow_config.server_url + "/spaces",
json=create_space_response,
status_code=200,
headers={"Content-Type": "application/json"},
)

s1 = client.create_space(name=create_space_response["name"])
s2 = Space(
personal_access_token=client.personal_access_token,
name=create_space_response["name"],
).create()

assert s1 == s2


def test_create_secret_ok(requests_mock, client):
requests_mock.post(
client.glassflow_config.server_url + "/secrets",
status_code=201,
headers={"Content-Type": "application/json"},
)

secret_args = {
"key": "testKey",
"value": "test-value",
}
s1 = client.create_secret(**secret_args)
s2 = Secret(
personal_access_token=client.personal_access_token, **secret_args
).create()

assert s1 == s2
Loading