Skip to content

Commit ac830af

Browse files
authored
Add optional connector connect success and failure callbacks (#708)
Add optional connector connect success and failure callbacks
1 parent 3b2b28e commit ac830af

37 files changed

+753
-183
lines changed

quixstreams/processing/context.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ def on_partition_revoke(self, topic: str, partition: int):
101101
self.pausing_manager.revoke(topic=topic, partition=partition)
102102

103103
def __enter__(self):
104+
self.sink_manager.start_sinks()
104105
return self
105106

106107
def __exit__(self, exc_type, exc_val, exc_tb):

quixstreams/sinks/__init__.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,19 @@
1-
from .base import BaseSink, BatchingSink, SinkBackpressureError, SinkBatch, SinkManager
1+
from .base import (
2+
BaseSink,
3+
BatchingSink,
4+
ClientConnectFailureCallback,
5+
ClientConnectSuccessCallback,
6+
SinkBackpressureError,
7+
SinkBatch,
8+
SinkManager,
9+
)
210

311
__all__ = [
412
"BaseSink",
513
"BatchingSink",
614
"SinkBackpressureError",
715
"SinkBatch",
816
"SinkManager",
17+
"ClientConnectSuccessCallback",
18+
"ClientConnectFailureCallback",
919
]

quixstreams/sinks/base/__init__.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,19 @@
11
from .batch import SinkBatch
22
from .exceptions import SinkBackpressureError
33
from .manager import SinkManager
4-
from .sink import BaseSink, BatchingSink
4+
from .sink import (
5+
BaseSink,
6+
BatchingSink,
7+
ClientConnectFailureCallback,
8+
ClientConnectSuccessCallback,
9+
)
510

611
__all__ = (
712
"SinkBatch",
813
"SinkBackpressureError",
914
"SinkManager",
1015
"BatchingSink",
1116
"BaseSink",
17+
"ClientConnectSuccessCallback",
18+
"ClientConnectFailureCallback",
1219
)

quixstreams/sinks/base/manager.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,10 @@ def register(self, sink: BaseSink):
1414
if sink_id not in self._sinks:
1515
self._sinks[id(sink)] = sink
1616

17+
def start_sinks(self):
18+
for sink in self.sinks:
19+
sink.start()
20+
1721
@property
1822
def sinks(self) -> List[BaseSink]:
1923
return list(self._sinks.values())

quixstreams/sinks/base/sink.py

Lines changed: 69 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,25 @@
11
import abc
22
import logging
3-
from typing import Any, Dict, Tuple
3+
from typing import Any, Callable, Dict, Optional, Tuple
44

55
from quixstreams.models import HeadersTuples
66
from quixstreams.sinks.base.batch import SinkBatch
77

88
logger = logging.getLogger(__name__)
99

1010

11+
ClientConnectSuccessCallback = Callable[[], None]
12+
ClientConnectFailureCallback = Callable[[Optional[Exception]], None]
13+
14+
15+
def _default_on_client_connect_success():
16+
logger.info("CONNECTED!")
17+
18+
19+
def _default_on_client_connect_failure(exception: Exception):
20+
logger.error(f"ERROR: Failed while connecting to client: {exception}")
21+
22+
1123
class BaseSink(abc.ABC):
1224
"""
1325
This is a base class for all sinks.
@@ -17,6 +29,26 @@ class BaseSink(abc.ABC):
1729
Note that Sinks are currently in beta, and their design may change over time.
1830
"""
1931

32+
def __init__(
33+
self,
34+
on_client_connect_success: Optional[ClientConnectSuccessCallback] = None,
35+
on_client_connect_failure: Optional[ClientConnectFailureCallback] = None,
36+
):
37+
"""
38+
:param on_client_connect_success: An optional callback made after successful
39+
client authentication, primarily for additional logging.
40+
:param on_client_connect_failure: An optional callback made after failed
41+
client authentication (which should raise an Exception).
42+
Callback should accept the raised Exception as an argument.
43+
Callback must resolve (or propagate/re-raise) the Exception.
44+
"""
45+
self._on_client_connect_success = (
46+
on_client_connect_success or _default_on_client_connect_success
47+
)
48+
self._on_client_connect_failure = (
49+
on_client_connect_failure or _default_on_client_connect_failure
50+
)
51+
2052
@abc.abstractmethod
2153
def flush(self, topic: str, partition: int):
2254
"""
@@ -48,6 +80,24 @@ def add(
4880
on flush().
4981
"""
5082

83+
@abc.abstractmethod
84+
def setup(self):
85+
"""
86+
When applicable, set up the client here along with any validation to affirm a
87+
valid/successful authentication/connection.
88+
"""
89+
90+
def start(self):
91+
"""
92+
Called as part of `Application.run()` to initialize the sink's client.
93+
Allows using a callback pattern around the connection attempt.
94+
"""
95+
try:
96+
self.setup()
97+
self._on_client_connect_success()
98+
except Exception as e:
99+
self._on_client_connect_failure(e)
100+
51101
def on_paused(self, topic: str, partition: int):
52102
"""
53103
This method is triggered when the sink is paused due to backpressure, when
@@ -60,7 +110,7 @@ def on_paused(self, topic: str, partition: int):
60110
class BatchingSink(BaseSink):
61111
"""
62112
A base class for batching sinks, that need to accumulate the data first before
63-
sending it to the external destinatios.
113+
sending it to the external destinations.
64114
65115
Examples: databases, object stores, and other destinations where
66116
writing every message is not optimal.
@@ -73,7 +123,23 @@ class BatchingSink(BaseSink):
73123

74124
_batches: Dict[Tuple[str, int], SinkBatch]
75125

76-
def __init__(self):
126+
def __init__(
127+
self,
128+
on_client_connect_success: Optional[ClientConnectSuccessCallback] = None,
129+
on_client_connect_failure: Optional[ClientConnectFailureCallback] = None,
130+
):
131+
"""
132+
:param on_client_connect_success: An optional callback made after successful
133+
client authentication, primarily for additional logging.
134+
:param on_client_connect_failure: An optional callback made after failed
135+
client authentication (which should raise an Exception).
136+
Callback should accept the raised Exception as an argument.
137+
Callback must resolve (or propagate/re-raise) the Exception.
138+
"""
139+
super().__init__(
140+
on_client_connect_success=on_client_connect_success,
141+
on_client_connect_failure=on_client_connect_failure,
142+
)
77143
self._batches = {}
78144

79145
def __repr__(self):

quixstreams/sinks/community/bigquery.py

Lines changed: 29 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,7 @@
33
import time
44
from datetime import date, datetime
55
from decimal import Decimal
6-
from typing import Any, Mapping
7-
8-
from typing_extensions import Optional
6+
from typing import Any, Mapping, Optional
97

108
try:
119
from google.cloud import bigquery
@@ -19,7 +17,12 @@
1917

2018
from quixstreams.exceptions import QuixException
2119
from quixstreams.models import HeadersTuples
22-
from quixstreams.sinks import BatchingSink, SinkBatch
20+
from quixstreams.sinks import (
21+
BatchingSink,
22+
ClientConnectFailureCallback,
23+
ClientConnectSuccessCallback,
24+
SinkBatch,
25+
)
2326

2427
__all__ = ("BigQuerySink", "BigQuerySinkException")
2528

@@ -62,6 +65,8 @@ def __init__(
6265
ddl_timeout: float = 10.0,
6366
insert_timeout: float = 10.0,
6467
retry_timeout: float = 30.0,
68+
on_client_connect_success: Optional[ClientConnectSuccessCallback] = None,
69+
on_client_connect_failure: Optional[ClientConnectFailureCallback] = None,
6570
**kwargs,
6671
):
6772
"""
@@ -101,10 +106,19 @@ def __init__(
101106
:param retry_timeout: a total timeout for each request to BigQuery API.
102107
During this timeout, a request can be retried according
103108
to the client's default retrying policy.
109+
:param on_client_connect_success: An optional callback made after successful
110+
client authentication, primarily for additional logging.
111+
:param on_client_connect_failure: An optional callback made after failed
112+
client authentication (which should raise an Exception).
113+
Callback should accept the raised Exception as an argument.
114+
Callback must resolve (or propagate/re-raise) the Exception.
104115
:param kwargs: Additional keyword arguments passed to `bigquery.Client`.
105116
"""
117+
super().__init__(
118+
on_client_connect_success=on_client_connect_success,
119+
on_client_connect_failure=on_client_connect_failure,
120+
)
106121

107-
super().__init__()
108122
self.location = location
109123
self.project_id = project_id
110124
self.dataset_id = f"{self.project_id}.{dataset_id}"
@@ -123,12 +137,16 @@ def __init__(
123137
scopes=["https://www.googleapis.com/auth/bigquery"],
124138
)
125139
kwargs["credentials"] = credentials
126-
127-
self._client = bigquery.Client(**kwargs)
128-
logger.info("Successfully authenticated to BigQuery.")
129-
if self.schema_auto_update:
130-
# Initialize a table in BigQuery if it doesn't exist already
131-
self._init_table()
140+
self._client: Optional[bigquery.Client] = None
141+
self._client_settings = kwargs
142+
143+
def setup(self):
144+
if not self._client:
145+
self._client = bigquery.Client(**self._client_settings)
146+
logger.info("Successfully authenticated to BigQuery.")
147+
if self.schema_auto_update:
148+
# Initialize a table in BigQuery if it doesn't exist already
149+
self._init_table()
132150

133151
def write(self, batch: SinkBatch):
134152
rows = []

quixstreams/sinks/community/file/destinations/azure.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import logging
2+
from typing import Optional
23

34
from quixstreams.sinks import SinkBatch
45
from quixstreams.sinks.community.file.destinations.base import Destination
@@ -51,17 +52,17 @@ def __init__(
5152
:raises AzureContainerAccessDeniedError: If access to the container is denied.
5253
"""
5354
self._container = container
54-
self._client = self._get_client(connection_string)
55-
self._validate_container()
55+
self._auth = connection_string
56+
self._client: Optional[ContainerClient] = None
5657

57-
def _get_client(self, auth: str) -> ContainerClient:
58+
def _get_client(self) -> ContainerClient:
5859
"""
5960
Get an Azure file container client and validate the container exists.
6061
6162
Uses the Azure connection string stored in `self._auth` (no `auth` argument is taken).
6263
:return: An Azure ContainerClient
6364
"""
64-
storage_client = BlobServiceClient.from_connection_string(auth)
65+
storage_client = BlobServiceClient.from_connection_string(self._auth)
6566
container_client = storage_client.get_container_client(self._container)
6667
return container_client
6768

@@ -85,6 +86,11 @@ def _validate_container(self) -> None:
8586
raise
8687
raise AzureContainerNotFoundError(f"Container not found: {self._container}")
8788

89+
def setup(self):
90+
if not self._client:
91+
self._client = self._get_client()
92+
self._validate_container()
93+
8894
def write(self, data: bytes, batch: SinkBatch) -> None:
8995
"""
9096
Write data to Azure.

quixstreams/sinks/community/file/destinations/base.py

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010

1111
logger = logging.getLogger(__name__)
1212

13-
_UNSAFE_CHARACTERS_REGEX = re.compile(r"[^a-zA-Z0-9 ._]")
13+
_UNSAFE_CHARACTERS_REGEX = re.compile(r"[^a-zA-Z0-9 ._/]")
1414

1515

1616
class Destination(ABC):
@@ -25,14 +25,33 @@ class Destination(ABC):
2525
_base_directory: str = ""
2626
_extension: str = ""
2727

28-
def set_directory(self, directory: str) -> None:
28+
@abstractmethod
29+
def setup(self):
30+
"""Authenticate and validate connection here"""
31+
...
32+
33+
@abstractmethod
34+
def write(self, data: bytes, batch: SinkBatch) -> None:
35+
"""Write the serialized data to storage.
36+
37+
:param data: The serialized data to write.
38+
:param batch: The batch information containing topic, partition and offset
39+
details.
40+
"""
41+
...
42+
43+
def set_directory(
44+
self,
45+
directory: str,
46+
) -> None:
2947
"""Configure the base directory for storing files.
3048
3149
:param directory: The base directory path where files will be stored.
3250
:raises ValueError: If the directory path contains invalid characters.
33-
Only alphanumeric characters (a-zA-Z0-9), spaces, dots, and
51+
Only alphanumeric characters (a-zA-Z0-9), spaces, dots, slashes, and
3452
underscores are allowed.
3553
"""
54+
# TODO: logic around "/" for sinks that require special handling of them
3655
if _UNSAFE_CHARACTERS_REGEX.search(directory):
3756
raise ValueError(
3857
f"Invalid characters in directory path: {directory}. "
@@ -50,16 +69,6 @@ def set_extension(self, format: Format) -> None:
5069
self._extension = format.file_extension
5170
logger.info("File extension set to '%s'", self._extension)
5271

53-
@abstractmethod
54-
def write(self, data: bytes, batch: SinkBatch) -> None:
55-
"""Write the serialized data to storage.
56-
57-
:param data: The serialized data to write.
58-
:param batch: The batch information containing topic, partition and offset
59-
details.
60-
"""
61-
...
62-
6372
def _path(self, batch: SinkBatch) -> Path:
6473
"""Generate the full path where the batch data should be stored.
6574

quixstreams/sinks/community/file/destinations/local.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,9 @@ def __init__(self, append: bool = False) -> None:
2929
self._mode = "ab" if append else "wb"
3030
logger.debug("LocalDestination initialized with append=%s", append)
3131

32+
def setup(self):
33+
return
34+
3235
def set_extension(self, format: Format) -> None:
3336
"""Set the file extension and validate append mode compatibility.
3437

0 commit comments

Comments
 (0)