Skip to content

Commit a37a619

Browse files
committed
external kafka source
1 parent fc264f9 commit a37a619

File tree

2 files changed

+244
-2
lines changed

2 files changed

+244
-2
lines changed

quixstreams/app.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -667,7 +667,9 @@ def get_producer(self) -> Producer:
667667
extra_config=self._producer_extra_config,
668668
)
669669

670-
def get_consumer(self, auto_commit_enable: bool = True) -> Consumer:
670+
def get_consumer(
671+
self, auto_commit_enable: bool = True, consumer_group_suffix: str = ""
672+
) -> Consumer:
671673
"""
672674
Create and return a pre-configured Consumer instance.
673675
The Consumer is initialized with params passed to Application.
@@ -708,7 +710,7 @@ def get_consumer(self, auto_commit_enable: bool = True) -> Consumer:
708710

709711
return Consumer(
710712
broker_address=self._broker_address,
711-
consumer_group=self._consumer_group,
713+
consumer_group=f"{self._consumer_group}{consumer_group_suffix}",
712714
auto_offset_reset=self._auto_offset_reset,
713715
auto_commit_enable=auto_commit_enable,
714716
extra_config=self._consumer_extra_config,

quixstreams/sources/kafka.py

Lines changed: 240 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,240 @@
1+
import logging
2+
import threading
3+
4+
from typing import Union, Optional, Dict, TYPE_CHECKING
5+
6+
from confluent_kafka import TopicPartition, KafkaException
7+
8+
from quixstreams.app import Application
9+
from quixstreams.kafka import Consumer, ConnectionConfig, AutoOffsetReset
10+
from quixstreams.kafka.exceptions import KafkaConsumerException
11+
from quixstreams.platforms.quix import QuixKafkaConfigsBuilder
12+
from quixstreams.checkpointing.exceptions import (
13+
CheckpointProducerTimeout,
14+
CheckpointConsumerCommitError,
15+
)
16+
from quixstreams.models.topics import TopicConfig, TopicAdmin
17+
from quixstreams.platforms.quix.api import QuixPortalApiService
18+
19+
from .base import CheckpointingSource, SourceTopic
20+
21+
if TYPE_CHECKING:
22+
from quixstreams.app import Application
23+
24+
logger = logging.getLogger(__name__)
25+
26+
27+
class ExternalKafkaSource(CheckpointingSource):
    """
    A source that replicates a topic from an external ("source") Kafka cluster
    into the application's ("target") cluster.

    Progress is tracked by committing source-topic offsets to the *target*
    cluster through a dedicated consumer group (the app's consumer group with
    an ``-offsets`` suffix), so the source cluster's consumer group is never
    committed to (``auto_commit_enable=False`` on the source consumer).

    NOTE(review): offsets are committed under the *target* topic's name but
    with the *source* partition numbers — this presumes a 1:1 partition
    mapping between source and target topics; confirm against SourceTopic
    provisioning.
    """

    def __init__(
        self,
        name: str,
        app: "Application",
        topic: str,
        broker_address: Union[str, ConnectionConfig],
        consumer_group: Optional[str] = None,
        auto_offset_reset: AutoOffsetReset = "latest",
        consumer_extra_config: Optional[dict] = None,
        shutdown_timeout: int = 10,
    ) -> None:
        """
        :param name: unique source name; also used as the default consumer
            group and as the target topic name (see ``default_topic``).
        :param app: the Application providing access to the target cluster.
        :param topic: topic name on the external (source) cluster.
        :param broker_address: source cluster connection.
        :param consumer_group: source-cluster consumer group; defaults to ``name``.
        :param auto_offset_reset: where to start on the source topic when no
            committed offset exists on the target cluster.
        :param consumer_extra_config: extra librdkafka options for the source
            consumer and admin client.
        :param shutdown_timeout: seconds allowed for shutdown; half of it is
            used as the producer flush timeout during checkpointing.
        """
        super().__init__(name, shutdown_timeout)

        if consumer_extra_config is None:
            consumer_extra_config = {}

        if consumer_group is None:
            consumer_group = name

        # Consumer reading the external cluster. Auto-commit is disabled:
        # progress is persisted on the target cluster instead (see checkpoint).
        self._source_cluster_consumer = Consumer(
            broker_address=broker_address,
            consumer_group=consumer_group,
            auto_offset_reset=auto_offset_reset,
            extra_config=consumer_extra_config,
            auto_commit_enable=False,
        )

        # Admin client on the source cluster, used to mirror the source
        # topic's configuration in default_topic().
        self._source_cluster_admin = TopicAdmin(
            broker_address=broker_address,
            extra_config=consumer_extra_config,
        )

        # Consumer on the target cluster used purely as an offset store; the
        # "-offsets" suffix keeps it out of the app's main consumer group.
        self._target_cluster_consumer = app.get_consumer(
            auto_commit_enable=False, consumer_group_suffix="-offsets"
        )

        self._running = True
        self._topic = topic
        # In-flight (source topic, partition) -> last produced offset, shared
        # between the run loop and rebalance callbacks; guarded by the lock.
        self._tp_offsets = {}
        self._tp_offsets_lock = threading.Lock()

        # Error raised from a rebalance callback, re-raised by the run loop.
        self._error: Optional[Exception] = None

    def run(self) -> None:
        """
        Poll the source cluster and re-produce every message to the target
        topic, recording the source offset of each produced message.
        """
        self._source_cluster_consumer.subscribe(
            topics=[self._topic],
            on_assign=self._on_assign,
            on_lost=self._on_lost,
            on_revoke=self._on_revoke,
        )

        super().run()
        while self._running:
            # Surface failures that happened inside rebalance callbacks,
            # which run on the poll() call stack but swallow exceptions.
            if self._error:
                raise self._error

            msg = self._source_cluster_consumer.poll(timeout=1)
            if msg is None:
                continue

            if err := msg.error():
                raise KafkaConsumerException(error=err)

            topic_name, partition, offset = msg.topic(), msg.partition(), msg.offset()
            # Mirror the message as-is (value/key/headers/timestamp) to the
            # same partition number on the target topic.
            self.produce(
                value=msg.value(),
                key=msg.key(),
                headers=msg.headers(),
                timestamp=msg.timestamp()[1],
                partition=partition,
            )
            with self._tp_offsets_lock:
                self._tp_offsets[(topic_name, partition)] = offset

    def _target_cluster_offsets(self, partitions) -> Dict[int, int]:
        """
        Fetch, from the target cluster, the committed offsets for the given
        source partitions, keyed by partition number.

        NOTE(review): partitions with no committed offset presumably come back
        with librdkafka's OFFSET_INVALID sentinel rather than being omitted —
        confirm against confluent_kafka's ``Consumer.committed``.
        """
        partitions = [
            TopicPartition(
                topic=self._producer_topic.name, partition=partition.partition
            )
            for partition in partitions
        ]
        partitions_commited = self._target_cluster_consumer.committed(
            partitions, timeout=5
        )
        return {
            partition.partition: partition.offset for partition in partitions_commited
        }

    def _on_assign(self, _, source_partitions):
        """
        Rebalance callback: resume each newly-assigned source partition from
        the offset previously committed on the target cluster.
        """
        try:
            target_cluster_offset = self._target_cluster_offsets(source_partitions)
            for partition in source_partitions:
                # NOTE(review): a missing entry sets offset to None, which
                # TopicPartition may not accept — verify this default.
                partition.offset = target_cluster_offset.get(partition.partition, None)

            self._source_cluster_consumer.incremental_assign(source_partitions)
        except Exception as exc:
            # Exceptions must not escape librdkafka callbacks; stash the error
            # for the run loop to re-raise.
            logger.exception("Error in source %s assigning partitions", self)
            self._error = exc

    def _on_revoke(self, _, partitions):
        # Persist progress for partitions we are about to lose — unless the
        # source is already failing, in which case committing could persist
        # offsets for messages that were never durably produced.
        if not self._error:
            self.checkpoint()

    def _on_lost(self, _, partitions):
        # Partitions lost without a clean revoke: nothing is committed here;
        # messages may be re-delivered and re-produced (at-least-once).
        pass

    def checkpoint(self):
        """
        Flush pending produced messages to the target cluster, then commit the
        corresponding source offsets (as +1, Kafka's "next offset" convention)
        to the target cluster's offset-store consumer group.
        """
        # Cheap unlocked emptiness pre-check; the snapshot below is locked.
        if not self._tp_offsets:
            return

        super().checkpoint()
        with self._tp_offsets_lock:
            offsets = [
                TopicPartition(
                    topic=self._producer_topic.name,
                    partition=partition,
                    offset=offset + 1,
                )
                for (topic, partition), offset in self._tp_offsets.items()
            ]
            self._tp_offsets = {}

        # Flush BEFORE committing: offsets must never get ahead of the data
        # actually delivered to the target topic.
        unproduced_msg_count = self._producer.flush(self.shutdown_timeout / 2)
        if unproduced_msg_count > 0:
            raise CheckpointProducerTimeout(
                f"'{unproduced_msg_count}' messages failed to be produced before the producer flush timeout"
            )

        try:
            partitions = self._target_cluster_consumer.commit(
                offsets=offsets, asynchronous=False
            )
        except KafkaException as e:
            raise CheckpointConsumerCommitError(e.args[0]) from None

        # Synchronous commit returns per-partition results; surface any
        # partition-level commit error.
        for partition in partitions:
            if partition.error:
                raise CheckpointConsumerCommitError(partition.error)

    def stop(self):
        """Request the run loop to exit."""
        super().stop()
        self._running = False

    def cleanup(self, failed):
        """
        Final checkpoint (only on clean shutdown) and close both consumers.
        """
        super().cleanup(failed)
        if not failed:
            self.checkpoint()

        self._source_cluster_consumer.close()
        self._target_cluster_consumer.close()

    def default_topic(self) -> SourceTopic:
        """
        Build the target topic, mirroring the source topic's partition count
        and replication factor when it can be inspected (falling back to a
        single-partition, replication-factor-1 config otherwise).
        """
        config = self._source_cluster_admin.inspect_topics(
            topic_names=[self._topic], timeout=5
        ).get(self._topic)
        if config is None:
            config = TopicConfig(num_partitions=1, replication_factor=1)

        return SourceTopic(
            name=self.name,
            value_serializer="bytes",
            value_deserializer="json",
            config=config,
        )
193+
194+
195+
class QuixEnvironmentSource(ExternalKafkaSource):
    """
    An ExternalKafkaSource whose source cluster is a Quix Cloud environment.

    Resolves the broker connection and librdkafka auth settings from the Quix
    Portal API using the given SDK token and workspace id, and prepends the
    workspace id to the topic and consumer group names as Quix requires.
    """

    def __init__(
        self,
        name: str,
        app: "Application",
        topic: str,
        quix_sdk_token: str,
        quix_workspace_id: str,
        consumer_group: Optional[str] = None,
        quix_portal_api: Optional[str] = None,
        auto_offset_reset: AutoOffsetReset = "latest",
        consumer_extra_config: Optional[dict] = None,
        shutdown_timeout: int = 10,
    ) -> None:
        """
        :param name: unique source name; default consumer group and target topic name.
        :param app: the Application providing access to the target cluster.
        :param topic: topic name in the Quix environment (without workspace prefix).
        :param quix_sdk_token: Quix SDK token used to authenticate against the portal.
        :param quix_workspace_id: id of the Quix workspace hosting the source topic.
        :param consumer_group: source consumer group (workspace id is prepended);
            defaults to ``name``.
        :param quix_portal_api: optional override for the Quix Portal API URL.
        :param auto_offset_reset: where to start when no committed offset exists.
        :param consumer_extra_config: extra librdkafka options; merged with the
            auth settings resolved from the Quix Portal.
        :param shutdown_timeout: seconds allowed for shutdown.
        """
        # Copy before merging in the Quix auth settings so the caller's dict
        # is never mutated (it may be shared across sources/apps).
        consumer_extra_config = dict(consumer_extra_config or {})

        if consumer_group is None:
            consumer_group = name

        quix_config = QuixKafkaConfigsBuilder(
            quix_portal_api_service=QuixPortalApiService(
                default_workspace_id=quix_workspace_id,
                auth_token=quix_sdk_token,
                portal_api=quix_portal_api,
            )
        )

        # Kept for __repr__: the topic name without the workspace prefix.
        self._short_topic = topic
        self._quix_workspace_id = quix_workspace_id

        consumer_extra_config.update(quix_config.librdkafka_extra_config)
        super().__init__(
            name=name,
            app=app,
            topic=quix_config.prepend_workspace_id(topic),
            consumer_group=quix_config.prepend_workspace_id(consumer_group),
            auto_offset_reset=auto_offset_reset,
            consumer_extra_config=consumer_extra_config,
            shutdown_timeout=shutdown_timeout,
            broker_address=quix_config.librdkafka_connection_config,
        )

    def __repr__(self) -> str:
        return f"<{self.__class__.__name__}({self._short_topic}@{self._quix_workspace_id})>"

0 commit comments

Comments
 (0)