Skip to content

Commit a68d474

Browse files
authored
remove all source and sink setup abstracts (#784)
1 parent a6fb0fe commit a68d474

File tree

5 files changed

+9
-27
lines changed

5 files changed

+9
-27
lines changed

docs/connectors/sinks/README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,6 @@ Assuming a given connector is set up to utilize them, there are two callbacks th
130130
be set for when a client connects/authenticates successfully or not, named
131131
`on_client_connect_success` and `on_client_connect_failure`, respectfully.
132132

133-
Though having a `setup` method is required, it is not guaranteed that it is implemented or
134-
utilized fully with `community` sinks; you can inspect a given Sink's `setup` method
133+
Though implementing an applicable `setup` method is strongly encouraged, it is not an
134+
enforced guarantee with `community` sinks; you can inspect a given Sink's `setup` method
135135
to confirm whether it tests the client connection there (and that the callbacks are then applicable).

quixstreams/sinks/base/sink.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,6 @@ def add(
8080
on flush().
8181
"""
8282

83-
@abc.abstractmethod
8483
def setup(self):
8584
"""
8685
When applicable, set up the client here along with any validation to affirm a

quixstreams/sources/base/source.py

Lines changed: 7 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,13 @@ def configure(self, topic: Topic, producer: RowProducer, **kwargs) -> None:
122122
self._producer = producer
123123
self._producer_topic = topic
124124

125+
def setup(self):
126+
"""
127+
When applicable, set up the client here along with any validation to affirm a
128+
valid/successful authentication/connection.
129+
"""
130+
return
131+
125132
@property
126133
def producer(self) -> RowProducer:
127134
if self._producer is None:
@@ -161,13 +168,6 @@ def default_topic(self) -> Topic:
161168
Note: if the default topic is used, the Application will prefix its name with "source__".
162169
"""
163170

164-
@abstractmethod
165-
def setup(self):
166-
"""
167-
When applicable, set up the client here along with any validation to affirm a
168-
valid/successful authentication/connection.
169-
"""
170-
171171

172172
class Source(BaseSource):
173173
"""
@@ -256,13 +256,6 @@ def running(self) -> bool:
256256
"""
257257
return self._running
258258

259-
def setup(self):
260-
"""
261-
Though implemented for the sake of making simple sources easier to use,
262-
it is still recommended to override this when using a client-based pattern.
263-
"""
264-
return
265-
266259
def cleanup(self, failed: bool) -> None:
267260
"""
268261
This method is triggered once the `run` method completes.

tests/test_quixstreams/test_checkpointing.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -66,15 +66,11 @@ def write(self, batch: SinkBatch):
6666
retry_after=999, topic=batch.topic, partition=batch.partition
6767
)
6868

69-
def setup(self): ...
70-
7169

7270
class FailingSink(BatchingSink):
7371
def write(self, batch: SinkBatch):
7472
raise ValueError("Sink write failed")
7573

76-
def setup(self): ...
77-
7874

7975
@pytest.mark.parametrize("store_type", SUPPORTED_STORES, indirect=True)
8076
class TestCheckpoint:

tests/utils.py

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -111,9 +111,6 @@ def __init__(self):
111111
super().__init__()
112112
self._results = []
113113

114-
def setup(self):
115-
return
116-
117114
def write(self, batch: SinkBatch):
118115
for item in batch:
119116
self._results.append(item)
@@ -161,9 +158,6 @@ def _produce(self):
161158
msg = self.serialize(key=self.key, value=value)
162159
self.produce(value=msg.value, key=msg.key)
163160

164-
def setup(self):
165-
return
166-
167161
def cleanup(self, failed):
168162
if "cleanup" in self.error_in:
169163
self.error("test cleanup error")

0 commit comments

Comments
 (0)