Skip to content

Commit c27172c

Browse files
authored
Chore/quix topics management change (#594)
1 parent 47301e8 commit c27172c

File tree

11 files changed

+540
-582
lines changed

11 files changed

+540
-582
lines changed

quixstreams/app.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -647,8 +647,7 @@ def add_source(self, source: BaseSource, topic: Optional[Topic] = None) -> Topic
647647
Default: the source default
648648
"""
649649
if not topic:
650-
topic = source.default_topic()
651-
self._topic_manager.register(topic)
650+
topic = self._topic_manager.register(source.default_topic())
652651

653652
producer = self._get_rowproducer(transactional=False)
654653
source.configure(topic, producer)

quixstreams/models/topics/manager.py

Lines changed: 48 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,11 @@ def affirm_ready_for_create(topics: List[Topic]):
3030

3131
class TopicManager:
3232
"""
33-
The source of all topic management with quixstreams.
33+
The source of all topic management for a Quix Streams Application.
3434
35-
Generally initialized and managed automatically by an `Application`,
36-
but allows a user to work with it directly when needed, such as using it alongside
37-
a plain `Producer` to create its topics.
35+
Intended only for internal use by Application.
3836
39-
See methods for details.
37+
To create a Topic, use Application.topic() or generate them directly.
4038
"""
4139

4240
# Default topic params
@@ -118,19 +116,17 @@ def all_topics(self) -> Dict[str, Topic]:
118116
"""
119117
return {topic.name: topic for topic in self._all_topics_list}
120118

121-
def _resolve_topic_name(self, name: str) -> str:
119+
def _finalize_topic(self, topic: Topic) -> Topic:
122120
"""
123-
Here primarily for adjusting the topic name for Quix topics.
121+
Validates the original topic name and returns the Topic.
124122
125-
Also validates topic name is not too long.
126-
127-
:return: name, no changes (identity function)
123+
Does more in QuixTopicManager.
128124
"""
129-
if len(name) > self._max_topic_name_len:
125+
if len(topic.name) > self._max_topic_name_len:
130126
raise TopicNameLengthExceeded(
131-
f"Topic {name} exceeds the {self._max_topic_name_len} character limit"
127+
f"'{topic.name}' exceeds the {self._max_topic_name_len} character limit"
132128
)
133-
return name
129+
return topic
134130

135131
def _format_nested_name(self, topic_name: str) -> str:
136132
"""
@@ -171,9 +167,7 @@ def _internal_name(
171167
:return: formatted topic name
172168
"""
173169
nested_name = self._format_nested_name(topic_name)
174-
return self._resolve_topic_name(
175-
f"{topic_type}__{'--'.join([self._consumer_group, nested_name, suffix])}"
176-
)
170+
return f"{topic_type}__{'--'.join([self._consumer_group, nested_name, suffix])}"
177171

178172
def _create_topics(
179173
self, topics: List[Topic], timeout: float, create_timeout: float
@@ -264,42 +258,44 @@ def topic(
264258
265259
:return: Topic object with creation configs
266260
"""
267-
name = self._resolve_topic_name(name)
268-
269261
if not config:
270262
config = TopicConfig(
271263
num_partitions=self.default_num_partitions,
272264
replication_factor=self.default_replication_factor,
273265
extra_config=self.default_extra_config,
274266
)
275-
topic = Topic(
276-
name=name,
277-
value_serializer=value_serializer,
278-
value_deserializer=value_deserializer,
279-
key_serializer=key_serializer,
280-
key_deserializer=key_deserializer,
281-
config=config,
282-
timestamp_extractor=timestamp_extractor,
267+
268+
topic = self._finalize_topic(
269+
Topic(
270+
name=name,
271+
value_serializer=value_serializer,
272+
value_deserializer=value_deserializer,
273+
key_serializer=key_serializer,
274+
key_deserializer=key_deserializer,
275+
config=config,
276+
timestamp_extractor=timestamp_extractor,
277+
)
283278
)
284-
self._topics[name] = topic
279+
self._topics[topic.name] = topic
285280
return topic
286281

287-
def register(self, topic: Topic):
282+
def register(self, topic: Topic) -> Topic:
288283
"""
289284
Register an already generated :class:`quixstreams.models.topics.Topic` to the topic manager.
290285
291286
The topic name and config can be updated by the topic manager.
292287
293288
:param topic: The topic to register
294289
"""
295-
topic.name = self._resolve_topic_name(topic.name)
296290
if topic.config is None:
297291
topic.config = TopicConfig(
298292
num_partitions=self.default_num_partitions,
299293
replication_factor=self.default_replication_factor,
300294
extra_config=self.default_extra_config,
301295
)
296+
topic = self._finalize_topic(topic)
302297
self._topics[topic.name] = topic
298+
return topic
303299

304300
def repartition_topic(
305301
self,
@@ -324,21 +320,21 @@ def repartition_topic(
324320
325321
:return: `Topic` object (which is also stored on the TopicManager)
326322
"""
327-
name = self._internal_name("repartition", topic_name, operation)
328-
329-
topic = Topic(
330-
name=name,
331-
value_deserializer=value_deserializer,
332-
key_deserializer=key_deserializer,
333-
value_serializer=value_serializer,
334-
key_serializer=key_serializer,
335-
config=self._get_source_topic_config(
336-
topic_name,
337-
extras_imports=self._groupby_extra_config_imports_defaults,
338-
timeout=timeout if timeout is not None else self._timeout,
339-
),
323+
topic = self._finalize_topic(
324+
Topic(
325+
name=self._internal_name("repartition", topic_name, operation),
326+
value_deserializer=value_deserializer,
327+
key_deserializer=key_deserializer,
328+
value_serializer=value_serializer,
329+
key_serializer=key_serializer,
330+
config=self._get_source_topic_config(
331+
topic_name,
332+
extras_imports=self._groupby_extra_config_imports_defaults,
333+
timeout=timeout if timeout is not None else self._timeout,
334+
),
335+
)
340336
)
341-
self._repartition_topics[name] = topic
337+
self._repartition_topics[topic.name] = topic
342338
return topic
343339

344340
def changelog_topic(
@@ -373,7 +369,6 @@ def changelog_topic(
373369
:return: `Topic` object (which is also stored on the TopicManager)
374370
"""
375371

376-
topic_name = self._resolve_topic_name(topic_name)
377372
source_topic_config = self._get_source_topic_config(
378373
topic_name,
379374
extras_imports=self._changelog_extra_config_imports_defaults,
@@ -387,13 +382,15 @@ def changelog_topic(
387382
extra_config=source_topic_config.extra_config,
388383
)
389384

390-
topic = Topic(
391-
name=self._internal_name("changelog", topic_name, store_name),
392-
key_serializer="bytes",
393-
value_serializer="bytes",
394-
key_deserializer="bytes",
395-
value_deserializer="bytes",
396-
config=changelog_config,
385+
topic = self._finalize_topic(
386+
Topic(
387+
name=self._internal_name("changelog", topic_name, store_name),
388+
key_serializer="bytes",
389+
value_serializer="bytes",
390+
key_deserializer="bytes",
391+
value_deserializer="bytes",
392+
config=changelog_config,
393+
)
397394
)
398395
self._changelog_topics.setdefault(topic_name, {})[store_name] = topic
399396
return topic

quixstreams/models/topics/topic.py

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,12 +111,32 @@ def __init__(
111111
"""
112112
self.name = name
113113
self.config = config
114-
self._key_serializer = _get_serializer(key_serializer)
114+
self._value_deserializer = _get_deserializer(value_deserializer)
115115
self._key_deserializer = _get_deserializer(key_deserializer)
116116
self._value_serializer = _get_serializer(value_serializer)
117-
self._value_deserializer = _get_deserializer(value_deserializer)
117+
self._key_serializer = _get_serializer(key_serializer)
118118
self._timestamp_extractor = timestamp_extractor
119119

120+
def __clone__(
121+
self,
122+
name: str,
123+
config: Optional[TopicConfig] = None,
124+
value_deserializer: Optional[DeserializerType] = None,
125+
key_deserializer: Optional[DeserializerType] = None,
126+
value_serializer: Optional[SerializerType] = None,
127+
key_serializer: Optional[SerializerType] = None,
128+
timestamp_extractor: Optional[TimestampExtractor] = None,
129+
):
130+
return self.__class__(
131+
name=name,
132+
config=config or self.config,
133+
value_deserializer=value_deserializer or self._value_deserializer,
134+
key_deserializer=key_deserializer or self._key_deserializer,
135+
value_serializer=value_serializer or self._value_serializer,
136+
key_serializer=key_serializer or self._key_serializer,
137+
timestamp_extractor=timestamp_extractor or self._timestamp_extractor,
138+
)
139+
120140
def row_serialize(self, row: Row, key: Any) -> KafkaMessage:
121141
"""
122142
Serialize Row to a Kafka message structure

0 commit comments

Comments
 (0)