Commit d4846b7

update docs and tutorials based on connect callback addition to source and sink (#775)
1 parent 0abda34 commit d4846b7

8 files changed, 53 additions and 58 deletions

docs/connectors/sinks/README.md

Lines changed: 10 additions & 0 deletions
@@ -123,3 +123,13 @@ sdf.sink(influx_sink)
 if __name__ == '__main__':
     app.run()
 ```
+
+## Connection Callbacks
+
+Assuming a given connector is set up to utilize them, there are two callbacks that can
+be set for when a client connects/authenticates successfully or not, named
+`on_client_connect_success` and `on_client_connect_failure`, respectively.
+
+Though having a `setup` method is required, it is not guaranteed that it is implemented or
+utilized fully with `community` sinks; you can inspect a given Sink's `setup` method
+to confirm whether it tests the client connection there (and that the callbacks are then applicable).
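
As an illustration of the pattern this section describes, here is a minimal, self-contained sketch of how the two callbacks might be wired around `setup`. `SketchConnector`, its `start` method, and the callback signatures (no arguments on success, the raised exception on failure) are assumptions made for this example. They are not taken from the Quix Streams source.

```python
from typing import Callable, Optional


class SketchConnector:
    """Hypothetical stand-in for a sink or source; not a Quix Streams class."""

    def __init__(
        self,
        on_client_connect_success: Optional[Callable[[], None]] = None,
        on_client_connect_failure: Optional[Callable[[Exception], None]] = None,
    ):
        # Fall back to no-op callbacks when none are supplied.
        self._on_success = on_client_connect_success or (lambda: None)
        self._on_failure = on_client_connect_failure or (lambda exc: None)

    def setup(self) -> None:
        """Connect/authenticate here; raise if the connection cannot be made."""
        raise NotImplementedError

    def start(self) -> None:
        # Assumed wiring: run setup() once and report the outcome.
        try:
            self.setup()
        except Exception as exc:
            self._on_failure(exc)
            raise
        self._on_success()


class HealthyConnector(SketchConnector):
    def setup(self) -> None:
        return  # pretend the connection test passed


class BrokenConnector(SketchConnector):
    def setup(self) -> None:
        raise ConnectionError("authentication rejected")


events = []
HealthyConnector(on_client_connect_success=lambda: events.append("connected")).start()

try:
    BrokenConnector(
        on_client_connect_failure=lambda exc: events.append(f"failed: {exc}")
    ).start()
except ConnectionError:
    pass  # the error is re-raised after the failure callback has run
```

In this sketch the callbacks only fire because `setup` is where the connection is attempted. A connector whose `setup` does nothing would always report success.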

docs/connectors/sinks/custom-sinks.md

Lines changed: 11 additions & 57 deletions
@@ -11,61 +11,6 @@ The `StreamingDataFrame.sink()` accepts implementations of this class.
 Check out [InfluxDB3Sink](../../api-reference/sinks.md#influxdb3sink) and [CSVSink](../../api-reference/sinks.md#csvsink) for example implementations of the batching sinks.
 
 
-Here is the code for `BaseSink` class for the reference:
-
-```python
-import abc
-
-class BaseSink(abc.ABC):
-    """
-    This is a base class for all sinks.
-
-    Subclass and implement its methods to create your own sink.
-
-    Note that sinks are currently in beta, and their design may change over time.
-    """
-
-    @abc.abstractmethod
-    def flush(self, topic: str, partition: int):
-        """
-        This method is triggered by the Checkpoint class when it commits.
-
-        You can use `flush()` to write the batched data to the destination (in case of
-        a batching sink), or confirm the delivery of the previously sent messages
-        (in case of a streaming sink).
-
-        If flush() fails, the checkpoint will be aborted.
-        """
-
-    @abc.abstractmethod
-    def add(
-        self,
-        value: Any,
-        key: Any,
-        timestamp: int,
-        headers: List[Tuple[str, HeaderValue]],
-        topic: str,
-        partition: int,
-        offset: int,
-    ):
-        """
-        This method is triggered on every new record sent to this sink.
-
-        You can use it to accumulate batches of data before sending them outside, or
-        to send results right away in a streaming manner and confirm a delivery later
-        on flush().
-        """
-
-    def on_paused(self, topic: str, partition: int):
-        """
-        This method is triggered when the sink is paused due to backpressure, when
-        the `SinkBackpressureError` is raised.
-
-        Here you can react to backpressure events.
-        """
-```
-
-
 ## Sinks Workflow
 
 During processing, sinks do the following operations:
@@ -109,14 +54,23 @@ class MyDatabaseSink(BatchingSink):
     """
     Some sink writing data to a database
     """
+    def __init__(self, on_client_connect_success=None, on_client_connect_failure=None):
+        super().__init__(
+            on_client_connect_success=on_client_connect_success,
+            on_client_connect_failure=on_client_connect_failure
+        )
+        self._db_connection = None
+
+    def setup(self):
+        self._db_connection = my_database.connect('<connection credentials>')
+        self._db_connection.test()
 
     def write(self, batch: SinkBatch):
         # Simulate some DB connection here
-        db_connection = my_database.connect('<connection credentials>')
         data = [{'value': item.value} for item in batch]
         try:
             # Try to write data to the db
-            db_connection.write(data)
+            self._db_connection.write(data)
         except TimeoutError:
             # In case of timeout, tell the app to wait for 30s
             # and retry the writing later

docs/connectors/sources/README.md

Lines changed: 10 additions & 0 deletions
@@ -133,3 +133,13 @@ def main():
 if __name__ == "__main__":
     main()
 ```
+
+## Connection Callbacks
+
+Assuming a given connector is set up to utilize them, there are two callbacks that can
+be set for when a client connects/authenticates successfully or not, named
+`on_client_connect_success` and `on_client_connect_failure`, respectively.
+
+Though having a `setup` method is required, it is not guaranteed that it is implemented or
+utilized fully with `community` sources; you can inspect a given Source's `setup` method
+to confirm whether it tests the client connection there (and that the callbacks are then applicable).
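
To make "tests the client connection" concrete, the sketch below contrasts a `setup` that checks a client with one that simply returns. `FakeClient`, `ping`, `CheckedSource`, `NoOpSource`, and `setup_outcome` are hypothetical names used only for illustration. Real connectors and clients will differ.

```python
class FakeClient:
    """Hypothetical client standing in for a real database or API client."""

    def __init__(self, reachable: bool):
        self._reachable = reachable

    def ping(self) -> None:
        # Raise when the remote end is unreachable or rejects the credentials.
        if not self._reachable:
            raise ConnectionError("client unreachable")


class CheckedSource:
    """Hypothetical source whose setup() exercises the connection."""

    def __init__(self, client: FakeClient):
        self._client = client

    def setup(self) -> None:
        # A failure here surfaces before any data is produced, which is the
        # point where connection callbacks would be meaningful.
        self._client.ping()


class NoOpSource:
    """Hypothetical source whose setup() does nothing."""

    def __init__(self, client: FakeClient):
        self._client = client

    def setup(self) -> None:
        # Nothing is tested, so a bad connection goes unnoticed at this stage.
        return


def setup_outcome(source) -> str:
    """Report whether setup() raised, mimicking what a callback would observe."""
    try:
        source.setup()
    except ConnectionError as exc:
        return f"failure: {exc}"
    return "success"


unreachable = FakeClient(reachable=False)
checked_result = setup_outcome(CheckedSource(unreachable))
noop_result = setup_outcome(NoOpSource(unreachable))
```

With an unreachable client, only `CheckedSource` reports a failure. `NoOpSource` reports success even though nothing was verified, which is why the section suggests inspecting `setup` before relying on the callbacks.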

docs/connectors/sources/custom-sources.md

Lines changed: 10 additions & 1 deletion
@@ -10,14 +10,23 @@ Quix Streams also provides a set of classes to help users implement custom sourc
 
 The recommended parent class to create a new source. It handles configuring, starting and stopping the source, as well as implementing a series of helpers.
 
-To get started, implement the [`run`](../../api-reference/sources.md#sourcerun) method and return when `self.running` is `False`.
+To get started, implement the [`run`](../../api-reference/sources.md#sourcerun) method
+which loops while `self.running` is `True` (or until it's done).
+
+When you also have a client pattern, it is recommended to use the required `setup`
+method to establish the initial connection/authentication so that the built-in callbacks
+of `on_client_connect_success` and `on_client_connect_failure` can be utilized.
+Otherwise, just set it to return.
 
 Example subclass:
 
 ```python
 from quixstreams.sources.base import Source
 
 class MySource(Source):
+    def setup(self):
+        return
+
     def run(self):
         with open("file.txt", "r") as f:
             while self.running:

docs/tutorials/anomaly-detection/tutorial_app.py

Lines changed: 3 additions & 0 deletions
@@ -50,6 +50,9 @@ def __init__(self):
         }
         super().__init__(name="temperature-event-generator")
 
+    def setup(self):
+        return
+
     def update_machine_temp(self, machine_id):
         """
         Updates the temperature for a machine by -1, 0, or 1 based on its current temp.

docs/tutorials/purchase-filtering/tutorial_app.py

Lines changed: 3 additions & 0 deletions
@@ -66,6 +66,9 @@ class PurchaseGenerator(Source):
     def __init__(self):
         super().__init__(name="customer-purchases")
 
+    def setup(self):
+        return
+
     def run(self):
         for cid, purchase_info in enumerate(self._purchases_data):
             event = self.serialize(key=f"CUSTOMER_ID{cid}", value=purchase_info)

docs/tutorials/websocket-source/tutorial_app.py

Lines changed: 3 additions & 0 deletions
@@ -36,6 +36,9 @@ def __init__(
         self._url = url
         self._product_ids = product_ids
 
+    def setup(self):
+        return
+
     def run(self) -> None:
         """
         The main method of the source with the main logic.

docs/tutorials/word-count/tutorial_app.py

Lines changed: 3 additions & 0 deletions
@@ -27,6 +27,9 @@ class ReviewGenerator(Source):
     def __init__(self):
         super().__init__(name="customer-reviews")
 
+    def setup(self):
+        return
+
     def run(self):
         for review in self._review_list:
             event = self.serialize(key=choice(self._product_list), value=review)
