Skip to content

Commit c4362dc

Browse files
authored
Add topic_type to the Topic objects (#847)
1 parent 4e28546 commit c4362dc

File tree

3 files changed

+48
-16
lines changed

3 files changed

+48
-16
lines changed

quixstreams/models/topics/manager.py

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
TopicNameLengthExceeded,
1111
TopicNotFoundError,
1212
)
13-
from .topic import TimestampExtractor, Topic, TopicConfig
13+
from .topic import TimestampExtractor, Topic, TopicConfig, TopicType
1414

1515
logger = logging.getLogger(__name__)
1616

@@ -56,24 +56,16 @@ def __init__(
5656
"""
5757
self._admin = topic_admin
5858
self._consumer_group = consumer_group
59-
self._topics: Dict[str, Topic] = {}
59+
self._regular_topics: Dict[str, Topic] = {}
6060
self._repartition_topics: Dict[str, Topic] = {}
6161
self._changelog_topics: Dict[Optional[str], Dict[str, Topic]] = {}
6262
self._timeout = timeout
6363
self._create_timeout = create_timeout
6464
self._auto_create_topics = auto_create_topics
6565

66-
@property
67-
def _all_topics_list(self) -> List[Topic]:
68-
return (
69-
list(self._topics.values())
70-
+ list(self._repartition_topics.values())
71-
+ self.changelog_topics_list
72-
)
73-
7466
@property
7567
def topics(self) -> Dict[str, Topic]:
76-
return self._topics
68+
return self._regular_topics
7769

7870
@property
7971
def repartition_topics(self) -> Dict[str, Topic]:
@@ -102,7 +94,7 @@ def non_changelog_topics(self) -> Dict[str, Topic]:
10294
"""
10395
Returns a dict with normal and repartition topics
10496
"""
105-
return {**self._topics, **self._repartition_topics}
97+
return {**self._regular_topics, **self._repartition_topics}
10698

10799
@property
108100
def all_topics(self) -> Dict[str, Topic]:
@@ -111,7 +103,12 @@ def all_topics(self) -> Dict[str, Topic]:
111103
112104
returns: full topic dict, {topic_name: Topic}
113105
"""
114-
return {topic.name: topic for topic in self._all_topics_list}
106+
all_topics_list = (
107+
list(self._regular_topics.values())
108+
+ list(self._repartition_topics.values())
109+
+ self.changelog_topics_list
110+
)
111+
return {topic.name: topic for topic in all_topics_list}
115112

116113
def topic_config(
117114
self,
@@ -175,10 +172,11 @@ def topic(
175172
key_deserializer=key_deserializer,
176173
create_config=create_config,
177174
timestamp_extractor=timestamp_extractor,
175+
topic_type=TopicType.REGULAR,
178176
)
179177
broker_topic = self._get_or_create_broker_topic(topic)
180178
topic = self._configure_topic(topic, broker_topic)
181-
self._topics[topic.name] = topic
179+
self._regular_topics[topic.name] = topic
182180
return topic
183181

184182
def register(self, topic: Topic) -> Topic:
@@ -197,7 +195,7 @@ def register(self, topic: Topic) -> Topic:
197195
)
198196
broker_topic = self._get_or_create_broker_topic(topic)
199197
topic = self._configure_topic(topic, broker_topic)
200-
self._topics[topic.name] = topic
198+
self._regular_topics[topic.name] = topic
201199
return topic
202200

203201
def repartition_topic(
@@ -230,6 +228,7 @@ def repartition_topic(
230228
value_serializer=value_serializer,
231229
key_serializer=key_serializer,
232230
create_config=config,
231+
topic_type=TopicType.REPARTITION,
233232
)
234233
broker_topic = self._get_or_create_broker_topic(topic)
235234
topic = self._configure_topic(topic, broker_topic)
@@ -269,6 +268,7 @@ def changelog_topic(
269268
key_deserializer="bytes",
270269
value_deserializer="bytes",
271270
create_config=config,
271+
topic_type=TopicType.CHANGELOG,
272272
)
273273
broker_topic = self._get_or_create_broker_topic(topic)
274274
topic = self._configure_topic(topic, broker_topic)

quixstreams/models/topics/topic.py

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import copy
22
import dataclasses
3+
import enum
34
import logging
45
from typing import Any, Callable, List, Optional, Union
56

@@ -30,7 +31,7 @@
3031
SuccessfulConfluentKafkaMessageProto,
3132
)
3233

33-
__all__ = ("Topic", "TopicConfig", "TimestampExtractor")
34+
__all__ = ("Topic", "TopicConfig", "TimestampExtractor", "TopicType")
3435

3536
logger = logging.getLogger(__name__)
3637

@@ -82,6 +83,12 @@ def _get_deserializer(
8283
return deserializer
8384

8485

86+
class TopicType(enum.Enum):
87+
REGULAR = 1
88+
REPARTITION = 2
89+
CHANGELOG = 3
90+
91+
8592
class Topic:
8693
"""
8794
A definition of a Kafka topic.
@@ -94,6 +101,7 @@ class Topic:
94101
def __init__(
95102
self,
96103
name: str,
104+
topic_type: TopicType = TopicType.REGULAR,
97105
create_config: Optional[TopicConfig] = None,
98106
value_deserializer: Optional[DeserializerType] = None,
99107
key_deserializer: Optional[DeserializerType] = BytesDeserializer(),
@@ -104,6 +112,13 @@ def __init__(
104112
):
105113
"""
106114
:param name: topic name
115+
:param topic_type: a type of the topic, can be one of:
116+
- `TopicType.REGULAR` - the regular input and output topics
117+
- `TopicType.REPARTITION` - a repartition topic used for re-keying the data
118+
- `TopicType.CHANGELOG` - a changelog topic to back up the state stores.
119+
120+
Default - `TopicType.REGULAR`.
121+
107122
:param create_config: a `TopicConfig` to create a new topic if it does not exist
108123
:param value_deserializer: a deserializer type for values
109124
:param key_deserializer: a deserializer type for keys
@@ -123,6 +138,7 @@ def __init__(
123138
self._value_serializer = _get_serializer(value_serializer)
124139
self._key_serializer = _get_serializer(key_serializer)
125140
self._timestamp_extractor = timestamp_extractor
141+
self._type = topic_type
126142

127143
def __clone__(
128144
self,
@@ -138,6 +154,7 @@ def __clone__(
138154
value_serializer=self._value_serializer,
139155
key_serializer=self._key_serializer,
140156
timestamp_extractor=self._timestamp_extractor,
157+
topic_type=self._type,
141158
)
142159

143160
@property
@@ -166,6 +183,18 @@ def broker_config(self) -> TopicConfig:
166183
def broker_config(self, config: TopicConfig):
167184
self._broker_config = copy.deepcopy(config)
168185

186+
@property
187+
def is_changelog(self) -> bool:
188+
return self._type == TopicType.CHANGELOG
189+
190+
@property
191+
def is_regular(self) -> bool:
192+
return self._type == TopicType.REGULAR
193+
194+
@property
195+
def is_repartition(self) -> bool:
196+
return self._type == TopicType.REPARTITION
197+
169198
def row_serialize(self, row: Row, key: Any) -> KafkaMessage:
170199
"""
171200
Serialize Row to a Kafka message structure

tests/test_quixstreams/test_models/test_topics/test_manager.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ def test_topic_with_config(self, topic_manager_factory):
5050

5151
assert topic.name == topic_name
5252
assert topic.quix_name == topic_name
53+
assert topic.is_regular
5354
assert topic.create_config.num_partitions == create_config.num_partitions
5455
assert (
5556
topic.create_config.replication_factor == create_config.replication_factor
@@ -187,6 +188,7 @@ def test_changelog_topic(self, topic_manager_factory):
187188
assert changelog.broker_config.num_partitions == 1
188189
assert changelog.broker_config.replication_factor == 1
189190
assert changelog.broker_config.extra_config["cleanup.policy"] == "compact"
191+
assert changelog.is_changelog
190192

191193
def test_changelog_topic_partition_count_mismatch(
192194
self, topic_manager_factory, kafka_admin_client
@@ -260,6 +262,7 @@ def test_repartition_topic(self, topic_manager_factory):
260262
assert repartition.broker_config.replication_factor == 1
261263
assert repartition.broker_config.extra_config["retention.ms"] == "1000"
262264
assert repartition.broker_config.extra_config["retention.bytes"] == "1000"
265+
assert repartition.is_repartition
263266

264267
def test_changelog_nested_internal_topic_naming(self, topic_manager_factory):
265268
"""

0 commit comments

Comments
 (0)