
Commit 3a7968a

Refactor RowConsumer and RowProducer (#861)
* Rename `RowConsumer` -> `InternalConsumer`
* Rename `RowProducer` -> `InternalProducer`
* Rename internal consumer test folder
* Merge TransactionalProducer to Producer
1 parent 3b435ee commit 3a7968a

31 files changed: +479 additions, -417 deletions

quixstreams/app.py

Lines changed: 19 additions & 19 deletions
@@ -31,6 +31,8 @@
     ProducerErrorCallback,
     default_on_processing_error,
 )
+from .internal_consumer import InternalConsumer
+from .internal_producer import InternalProducer
 from .kafka import AutoOffsetReset, ConnectionConfig, Consumer, Producer
 from .logging import LogLevel, configure_logging
 from .models import (
@@ -50,8 +52,6 @@
     is_quix_deployment,
 )
 from .processing import ProcessingContext
-from .rowconsumer import RowConsumer
-from .rowproducer import RowProducer
 from .runtracker import RunTracker
 from .sinks import SinkManager
 from .sources import BaseSource, SourceException, SourceManager
@@ -66,7 +66,7 @@
 ProcessingGuarantee = Literal["at-least-once", "exactly-once"]
 MessageProcessedCallback = Callable[[str, int, int], None]
 
-# Enforce idempotent producing for the internal RowProducer
+# Enforce idempotent producing for InternalProducer
 _default_producer_extra_config = {"enable.idempotence": True}
 
 # Default config for the internal consumer
@@ -198,8 +198,8 @@ def __init__(
             Default - `"state"`.
         :param rocksdb_options: RocksDB options.
             If `None`, the default options will be used.
-        :param consumer_poll_timeout: timeout for `RowConsumer.poll()`. Default - `1.0`s
-        :param producer_poll_timeout: timeout for `RowProducer.poll()`. Default - `0`s.
+        :param consumer_poll_timeout: timeout for `InternalConsumer.poll()`. Default - `1.0`s
+        :param producer_poll_timeout: timeout for `InternalProducer.poll()`. Default - `0`s.
         :param on_message_processed: a callback triggered when message is successfully
             processed.
         :param loglevel: a log level for "quixstreams" logger.
@@ -227,11 +227,11 @@ def __init__(
             exceptions occur on different stages of stream processing. If the callback
             returns `True`, the exception will be ignored. Otherwise, the exception
             will be propagated and the processing will eventually stop.
-        :param on_consumer_error: triggered when internal `RowConsumer` fails
+        :param on_consumer_error: triggered when internal `InternalConsumer` fails
             to poll Kafka or cannot deserialize a message.
         :param on_processing_error: triggered when exception is raised within
             `StreamingDataFrame.process()`.
-        :param on_producer_error: triggered when `RowProducer` fails to serialize
+        :param on_producer_error: triggered when `InternalProducer` fails to serialize
             or to produce a message to Kafka.
         <br><br>***Quix Cloud Parameters***<br>
         :param quix_config_builder: instance of `QuixKafkaConfigsBuilder` to be used
@@ -335,11 +335,11 @@ def __init__(
         self._on_message_processed = on_message_processed
         self._on_processing_error = on_processing_error or default_on_processing_error
 
-        self._consumer = self._get_rowconsumer(
+        self._consumer = self._get_internal_consumer(
            on_error=on_consumer_error,
            extra_config_overrides=consumer_extra_config_overrides,
        )
-        self._producer = self._get_rowproducer(on_error=on_producer_error)
+        self._producer = self._get_internal_producer(on_error=on_producer_error)
        self._running = False
        self._failed = False
 
@@ -577,21 +577,21 @@ def stop(self, fail: bool = False):
         if self._state_manager.using_changelogs:
             self._state_manager.stop_recovery()
 
-    def _get_rowproducer(
+    def _get_internal_producer(
        self,
        on_error: Optional[ProducerErrorCallback] = None,
        transactional: Optional[bool] = None,
-    ) -> RowProducer:
+    ) -> InternalProducer:
        """
-        Create a RowProducer using the application config
+        Create InternalProducer using the application config
 
        Used to create the application producer as well as the sources producers
        """
 
        if transactional is None:
            transactional = self._config.exactly_once
 
-        return RowProducer(
+        return InternalProducer(
            broker_address=self._config.broker_address,
            extra_config=self._config.producer_extra_config,
            flush_timeout=self._config.flush_timeout,
@@ -628,13 +628,13 @@ def get_producer(self) -> Producer:
             extra_config=self._config.producer_extra_config,
         )
 
-    def _get_rowconsumer(
+    def _get_internal_consumer(
        self,
        on_error: Optional[ConsumerErrorCallback] = None,
        extra_config_overrides: Optional[dict] = None,
-    ) -> RowConsumer:
+    ) -> InternalConsumer:
        """
-        Create a RowConsumer using the application config
+        Create an InternalConsumer using the application config
 
        Used to create the application consumer as well as the sources consumers
        """
@@ -644,7 +644,7 @@ def _get_rowconsumer(
             **self._config.consumer_extra_config,
             **extra_config_overrides,
         }
-        return RowConsumer(
+        return InternalConsumer(
            broker_address=self._config.broker_address,
            consumer_group=self._config.consumer_group,
            auto_offset_reset=self._config.auto_offset_reset,
@@ -734,8 +734,8 @@ def add_source(self, source: BaseSource, topic: Optional[Topic] = None) -> Topic
         self._source_manager.register(
             source,
             topic,
-            self._get_rowproducer(transactional=False),
-            self._get_rowconsumer(
+            self._get_internal_producer(transactional=False),
+            self._get_internal_consumer(
                extra_config_overrides=consumer_extra_config_overrides
            ),
            self._get_topic_manager(),
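
Beyond the rename, the wiring in app.py is unchanged: the application producer follows the exactly-once setting, while `add_source()` always requests a non-transactional producer. A minimal sketch of that decision logic, written as a standalone function for illustration only (it is not the actual `Application` method):

from typing import Optional


def resolve_transactional(exactly_once: bool, transactional: Optional[bool] = None) -> bool:
    # Mirrors _get_internal_producer() in the diff above: when the caller does
    # not pass `transactional`, fall back to the app-level exactly-once setting.
    if transactional is None:
        transactional = exactly_once
    return transactional


# The application producer follows the processing guarantee...
assert resolve_transactional(exactly_once=True) is True
# ...while sources are registered with transactional=False explicitly
# (see the add_source() hunk above), regardless of the guarantee.
assert resolve_transactional(exactly_once=True, transactional=False) is False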

quixstreams/checkpointing/checkpoint.py

Lines changed: 4 additions & 4 deletions
@@ -6,8 +6,8 @@
 from confluent_kafka import KafkaException, TopicPartition
 
 from quixstreams.dataframe import DataFrameRegistry
-from quixstreams.rowconsumer import RowConsumer
-from quixstreams.rowproducer import RowProducer
+from quixstreams.internal_consumer import InternalConsumer
+from quixstreams.internal_producer import InternalProducer
 from quixstreams.sinks import SinkManager
 from quixstreams.sinks.base import SinkBackpressureError
 from quixstreams.state import (
@@ -122,8 +122,8 @@ class Checkpoint(BaseCheckpoint):
     def __init__(
         self,
         commit_interval: float,
-        producer: RowProducer,
-        consumer: RowConsumer,
+        producer: InternalProducer,
+        consumer: InternalConsumer,
        state_manager: StateStoreManager,
        sink_manager: SinkManager,
        dataframe_registry: DataFrameRegistry,
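
For code that, like `Checkpoint.__init__`, only type-annotates against these classes, the change is limited to the import path; runtime behaviour stays the same. A hedged before/after sketch (the helper function below is hypothetical):

# Old imports (pre-refactor):
#   from quixstreams.rowconsumer import RowConsumer
#   from quixstreams.rowproducer import RowProducer
# New imports after this commit:
from quixstreams.internal_consumer import InternalConsumer
from quixstreams.internal_producer import InternalProducer


def describe_checkpoint_deps(producer: InternalProducer, consumer: InternalConsumer) -> str:
    # Hypothetical helper: only the annotations change, not the behaviour.
    return f"{type(producer).__name__} / {type(consumer).__name__}"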

Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
+from .consumer import InternalConsumer as InternalConsumer

quixstreams/rowconsumer/consumer.py renamed to quixstreams/internal_consumer/consumer.py

Lines changed: 4 additions & 6 deletions
@@ -20,7 +20,7 @@
 
 logger = logging.getLogger(__name__)
 
-__all__ = ("RowConsumer",)
+__all__ = ("InternalConsumer",)
 
 
 def _validate_message_batch(
@@ -37,10 +37,9 @@ def _validate_message_batch(
         raise
 
 
-class RowConsumer(BaseConsumer):
+class InternalConsumer(BaseConsumer):
     _backpressure_resume_at: float
 
-    # TODO: Rename it to "InternalConsumer"? Or "InternalBufferedConsumer"?
     def __init__(
         self,
         broker_address: Union[str, ConnectionConfig],
@@ -55,14 +54,12 @@ def __init__(
         max_partition_buffer_size: int = 10000,
     ):
        """
-
        A consumer class that is capable of deserializing Kafka messages to Rows
        according to the Topics deserialization settings.
 
        It overrides `.subscribe()` method of Consumer class to accept `Topic`
        objects instead of strings.
 
-
        :param broker_address: Connection settings for Kafka.
            Accepts string with Kafka broker host and port formatted as `<host>:<port>`,
            or a ConnectionConfig object if authentication is required.
@@ -79,7 +76,8 @@ def __init__(
         :param extra_config: A dictionary with additional options that
             will be passed to `confluent_kafka.Consumer` as is.
             Note: values passed as arguments override values in `extra_config`.
-        :param on_error: a callback triggered when `RowConsumer.poll_row` fails.
+        :param on_error: a callback triggered when InternalConsumer fails
+            to get and deserialize a new message.
            If consumer fails and the callback returns `True`, the exception
            will be logged but not propagated.
            The default callback logs an exception and returns `False`.
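
Per the docstring above, `InternalConsumer.subscribe()` accepts `Topic` objects instead of plain topic-name strings, so each message can be deserialized according to its Topic's settings. A hedged usage sketch: the constructor arguments shown are only those visible in this diff (the real class accepts more options), and constructing a `Topic` directly, rather than obtaining it from `Application.topic()`, is for illustration only:

from quixstreams.internal_consumer import InternalConsumer
from quixstreams.models import Topic

# Sketch only: placeholder broker address and consumer group.
consumer = InternalConsumer(
    broker_address="localhost:9092",
    consumer_group="demo-group",
    auto_offset_reset="earliest",
)

# `.subscribe()` takes Topic objects, not strings, so incoming messages can be
# deserialized with that Topic's configured deserializers.
topic = Topic(name="input-topic", value_deserializer="json")
consumer.subscribe([topic])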

quixstreams/rowproducer.py renamed to quixstreams/internal_producer.py

Lines changed: 9 additions & 16 deletions
@@ -15,7 +15,6 @@
     PRODUCER_ON_ERROR_RETRIES,
     PRODUCER_POLL_TIMEOUT,
     Producer,
-    TransactionalProducer,
 )
 from .models import Headers, Row, Topic
 
@@ -74,7 +73,7 @@ def wrapper(*args, **kwargs):
 class KafkaProducerTransactionCommitFailed(QuixException): ...
 
 
-class RowProducer:
+class InternalProducer:
    """
    A producer class that is capable of serializing Rows to bytes and send them to Kafka.
    The serialization is performed according to the Topic serialization settings.
@@ -85,8 +84,8 @@ class RowProducer:
     :param extra_config: A dictionary with additional options that
         will be passed to `confluent_kafka.Producer` as is.
         Note: values passed as arguments override values in `extra_config`.
-    :param on_error: a callback triggered when `RowProducer.produce_row()`
-        or `RowProducer.poll()` fail`.
+    :param on_error: a callback triggered when `InternalProducer.produce_row()`
+        or `InternalProducer.poll()` fail`.
        If producer fails and the callback returns `True`, the exception
        will be logged but not propagated.
        The default callback logs an exception and returns `False`.
@@ -103,18 +102,12 @@ def __init__(
         flush_timeout: Optional[float] = None,
         transactional: bool = False,
     ):
-        if transactional:
-            self._producer: Producer = TransactionalProducer(
-                broker_address=broker_address,
-                extra_config=extra_config,
-                flush_timeout=flush_timeout,
-            )
-        else:
-            self._producer = Producer(
-                broker_address=broker_address,
-                extra_config=extra_config,
-                flush_timeout=flush_timeout,
-            )
+        self._producer = Producer(
+            broker_address=broker_address,
+            extra_config=extra_config,
+            flush_timeout=flush_timeout,
+            transactional=transactional,
+        )
 
        self._on_error: ProducerErrorCallback = on_error or default_on_producer_error
        self._tp_offsets: Dict[Tuple[str, int], int] = {}
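
This last hunk is the "Merge TransactionalProducer to Producer" part of the commit: instead of branching between two producer classes, `InternalProducer` now always builds a single `Producer` and forwards the `transactional` flag. A hedged sketch of the resulting construction, using only the keyword arguments visible in the diff (broker address and config values are placeholders):

from quixstreams.internal_producer import InternalProducer

# Exactly-once-style setup: the underlying Producer is created with
# transactional=True rather than via a separate TransactionalProducer class.
producer = InternalProducer(
    broker_address="localhost:9092",
    extra_config={"enable.idempotence": True},
    transactional=True,
)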

quixstreams/kafka/exceptions.py

Lines changed: 3 additions & 0 deletions
@@ -30,3 +30,6 @@ class KafkaConsumerException(BaseKafkaException): ...
 
 
 class KafkaProducerDeliveryError(BaseKafkaException): ...
+
+
+class InvalidProducerConfigError(QuixException): ...
