
Commit ebab4e8

Feature: Sources API (#420)
Sources API
1 parent 3c19186 commit ebab4e8

File tree

14 files changed · +1011 −60 lines changed


quixstreams/app.py

Lines changed: 107 additions & 33 deletions
```diff
@@ -2,6 +2,7 @@
 import functools
 import logging
 import os
+import time
 import signal
 import warnings
 from typing import Optional, List, Callable, Union, Literal, Tuple, Type
@@ -44,6 +45,7 @@
 from .state import StateStoreManager
 from .state.recovery import RecoveryManager
 from .state.rocksdb import RocksDBOptionsType
+from .sources.manager import SourceManager, BaseSource, SourceException
 from .utils.settings import BaseSettings

 __all__ = ("Application", "ApplicationConfig")
@@ -114,7 +116,7 @@ def __init__(
         on_message_processed: Optional[MessageProcessedCallback] = None,
         consumer_poll_timeout: float = 1.0,
         producer_poll_timeout: float = 0.0,
-        loglevel: Optional[LogLevel] = "INFO",
+        loglevel: Optional[Union[int, LogLevel]] = "INFO",
         auto_create_topics: bool = True,
         use_changelog_topics: bool = True,
         quix_config_builder: Optional[QuixKafkaConfigsBuilder] = None,
@@ -289,13 +291,7 @@ def __init__(
             extra_config=self._config.consumer_extra_config,
             on_error=on_consumer_error,
         )
-        self._producer = RowProducer(
-            broker_address=self._config.broker_address,
-            extra_config=self._config.producer_extra_config,
-            on_error=on_producer_error,
-            flush_timeout=self._config.flush_timeout,
-            transactional=self._config.exactly_once,
-        )
+        self._producer = self._get_rowproducer(on_error=on_producer_error)
         self._running = False
         self._failed = False

@@ -328,6 +324,7 @@ def __init__(
             recovery_manager=recovery_manager,
         )

+        self._source_manager = SourceManager()
         self._sink_manager = SinkManager()
         self._pausing_manager = PausingManager(consumer=self._consumer)
         self._processing_context = ProcessingContext(
@@ -364,7 +361,7 @@ def Quix(
         on_message_processed: Optional[MessageProcessedCallback] = None,
         consumer_poll_timeout: float = 1.0,
         producer_poll_timeout: float = 0.0,
-        loglevel: Optional[LogLevel] = "INFO",
+        loglevel: Optional[Union[int, LogLevel]] = "INFO",
         quix_config_builder: Optional[QuixKafkaConfigsBuilder] = None,
         auto_create_topics: bool = True,
         use_changelog_topics: bool = True,
```
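With `loglevel` widened to `Optional[Union[int, LogLevel]]`, integer levels from the standard `logging` module are now accepted alongside level names. A minimal sketch (the broker address is illustrative):

```python
import logging

from quixstreams import Application

# Both forms are now accepted by Application (and Application.Quix):
app = Application(broker_address="localhost:9092", loglevel=logging.DEBUG)
# app = Application(broker_address="localhost:9092", loglevel="DEBUG")
```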
```diff
@@ -573,7 +570,8 @@ def custom_ts_extractor(

     def dataframe(
         self,
-        topic: Topic,
+        topic: Optional[Topic] = None,
+        source: Optional[BaseSource] = None,
     ) -> StreamingDataFrame:
         """
         A simple helper method that generates a `StreamingDataFrame`, which is used
@@ -603,6 +601,12 @@ def dataframe(
             to be used as an input topic.
         :return: `StreamingDataFrame` object
         """
+        if not source and not topic:
+            raise ValueError("one of `source` or `topic` is required")
+
+        if source:
+            topic = self.add_source(source, topic)
+
         sdf = StreamingDataFrame(
             topic=topic,
             topic_manager=self._topic_manager,
```
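`dataframe()` can now be seeded by a source instead of a topic: when `source` is passed, it is registered via `add_source()` and the topic it produces to becomes the `StreamingDataFrame` input. A minimal sketch; `my_source` is an illustrative placeholder for any configured `BaseSource` subclass instance:

```python
from quixstreams import Application
from quixstreams.sources import BaseSource

app = Application(broker_address="localhost:9092")

# `my_source` stands in for a concrete BaseSource subclass instance
# (see the subclass sketch at the end of this page).
my_source: BaseSource = ...  # illustrative placeholder

# Passing neither `topic` nor `source` raises ValueError;
# here the source's default_topic() becomes the SDF input.
sdf = app.dataframe(source=my_source)
```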
```diff
@@ -633,6 +637,28 @@ def stop(self, fail: bool = False):
         if self._state_manager.using_changelogs:
             self._state_manager.stop_recovery()

+    def _get_rowproducer(
+        self,
+        on_error: Optional[ProducerErrorCallback] = None,
+        transactional: Optional[bool] = None,
+    ) -> RowProducer:
+        """
+        Create a RowProducer using the application config
+
+        Used to create the application producer as well as the sources producers
+        """
+
+        if transactional is None:
+            transactional = self._config.exactly_once
+
+        return RowProducer(
+            broker_address=self._config.broker_address,
+            extra_config=self._config.producer_extra_config,
+            flush_timeout=self._config.flush_timeout,
+            on_error=on_error,
+            transactional=transactional,
+        )
+
     def get_producer(self) -> Producer:
         """
         Create and return a pre-configured Producer instance.
@@ -716,6 +742,25 @@ def clear_state(self):
         """
         self._state_manager.clear_stores()

+    def add_source(self, source: BaseSource, topic: Optional[Topic] = None) -> Topic:
+        """
+        Add a source to the application.
+
+        See :class:`quixstreams.sources.base.BaseSource` for more details.
+
+        :param source: a :class:`quixstreams.sources.BaseSource` instance
+        :param topic: the :class:`quixstreams.models.Topic` instance the source will produce to
+            Default: the source default
+        """
+        if not topic:
+            topic = source.default_topic()
+        self._topic_manager.register(topic)
+
+        producer = self._get_rowproducer(transactional=False)
+        source.configure(topic, producer)
+        self._source_manager.register(source)
+        return topic
+
     def run(
         self,
         dataframe: StreamingDataFrame,
```
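`add_source()` resolves the output topic (falling back to the source's `default_topic()`), registers it with the topic manager, and hands the source its own non-transactional producer. Continuing the sketch above, with an illustrative topic name and serializer:

```python
from quixstreams.models import Topic

# Route the source's output to an explicit topic instead of its default:
topic = Topic("sensor-readings", value_serializer="json")
app.add_source(my_source, topic=topic)

# The registered topic can then feed a StreamingDataFrame:
sdf = app.dataframe(topic=topic)
```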
```diff
@@ -745,6 +790,19 @@ def run(

         :param dataframe: instance of `StreamingDataFrame`
         """
+        self._run(dataframe)
+
+    def _exception_handler(self, exc_type, exc_val, exc_tb):
+        fail = False
+
+        # Sources and the application are independent.
+        # If a source fails, the application can shutdown gracefully.
+        if exc_val is not None and exc_type is not SourceException:
+            fail = True
+
+        self.stop(fail=fail)
+
+    def _run(self, dataframe: Optional[StreamingDataFrame] = None):
         self._setup_signal_handlers()

         logger.info(
@@ -765,36 +823,52 @@ def run(
             exit_stack.enter_context(self._processing_context)
             exit_stack.enter_context(self._state_manager)
             exit_stack.enter_context(self._consumer)
-            exit_stack.push(
-                lambda exc_type, exc_val, exc_tb: self.stop(fail=exc_val is not None)
-            )
+            exit_stack.enter_context(self._source_manager)
+            exit_stack.push(self._exception_handler)

         with exit_stack:
             # Subscribe to topics in Kafka and start polling
-            self._consumer.subscribe(
-                dataframe.topics_to_subscribe,
-                on_assign=self._on_assign,
-                on_revoke=self._on_revoke,
-                on_lost=self._on_lost,
-            )
-            logger.info("Waiting for incoming messages")
-            # Start polling Kafka for messages and callbacks
-            self._running = True
+            if dataframe is not None:
+                self._run_dataframe(dataframe)
+            else:
+                self._run_sources()
+
+    def _run_dataframe(self, dataframe):
+        self._consumer.subscribe(
+            dataframe.topics_to_subscribe,
+            on_assign=self._on_assign,
+            on_revoke=self._on_revoke,
+            on_lost=self._on_lost,
+        )
+        logger.info("Waiting for incoming messages")
+        # Start polling Kafka for messages and callbacks
+        self._running = True
+
+        # Initialize the checkpoint
+        self._processing_context.init_checkpoint()
+
+        dataframe_composed = dataframe.compose()
+
+        while self._running:
+            if self._state_manager.recovery_required:
+                self._state_manager.do_recovery()
+            else:
+                self._process_message(dataframe_composed)
+                self._processing_context.commit_checkpoint()
+                self._processing_context.resume_ready_partitions()
+            self._source_manager.raise_for_error()

-            # Initialize the checkpoint
-            self._processing_context.init_checkpoint()
+        logger.info("Stop processing of StreamingDataFrame")

-            dataframe_composed = dataframe.compose()
+    def _run_sources(self):
+        self._running = True
+        while self._running:
+            self._source_manager.raise_for_error()

-            while self._running:
-                if self._state_manager.recovery_required:
-                    self._state_manager.do_recovery()
-                else:
-                    self._process_message(dataframe_composed)
-                    self._processing_context.commit_checkpoint()
-                    self._processing_context.resume_ready_partitions()
+            if not self._source_manager.is_alive():
+                self.stop()

-            logger.info("Stop processing of StreamingDataFrame")
+            time.sleep(1)

     def _quix_runtime_init(self):
         """
```

quixstreams/dataframe/dataframe.py

Lines changed: 1 addition & 0 deletions
```diff
@@ -48,6 +48,7 @@
 from quixstreams.processing import ProcessingContext
 from quixstreams.sinks import BaseSink
 from quixstreams.state.types import State
+from quixstreams.sources import BaseSource
 from .base import BaseStreaming
 from .exceptions import InvalidOperation, GroupByLimitExceeded, DataFrameLocked
 from .series import StreamingSeries
```

quixstreams/logging.py

Lines changed: 26 additions & 10 deletions
```diff
@@ -1,6 +1,7 @@
+import os
 import logging
 import sys
-from typing import Literal, Optional
+from typing import Literal, Optional, Union

 __all__ = ("configure_logging", "LogLevel")

@@ -13,18 +14,17 @@
     "NOTSET",
 ]

-_LOGGER_NAME = "quixstreams"
+LOGGER_NAME = "quixstreams"

-_DEFAULT_FORMATTER = logging.Formatter(
-    f"[%(asctime)s] [%(levelname)s] [{_LOGGER_NAME}] : %(message)s"
-)
 _DEFAULT_HANDLER = logging.StreamHandler(stream=sys.stderr)

-logger = logging.getLogger(_LOGGER_NAME)
+logger = logging.getLogger(LOGGER_NAME)


 def configure_logging(
-    loglevel: Optional[LogLevel],
+    loglevel: Optional[Union[int, LogLevel]],
+    name: str = LOGGER_NAME,
+    pid: bool = False,
 ) -> bool:
     """
     Configure "quixstreams" logger.
@@ -35,20 +35,36 @@ def configure_logging(

     :param loglevel: a valid log level as a string or None.
         If None passed, this function is no-op and no logging will be configured.
+    :param name: the log name included in the output
+    :param pid: if True include the process PID in the logs
     :return: True if logging config has been updated, otherwise False.
     """
     if loglevel is None:
         # Skipping logging configuration
         return False

+    if pid:
+        formatter = (
+            f"[%(asctime)s] [%(levelname)s] [{name}] [{os.getpid()}] : %(message)s"
+        )
+    else:
+        formatter = f"[%(asctime)s] [%(levelname)s] [{name}] : %(message)s"
+
     if logger.handlers:
-        # There's a pre-configured handler for "quixstreams", leave it as it is
-        return False
+        if len(logger.handlers) != 1 or logger.handlers[0] is not _DEFAULT_HANDLER:
+            # There's a pre-configured handler for "quixstreams", leave it as it is
+            return False
+        else:
+            # The pre-configured handler for "quixstreams" is the default handler.
+            # Reconfigure the formatter in case we are in a subprocess and the logger
+            # was configured by mistake by the Application.
+            _DEFAULT_HANDLER.setFormatter(logging.Formatter(formatter))
+            return True

     # Configuring logger
     logger.setLevel(loglevel)
     logger.propagate = False
-    _DEFAULT_HANDLER.setFormatter(_DEFAULT_FORMATTER)
+    _DEFAULT_HANDLER.setFormatter(logging.Formatter(formatter))
     _DEFAULT_HANDLER.setLevel(loglevel)
     logger.addHandler(_DEFAULT_HANDLER)
     return True
```
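With `pid=True`, each record carries the process ID, which helps tell apart output from source subprocesses sharing one stderr. A sketch (the logger name and timestamp are illustrative):

```python
from quixstreams.logging import configure_logging

# Include the PID so logs from a source subprocess are distinguishable:
configure_logging("DEBUG", name="quixstreams-source", pid=True)
# -> [2024-08-01 12:00:00,000] [DEBUG] [quixstreams-source] [12345] : message
```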

quixstreams/models/topics/manager.py

Lines changed: 17 additions & 0 deletions
```diff
@@ -283,6 +283,23 @@ def topic(
         self._topics[name] = topic
         return topic

+    def register(self, topic: Topic):
+        """
+        Register an already generated :class:`quixstreams.models.topics.Topic` to the topic manager.
+
+        The topic name and config can be updated by the topic manager.
+
+        :param topic: The topic to register
+        """
+        topic.name = self._resolve_topic_name(topic.name)
+        if topic.config is None:
+            topic.config = TopicConfig(
+                num_partitions=self.default_num_partitions,
+                replication_factor=self.default_replication_factor,
+                extra_config=self.default_extra_config,
+            )
+        self._topics[topic.name] = topic
+
     def repartition_topic(
         self,
         operation: str,
```
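`register()` accepts a pre-built `Topic`, resolves its name, and fills in a default `TopicConfig` when none was supplied. A rough sketch; `topic_manager` stands in for a `TopicManager` instance and is not defined here:

```python
from quixstreams.models import Topic

topic = Topic("raw-events")        # config may now be omitted (defaults to None)
topic_manager.register(topic)      # topic_manager: a TopicManager instance (illustrative)
assert topic.config is not None    # the manager filled in default partitions/replication
```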

quixstreams/models/topics/topic.py

Lines changed: 3 additions & 14 deletions
```diff
@@ -93,7 +93,7 @@ class Topic:
     def __init__(
         self,
         name: str,
-        config: TopicConfig,
+        config: Optional[TopicConfig] = None,
         value_deserializer: Optional[DeserializerType] = None,
         key_deserializer: Optional[DeserializerType] = BytesDeserializer(),
         value_serializer: Optional[SerializerType] = None,
@@ -110,25 +110,14 @@ def __init__(
         :param timestamp_extractor: a callable that returns a timestamp in
             milliseconds from a deserialized message.
         """
-        self._name = name
-        self._config = config
+        self.name = name
+        self.config = config
         self._key_serializer = _get_serializer(key_serializer)
         self._key_deserializer = _get_deserializer(key_deserializer)
         self._value_serializer = _get_serializer(value_serializer)
         self._value_deserializer = _get_deserializer(value_deserializer)
         self._timestamp_extractor = timestamp_extractor

-    @property
-    def name(self) -> str:
-        """
-        Topic name
-        """
-        return self._name
-
-    @property
-    def config(self) -> TopicConfig:
-        return self._config
-
     def row_serialize(self, row: Row, key: Any) -> KafkaMessage:
         """
         Serialize Row to a Kafka message structure
```
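`name` and `config` are now plain attributes rather than read-only properties, which is what lets `TopicManager.register()` rewrite them after construction. A tiny sketch:

```python
from quixstreams.models import Topic

topic = Topic("events")            # config is optional now
topic.name = "workspace-events"    # plain attribute; the manager can rewrite it in place
```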

quixstreams/sources/__init__.py

Lines changed: 3 additions & 0 deletions
```diff
@@ -0,0 +1,3 @@
+from .base import *
+from .manager import SourceException
+from .multiprocessing import multiprocessing
```
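This file only sets the package's public exports; the `BaseSource` contract lives in `quixstreams/sources/base.py`, which is not shown in this excerpt. Going by the calls visible in `app.py` (`default_topic()` and `configure(topic, producer)`), a custom source would look roughly like this; the `start`/`stop` method names are assumptions, not confirmed by this diff:

```python
from quixstreams.models import Topic
from quixstreams.sources import BaseSource


class CountingSource(BaseSource):
    """A hypothetical source emitting an incrementing counter."""

    def default_topic(self) -> Topic:
        # Called by Application.add_source() when no explicit topic is passed.
        return Topic("counting-source", value_serializer="json")

    def start(self):
        # Assumed entry point: produce messages via the producer that
        # Application.add_source() injected through configure().
        ...

    def stop(self):
        # Assumed shutdown hook: stop producing and flush.
        ...
```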
