Skip to content

Commit 0982bd7

Browse files
committed
kafka source tests
1 parent 57052de commit 0982bd7

File tree

6 files changed

+340
-20
lines changed

6 files changed

+340
-20
lines changed

quixstreams/app.py

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -666,7 +666,7 @@ def get_producer(self) -> Producer:
666666
```python
667667
from quixstreams import Application
668668
669-
app = Application.Quix(...)
669+
app = Application(...)
670670
topic = app.topic("input")
671671
672672
with app.get_producer() as producer:
@@ -706,7 +706,7 @@ def get_consumer(
706706
```python
707707
from quixstreams import Application
708708
709-
app = Application.Quix(...)
709+
app = Application(...)
710710
topic = app.topic("input")
711711
712712
with app.get_consumer() as consumer:
@@ -719,6 +719,11 @@ def get_consumer(
719719
# consumer.store_offsets(msg)
720720
721721
```
722+
723+
:param auto_commit_enable: Enable or disable auto commit
724+
Default - True
725+
:param consumer_group_suffix: Suffix added to the application consumer group
726+
Default - ""
722727
"""
723728
self._setup_topics()
724729

@@ -731,6 +736,24 @@ def get_consumer(
731736
)
732737

733738
def get_admin(self) -> TopicAdmin:
739+
"""
740+
Create and return a TopicAdmin instance.
741+
The TopicAdmin is initialized with params passed to Application
742+
743+
It's useful for configuring and inspecting kafka topics.
744+
745+
Example Snippet:
746+
747+
```python
748+
from quixstreams import Application
749+
750+
app = Application(...)
751+
admin = app.get_admin()
752+
topics = admin.list_topics()
753+
754+
print(topics)
755+
```
756+
"""
734757
return TopicAdmin(
735758
broker_address=self._broker_address,
736759
extra_config=self._producer_extra_config,

quixstreams/checkpointing/checkpoint.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,15 @@
2626

2727

2828
class BaseCheckpoint:
29+
"""
30+
Base class to keep track of state updates and consumer offsets and to checkpoint these
31+
updates on schedule.
32+
33+
Two implementations exist:
34+
* one for checkpointing the Application in quixstreams/checkpoint/checkpoint.py
35+
* one for checkpointing the kafka source in quixstreams/sources/kafka/checkpoint.py
36+
"""
37+
2938
def __init__(
3039
self,
3140
commit_interval: float,
@@ -106,8 +115,7 @@ def commit(self):
106115

107116
class Checkpoint(BaseCheckpoint):
108117
"""
109-
Class to keep track of state updates and consumer offsets and to checkpoint these
110-
updates on schedule.
118+
Checkpoint implementation used by the application
111119
"""
112120

113121
def __init__(

quixstreams/sources/kafka/checkpoint.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,10 @@
1313

1414

1515
class Checkpoint(BaseCheckpoint):
16+
"""
17+
Checkpoint implementation used by the KafkaSource
18+
"""
19+
1620
def __init__(
1721
self,
1822
producer: RowProducer,
@@ -33,6 +37,13 @@ def close(self):
3337
pass
3438

3539
def commit(self):
40+
"""
41+
Commit the checkpoint.
42+
43+
This method will:
44+
1. Flush the producer to ensure everything is delivered.
45+
2. Commit topic offsets.
46+
"""
3647
unproduced_msg_count = self._producer.flush(self._flush_timeout)
3748
if unproduced_msg_count > 0:
3849
raise CheckpointProducerTimeout(

quixstreams/sources/kafka/kafka.py

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ def __init__(
6565
commit_interval: float = 5,
6666
commit_every: int = 0,
6767
flush_timeout: float = 10,
68+
admin_timeout: float = 10,
6869
) -> None:
6970
super().__init__(name, shutdown_timeout)
7071

@@ -100,6 +101,7 @@ def __init__(
100101
self._commit_interval = commit_interval
101102
self._commit_every = commit_every
102103
self._flush_timeout = flush_timeout
104+
self._admin_timeout = admin_timeout
103105

104106
self._running = True
105107
self._error: Optional[Exception] = None
@@ -135,7 +137,7 @@ def run(self) -> None:
135137
topic_name, partition, offset = msg.topic(), msg.partition(), msg.offset()
136138
self._checkpoint.store_offset(topic_name, partition, offset)
137139
logger.log(
138-
5,
140+
40,
139141
"producing message to partition %s with offset %s",
140142
partition,
141143
offset,
@@ -177,13 +179,21 @@ def init_checkpoint(self) -> Checkpoint:
177179

178180
def _validate_topics(self):
179181
source_topic = self._source_cluster_admin.inspect_topics(
180-
topic_names=[self._topic], timeout=5
182+
topic_names=[self._topic], timeout=self._admin_timeout
181183
).get(self._topic)
184+
185+
if source_topic is None:
186+
raise ValueError(f"Source topic {self._topic} not found")
187+
182188
logger.debug("source topic %s configuration: %s", self._topic, source_topic)
183189

184190
target_topic = self._target_cluster_admin.inspect_topics(
185-
topic_names=[self._producer_topic.name], timeout=5
191+
topic_names=[self._producer_topic.name], timeout=self._admin_timeout
186192
).get(self._producer_topic.name)
193+
194+
if target_topic is None:
195+
raise ValueError(f"Destination topic {self._producer_topic.name} not found")
196+
187197
logger.debug(
188198
"destination topic %s configuration: %s",
189199
self._producer_topic.name,
@@ -203,7 +213,7 @@ def _target_cluster_offsets(self, partitions) -> Dict[int, int]:
203213
for partition in partitions
204214
]
205215
partitions_commited = self._target_cluster_consumer.committed(
206-
partitions, timeout=5
216+
partitions, timeout=self._admin_timeout
207217
)
208218
return {
209219
partition.partition: partition.offset for partition in partitions_commited
@@ -248,7 +258,7 @@ def cleanup(self, failed):
248258

249259
def default_topic(self) -> SourceTopic:
250260
config = self._source_cluster_admin.inspect_topics(
251-
topic_names=[self._topic], timeout=5
261+
topic_names=[self._topic], timeout=self._admin_timeout
252262
).get(self._topic)
253263
if config is None:
254264
config = TopicConfig(num_partitions=1, replication_factor=1)

tests/conftest.py

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -34,14 +34,22 @@ def log_test_progress(request: pytest.FixtureRequest):
3434

3535

3636
@pytest.fixture(scope="session")
37-
def kafka_container() -> KafkaContainer:
38-
(
39-
kafka_container,
40-
broker_address,
41-
kafka_port,
42-
) = ContainerHelper.create_kafka_container()
43-
test_logger.debug(f"Starting Kafka container on {broker_address}")
44-
ContainerHelper.start_kafka_container(kafka_container)
45-
test_logger.debug(f"Started Kafka container on {broker_address}")
46-
yield KafkaContainer(broker_address=broker_address)
47-
kafka_container.stop()
37+
def kafka_container_factory() -> KafkaContainer:
38+
def factory():
39+
(
40+
kafka_container,
41+
broker_address,
42+
kafka_port,
43+
) = ContainerHelper.create_kafka_container()
44+
test_logger.debug(f"Starting Kafka container on {broker_address}")
45+
ContainerHelper.start_kafka_container(kafka_container)
46+
test_logger.debug(f"Started Kafka container on {broker_address}")
47+
yield KafkaContainer(broker_address=broker_address)
48+
kafka_container.stop()
49+
50+
return factory
51+
52+
53+
@pytest.fixture(scope="session")
54+
def kafka_container(kafka_container_factory) -> KafkaContainer:
55+
yield from kafka_container_factory()

0 commit comments

Comments
 (0)