From 213253faa21a8ab8e1924e70b3dc8021eafd2472 Mon Sep 17 00:00:00 2001 From: Oleg Ovcharuk Date: Mon, 27 Jan 2025 12:31:31 +0300 Subject: [PATCH 01/10] Partition autosplit feature --- ydb/_grpc/grpcwrapper/ydb_topic.py | 24 +++++++++++++++++++++++ ydb/_topic_reader/topic_reader.py | 2 ++ ydb/_topic_reader/topic_reader_asyncio.py | 9 +++++++++ 3 files changed, 35 insertions(+) diff --git a/ydb/_grpc/grpcwrapper/ydb_topic.py b/ydb/_grpc/grpcwrapper/ydb_topic.py index ec84ab08..f8c45d2e 100644 --- a/ydb/_grpc/grpcwrapper/ydb_topic.py +++ b/ydb/_grpc/grpcwrapper/ydb_topic.py @@ -419,12 +419,14 @@ def from_proto( class InitRequest(IToProto): topics_read_settings: List["StreamReadMessage.InitRequest.TopicReadSettings"] consumer: str + auto_partitioning_support: bool = False def to_proto(self) -> ydb_topic_pb2.StreamReadMessage.InitRequest: res = ydb_topic_pb2.StreamReadMessage.InitRequest() res.consumer = self.consumer for settings in self.topics_read_settings: res.topics_read_settings.append(settings.to_proto()) + res.auto_partitioning_support = self.auto_partitioning_support return res @dataclass @@ -696,6 +698,20 @@ def to_proto(self) -> ydb_topic_pb2.StreamReadMessage.StopPartitionSessionRespon partition_session_id=self.partition_session_id, ) + @dataclass + class EndPartitionSession(IFromProto): + partition_session_id: int + adjacent_partition_ids: List[int] + child_partition_ids: List[int] + + @staticmethod + def from_proto(msg: ydb_topic_pb2.StreamReadMessage.EndPartitionSession): + return StreamReadMessage.EndPartitionSession( + partition_session_id=msg.partition_session_id, + adjacent_partition_ids=list(msg.adjacent_partition_ids), + child_partition_ids=list(msg.child_partition_ids), + ) + @dataclass class FromClient(IToProto): client_message: "ReaderMessagesFromClientToServer" @@ -775,6 +791,13 @@ def from_proto( msg.partition_session_status_response ), ) + elif mess_type == "end_partition_session": + return StreamReadMessage.FromServer( + server_status=server_status, + server_message=StreamReadMessage.EndPartitionSession.from_proto( + msg.end_partition_session, + ), + ) else: raise issues.UnexpectedGrpcMessage( "Unexpected message while parse ReaderMessagesFromServerToClient: '%s'" % mess_type @@ -799,6 +822,7 @@ def from_proto( UpdateTokenResponse, StreamReadMessage.StartPartitionSessionRequest, StreamReadMessage.StopPartitionSessionRequest, + StreamReadMessage.EndPartitionSession, ] diff --git a/ydb/_topic_reader/topic_reader.py b/ydb/_topic_reader/topic_reader.py index b907ee27..699e2417 100644 --- a/ydb/_topic_reader/topic_reader.py +++ b/ydb/_topic_reader/topic_reader.py @@ -45,6 +45,7 @@ class PublicReaderSettings: consumer: str topic: TopicSelectorTypes buffer_size_bytes: int = 50 * 1024 * 1024 + auto_partitioning_support: bool = False decoders: Union[Mapping[int, Callable[[bytes], bytes]], None] = None """decoders: map[codec_code] func(encoded_bytes)->decoded_bytes""" @@ -77,6 +78,7 @@ def _init_message(self) -> StreamReadMessage.InitRequest: return StreamReadMessage.InitRequest( topics_read_settings=list(map(PublicTopicSelector._to_topic_read_settings, selectors)), # type: ignore consumer=self.consumer, + auto_partitioning_support=self.auto_partitioning_support, ) def _retry_settings(self) -> RetrySettings: diff --git a/ydb/_topic_reader/topic_reader_asyncio.py b/ydb/_topic_reader/topic_reader_asyncio.py index e407fe01..6545a451 100644 --- a/ydb/_topic_reader/topic_reader_asyncio.py +++ b/ydb/_topic_reader/topic_reader_asyncio.py @@ -498,6 +498,12 @@ async def _read_messages_loop(self): ): self._on_partition_session_stop(message.server_message) + elif isinstance( + message.server_message, + StreamReadMessage.EndPartitionSession, + ): + self._on_end_partition_session(message.server_message) + elif isinstance(message.server_message, UpdateTokenResponse): self._update_token_event.set() @@ -575,6 +581,9 @@ def _on_partition_session_stop(self, message: StreamReadMessage.StopPartitionSes ) ) + def _on_end_partition_session(self, message: StreamReadMessage.EndPartitionSession): + logger.debug(f"End partition session with id: {message.partition_session_id}") + def _on_read_response(self, message: StreamReadMessage.ReadResponse): self._buffer_consume_bytes(message.bytes_size) From 7c607f5a1aee1ebb175e38ebe632b31644d67bc0 Mon Sep 17 00:00:00 2001 From: Oleg Ovcharuk Date: Fri, 31 Jan 2025 10:49:43 +0300 Subject: [PATCH 02/10] Auto partitioning settings to public api --- ydb/_grpc/grpcwrapper/ydb_topic.py | 171 ++++++++++++++++++ .../grpcwrapper/ydb_topic_public_types.py | 33 ++++ ydb/_topic_reader/topic_reader_asyncio.py | 2 +- ydb/topic.py | 11 ++ 4 files changed, 216 insertions(+), 1 deletion(-) diff --git a/ydb/_grpc/grpcwrapper/ydb_topic.py b/ydb/_grpc/grpcwrapper/ydb_topic.py index f8c45d2e..3d54af39 100644 --- a/ydb/_grpc/grpcwrapper/ydb_topic.py +++ b/ydb/_grpc/grpcwrapper/ydb_topic.py @@ -967,18 +967,123 @@ def from_public(alter_consumer: ydb_topic_public_types.PublicAlterConsumer) -> A class PartitioningSettings(IToProto, IFromProto): min_active_partitions: int partition_count_limit: int + max_active_partitions: int + auto_partitioning_settings: AutoPartitioningSettings @staticmethod def from_proto(msg: ydb_topic_pb2.PartitioningSettings) -> "PartitioningSettings": return PartitioningSettings( min_active_partitions=msg.min_active_partitions, partition_count_limit=msg.partition_count_limit, + max_active_partitions=msg.max_active_partitions, + auto_partitioning_settings=AutoPartitioningSettings.from_proto( + msg.auto_partitioning_settings + ), ) def to_proto(self) -> ydb_topic_pb2.PartitioningSettings: return ydb_topic_pb2.PartitioningSettings( min_active_partitions=self.min_active_partitions, partition_count_limit=self.partition_count_limit, + max_active_partitions=self.max_active_partitions, + auto_partitioning_settings=self.auto_partitioning_settings.to_proto() + ) + + +class AutoPartitioningStrategy(int, IFromProto, IFromPublic, IToPublic): + UNSPECIFIED = 0 + DISABLED = 1 + SCALE_UP = 2 + SCALE_UP_AND_DOWN = 3 + PAUSED = 4 + + @staticmethod + def from_public( + strategy: Optional[ydb_topic_public_types.PublicAutoPartitioningStrategy], + ) -> Optional["AutoPartitioningStrategy"]: + if strategy is None: + return None + + return AutoPartitioningStrategy(strategy) + + @staticmethod + def from_proto(code: Optional[int]) -> Optional["AutoPartitioningStrategy"]: + if code is None: + return None + + return AutoPartitioningStrategy(code) + + def to_public(self) -> ydb_topic_public_types.PublicAutoPartitioningStrategy: + try: + ydb_topic_public_types.PublicAutoPartitioningStrategy(int(self)) + except KeyError: + return ydb_topic_public_types.PublicAutoPartitioningStrategy.UNSPECIFIED + + +@dataclass +class AutoPartitioningSettings(IToProto, IFromProto, IFromPublic, IToPublic): + strategy: AutoPartitioningStrategy + partition_write_speed: AutoPartitioningWriteSpeedStrategy + + @staticmethod + def from_public( + settings: Optional[ydb_topic_public_types.PublicAutoPartitioningSettings] + ) -> Optional[AutoPartitioningSettings]: + if not settings: + return None + + return AutoPartitioningSettings( + strategy=settings.strategy, + partition_write_speed=AutoPartitioningWriteSpeedStrategy( + stabilization_window=settings.stabilization_window, + up_utilization_percent=settings.up_utilization_percent, + down_utilization_percent=settings.down_utilization_percent, + ) + ) + + @staticmethod + def from_proto(msg: ydb_topic_pb2.AutoPartitioningSettings) -> AutoPartitioningSettings: + return AutoPartitioningSettings( + strategy=AutoPartitioningStrategy.from_proto(msg.strategy), + partition_write_speed=AutoPartitioningWriteSpeedStrategy.from_proto( + msg.partition_write_speed + ), + ) + + def to_proto(self) -> ydb_topic_pb2.AutoPartitioningSettings: + return ydb_topic_pb2.AutoPartitioningSettings( + strategy=self.strategy, + partition_write_speed=self.partition_write_speed.to_proto() + ) + + def to_public(self) -> ydb_topic_public_types.PublicAutoPartitioningSettings: + return ydb_topic_public_types.PublicAutoPartitioningSettings( + strategy=self.strategy.to_public(), + stabilization_window=self.partition_write_speed.stabilization_window, + up_utilization_percent=self.partition_write_speed.up_utilization_percent, + down_utilization_percent=self.partition_write_speed.down_utilization_percent, + ) + + +@dataclass +class AutoPartitioningWriteSpeedStrategy(IToProto, IFromProto): + stabilization_window: Optional[datetime.timedelta] + up_utilization_percent: Optional[int] + down_utilization_percent: Optional[int] + + def to_proto(self): + return ydb_topic_pb2.AutoPartitioningWriteSpeedStrategy( + stabilization_window=proto_duration_from_timedelta(self.stabilization_window), + up_utilization_percent=self.up_utilization_percent, + down_utilization_percent=self.down_utilization_percent, + ) + + @staticmethod + def from_proto(msg: ydb_topic_pb2.AutoPartitioningWriteSpeedStrategy) -> AutoPartitioningWriteSpeedStrategy: + return AutoPartitioningWriteSpeedStrategy( + stabilization_window=timedelta_from_proto_duration(msg.stabilization_window), + up_utilization_percent=msg.up_utilization_percent, + down_utilization_percent=msg.down_utilization_percent, ) @@ -986,11 +1091,60 @@ def to_proto(self) -> ydb_topic_pb2.PartitioningSettings: class AlterPartitioningSettings(IToProto): set_min_active_partitions: Optional[int] set_partition_count_limit: Optional[int] + set_max_active_partitions: Optional[int] + alter_auto_partitioning_settings: Optional[AlterAutoPartitioningSettings] def to_proto(self) -> ydb_topic_pb2.AlterPartitioningSettings: return ydb_topic_pb2.AlterPartitioningSettings( set_min_active_partitions=self.set_min_active_partitions, set_partition_count_limit=self.set_partition_count_limit, + set_max_active_partitions=self.set_max_active_partitions, + ) + + +@dataclass +class AlterAutoPartitioningSettings(IToProto, IFromPublic): + set_strategy: Optional[AutoPartitioningStrategy] + set_partition_write_speed: Optional[AlterAutoPartitioningWriteSpeedStrategy] + + @staticmethod + def from_public( + settings: Optional[ydb_topic_public_types.PublicAlterAutoPartitioningSettings] + ) -> Optional[AlterAutoPartitioningSettings]: + if not settings: + return None + + return AutoPartitioningSettings( + strategy=settings.set_strategy, + partition_write_speed=AlterAutoPartitioningWriteSpeedStrategy( + stabilization_window=settings.set_stabilization_window, + up_utilization_percent=settings.set_up_utilization_percent, + down_utilization_percent=settings.set_down_utilization_percent, + ) + ) + + def to_proto(self) -> ydb_topic_pb2.AlterAutoPartitioningSettings: + set_partition_write_speed = None + if self.set_partition_write_speed: + set_partition_write_speed = self.set_partition_write_speed.to_proto() + + return ydb_topic_pb2.AlterAutoPartitioningSettings( + set_strategy=self.set_strategy, + set_partition_write_speed=set_partition_write_speed, + ) + + +@dataclass +class AlterAutoPartitioningWriteSpeedStrategy(IToProto): + set_stabilization_window: Optional[datetime.timedelta] + set_up_utilization_percent: Optional[int] + set_down_utilization_percent: Optional[int] + + def to_proto(self) -> ydb_topic_pb2.AlterAutoPartitioningWriteSpeedStrategy: + return ydb_topic_pb2.AlterAutoPartitioningWriteSpeedStrategy( + set_stabilization_window=proto_duration_from_timedelta(self.set_stabilization_window), + set_up_utilization_percent=self.set_up_utilization_percent, + set_down_utilization_percent=self.set_down_utilization_percent, ) @@ -1063,11 +1217,17 @@ def from_public(req: ydb_topic_public_types.CreateTopicRequestParams): consumer = ydb_topic_public_types.PublicConsumer(name=consumer) consumers.append(Consumer.from_public(consumer)) + auto_partitioning_settings = None + if req.auto_partitioning_settings is not None: + auto_partitioning_settings = AutoPartitioningSettings.from_public(req.auto_partitioning_settings) + return CreateTopicRequest( path=req.path, partitioning_settings=PartitioningSettings( min_active_partitions=req.min_active_partitions, partition_count_limit=req.partition_count_limit, + max_active_partitions=req.max_active_partitions, + auto_partitioning_settings=auto_partitioning_settings ), retention_period=req.retention_period, retention_storage_mb=req.retention_storage_mb, @@ -1138,6 +1298,13 @@ def from_public(req: ydb_topic_public_types.AlterTopicRequestParams) -> AlterTop consumer = ydb_topic_public_types.PublicAlterConsumer(name=consumer) alter_consumers.append(AlterConsumer.from_public(consumer)) + alter_auto_partitioning_settings = None + if req.alter_auto_partitioning_settings is not None: + alter_auto_partitioning_settings = AutoPartitioningSettings.from_public( + req.alter_auto_partitioning_settings + ) + + drop_consumers = req.drop_consumers if req.drop_consumers else [] return AlterTopicRequest( @@ -1145,6 +1312,8 @@ def from_public(req: ydb_topic_public_types.AlterTopicRequestParams) -> AlterTop alter_partitioning_settings=AlterPartitioningSettings( set_min_active_partitions=req.set_min_active_partitions, set_partition_count_limit=req.set_partition_count_limit, + set_max_active_partitions=req.set_max_active_partitions, + alter_auto_partitioning_settings=alter_auto_partitioning_settings, ), add_consumers=add_consumers, set_retention_period=req.set_retention_period, @@ -1205,6 +1374,8 @@ def to_public(self) -> ydb_topic_public_types.PublicDescribeTopicResult: return ydb_topic_public_types.PublicDescribeTopicResult( self=scheme._wrap_scheme_entry(self.self_proto), min_active_partitions=self.partitioning_settings.min_active_partitions, + max_active_partitions=self.partitioning_settings.max_active_partitions, + auto_partitioning_settings=self.partitioning_settings.auto_partitioning_settings.to_public(), partition_count_limit=self.partitioning_settings.partition_count_limit, partitions=list(map(DescribeTopicResult.PartitionInfo.to_public, self.partitions)), retention_period=self.retention_period, diff --git a/ydb/_grpc/grpcwrapper/ydb_topic_public_types.py b/ydb/_grpc/grpcwrapper/ydb_topic_public_types.py index d1987546..eebcec6d 100644 --- a/ydb/_grpc/grpcwrapper/ydb_topic_public_types.py +++ b/ydb/_grpc/grpcwrapper/ydb_topic_public_types.py @@ -19,6 +19,7 @@ class CreateTopicRequestParams: path: str min_active_partitions: Optional[int] + max_active_partitions: Optional[int] partition_count_limit: Optional[int] retention_period: Optional[datetime.timedelta] retention_storage_mb: Optional[int] @@ -28,12 +29,14 @@ class CreateTopicRequestParams: attributes: Optional[Dict[str, str]] consumers: Optional[List[Union["PublicConsumer", str]]] metering_mode: Optional["PublicMeteringMode"] + auto_partitioning_settings: Optional["PublicAutoPartitioningSettings"] @dataclass class AlterTopicRequestParams: path: str set_min_active_partitions: Optional[int] + set_max_active_partitions: Optional[int] set_partition_count_limit: Optional[int] add_consumers: Optional[List[Union["PublicConsumer", str]]] alter_consumers: Optional[List[Union["PublicAlterConsumer", str]]] @@ -45,6 +48,7 @@ class AlterTopicRequestParams: set_retention_period: Optional[datetime.timedelta] set_retention_storage_mb: Optional[int] set_supported_codecs: Optional[List[Union["PublicCodec", int]]] + alter_auto_partitioning_settings: Optional["PublicAlterAutoPartitioningSettings"] class PublicCodec(int): @@ -68,6 +72,30 @@ class PublicMeteringMode(IntEnum): REQUEST_UNITS = 2 +class PublicAutoPartitioningStrategy(IntEnum): + UNSPECIFIED = 0 + DISABLED = 1 + SCALE_UP = 2 + SCALE_UP_AND_DOWN = 3 + PAUSED = 4 + + +@dataclass +class PublicAutoPartitioningSettings: + strategy: Optional["PublicAutoPartitioningStrategy"] + stabilization_window: Optional[datetime.timedelta] + up_utilization_percent: Optional[int] + down_utilization_percent: Optional[int] + + +@dataclass +class PublicAlterAutoPartitioningSettings: + set_strategy: Optional["PublicAutoPartitioningStrategy"] + set_stabilization_window: Optional[datetime.timedelta] + set_up_utilization_percent: Optional[int] + set_down_utilization_percent: Optional[int] + + @dataclass class PublicConsumer: name: str @@ -138,6 +166,9 @@ class PublicDescribeTopicResult: min_active_partitions: int "Minimum partition count auto merge would stop working at" + max_active_partitions: int + "Minimum partition count auto split would stop working at" + partition_count_limit: int "Limit for total partition count, including active (open for write) and read-only partitions" @@ -171,6 +202,8 @@ class PublicDescribeTopicResult: topic_stats: "PublicDescribeTopicResult.TopicStats" "Statistics of topic" + auto_partitioning_settings: "PublicAutoPartitioningSettings" + @dataclass class PartitionInfo: partition_id: int diff --git a/ydb/_topic_reader/topic_reader_asyncio.py b/ydb/_topic_reader/topic_reader_asyncio.py index 6545a451..e7bba5a7 100644 --- a/ydb/_topic_reader/topic_reader_asyncio.py +++ b/ydb/_topic_reader/topic_reader_asyncio.py @@ -582,7 +582,7 @@ def _on_partition_session_stop(self, message: StreamReadMessage.StopPartitionSes ) def _on_end_partition_session(self, message: StreamReadMessage.EndPartitionSession): - logger.debug(f"End partition session with id: {message.partition_session_id}") + logger.info(f"End partition session with id: {message.partition_session_id}, child partitions: {message.child_partition_ids}") def _on_read_response(self, message: StreamReadMessage.ReadResponse): self._buffer_consume_bytes(message.bytes_size) diff --git a/ydb/topic.py b/ydb/topic.py index f0b872e2..6d0fceee 100644 --- a/ydb/topic.py +++ b/ydb/topic.py @@ -80,6 +80,9 @@ PublicConsumer as TopicConsumer, PublicAlterConsumer as TopicAlterConsumer, PublicMeteringMode as TopicMeteringMode, + PublicAutoPartitioningStrategy as TopicAutoPartitioningStrategy, + PublicAutoPartitioningSettings as TopicAutoPartitioningSettings, + PublicAlterAutoPartitioningSettings as TopicAlterAutoPartitioningSettings, ) @@ -117,6 +120,7 @@ async def create_topic( attributes: Optional[Dict[str, str]] = None, consumers: Optional[List[Union[TopicConsumer, str]]] = None, metering_mode: Optional[TopicMeteringMode] = None, + auto_partitioning_settings: Optional[TopicAutoPartitioningSettings] = None, ): """ create topic command @@ -151,6 +155,7 @@ async def alter_topic( self, path: str, set_min_active_partitions: Optional[int] = None, + set_max_active_partitions: Optional[int] = None, set_partition_count_limit: Optional[int] = None, add_consumers: Optional[List[Union[TopicConsumer, str]]] = None, alter_consumers: Optional[List[Union[TopicAlterConsumer, str]]] = None, @@ -162,6 +167,7 @@ async def alter_topic( set_retention_period: Optional[datetime.timedelta] = None, set_retention_storage_mb: Optional[int] = None, set_supported_codecs: Optional[List[Union[TopicCodec, int]]] = None, + alter_auto_partitioning_settings: Optional[TopicAlterAutoPartitioningSettings] = None, ): """ alter topic command @@ -226,6 +232,7 @@ def reader( # custom decoder executor for call builtin and custom decoders. If None - use shared executor pool. # if max_worker in the executor is 1 - then decoders will be called from the thread without parallel decoder_executor: Optional[concurrent.futures.Executor] = None, + auto_partitioning_support: Optional[bool] = False, # Auto partitioning feature flag. Default - False. ) -> TopicReaderAsyncIO: if not decoder_executor: @@ -314,6 +321,7 @@ def create_topic( attributes: Optional[Dict[str, str]] = None, consumers: Optional[List[Union[TopicConsumer, str]]] = None, metering_mode: Optional[TopicMeteringMode] = None, + auto_partitioning_settings: Optional[TopicAutoPartitioningSettings] = None, ): """ create topic command @@ -350,6 +358,7 @@ def alter_topic( self, path: str, set_min_active_partitions: Optional[int] = None, + set_max_active_partitions: Optional[int] = None, set_partition_count_limit: Optional[int] = None, add_consumers: Optional[List[Union[TopicConsumer, str]]] = None, alter_consumers: Optional[List[Union[TopicAlterConsumer, str]]] = None, @@ -361,6 +370,7 @@ def alter_topic( set_retention_period: Optional[datetime.timedelta] = None, set_retention_storage_mb: Optional[int] = None, set_supported_codecs: Optional[List[Union[TopicCodec, int]]] = None, + alter_auto_partitioning_settings: Optional[TopicAlterAutoPartitioningSettings] = None, ): """ alter topic command @@ -431,6 +441,7 @@ def reader( # custom decoder executor for call builtin and custom decoders. If None - use shared executor pool. # if max_worker in the executor is 1 - then decoders will be called from the thread without parallel decoder_executor: Optional[concurrent.futures.Executor] = None, # default shared client executor pool + auto_partitioning_support: Optional[bool] = False, # Auto partitioning feature flag. Default - False. ) -> TopicReader: if not decoder_executor: decoder_executor = self._executor From 4f38b651eaca87f6815e2c5d4040397d7fea4946 Mon Sep 17 00:00:00 2001 From: Oleg Ovcharuk Date: Fri, 31 Jan 2025 11:11:35 +0300 Subject: [PATCH 03/10] style fixes --- ydb/_grpc/grpcwrapper/ydb_topic.py | 24 +++++++++-------------- ydb/_topic_reader/topic_reader_asyncio.py | 4 +++- 2 files changed, 12 insertions(+), 16 deletions(-) diff --git a/ydb/_grpc/grpcwrapper/ydb_topic.py b/ydb/_grpc/grpcwrapper/ydb_topic.py index 3d54af39..b5317273 100644 --- a/ydb/_grpc/grpcwrapper/ydb_topic.py +++ b/ydb/_grpc/grpcwrapper/ydb_topic.py @@ -976,9 +976,7 @@ def from_proto(msg: ydb_topic_pb2.PartitioningSettings) -> "PartitioningSettings min_active_partitions=msg.min_active_partitions, partition_count_limit=msg.partition_count_limit, max_active_partitions=msg.max_active_partitions, - auto_partitioning_settings=AutoPartitioningSettings.from_proto( - msg.auto_partitioning_settings - ), + auto_partitioning_settings=AutoPartitioningSettings.from_proto(msg.auto_partitioning_settings), ) def to_proto(self) -> ydb_topic_pb2.PartitioningSettings: @@ -986,7 +984,7 @@ def to_proto(self) -> ydb_topic_pb2.PartitioningSettings: min_active_partitions=self.min_active_partitions, partition_count_limit=self.partition_count_limit, max_active_partitions=self.max_active_partitions, - auto_partitioning_settings=self.auto_partitioning_settings.to_proto() + auto_partitioning_settings=self.auto_partitioning_settings.to_proto(), ) @@ -1027,7 +1025,7 @@ class AutoPartitioningSettings(IToProto, IFromProto, IFromPublic, IToPublic): @staticmethod def from_public( - settings: Optional[ydb_topic_public_types.PublicAutoPartitioningSettings] + settings: Optional[ydb_topic_public_types.PublicAutoPartitioningSettings], ) -> Optional[AutoPartitioningSettings]: if not settings: return None @@ -1038,22 +1036,19 @@ def from_public( stabilization_window=settings.stabilization_window, up_utilization_percent=settings.up_utilization_percent, down_utilization_percent=settings.down_utilization_percent, - ) + ), ) @staticmethod def from_proto(msg: ydb_topic_pb2.AutoPartitioningSettings) -> AutoPartitioningSettings: return AutoPartitioningSettings( strategy=AutoPartitioningStrategy.from_proto(msg.strategy), - partition_write_speed=AutoPartitioningWriteSpeedStrategy.from_proto( - msg.partition_write_speed - ), + partition_write_speed=AutoPartitioningWriteSpeedStrategy.from_proto(msg.partition_write_speed), ) def to_proto(self) -> ydb_topic_pb2.AutoPartitioningSettings: return ydb_topic_pb2.AutoPartitioningSettings( - strategy=self.strategy, - partition_write_speed=self.partition_write_speed.to_proto() + strategy=self.strategy, partition_write_speed=self.partition_write_speed.to_proto() ) def to_public(self) -> ydb_topic_public_types.PublicAutoPartitioningSettings: @@ -1109,7 +1104,7 @@ class AlterAutoPartitioningSettings(IToProto, IFromPublic): @staticmethod def from_public( - settings: Optional[ydb_topic_public_types.PublicAlterAutoPartitioningSettings] + settings: Optional[ydb_topic_public_types.PublicAlterAutoPartitioningSettings], ) -> Optional[AlterAutoPartitioningSettings]: if not settings: return None @@ -1120,7 +1115,7 @@ def from_public( stabilization_window=settings.set_stabilization_window, up_utilization_percent=settings.set_up_utilization_percent, down_utilization_percent=settings.set_down_utilization_percent, - ) + ), ) def to_proto(self) -> ydb_topic_pb2.AlterAutoPartitioningSettings: @@ -1227,7 +1222,7 @@ def from_public(req: ydb_topic_public_types.CreateTopicRequestParams): min_active_partitions=req.min_active_partitions, partition_count_limit=req.partition_count_limit, max_active_partitions=req.max_active_partitions, - auto_partitioning_settings=auto_partitioning_settings + auto_partitioning_settings=auto_partitioning_settings, ), retention_period=req.retention_period, retention_storage_mb=req.retention_storage_mb, @@ -1304,7 +1299,6 @@ def from_public(req: ydb_topic_public_types.AlterTopicRequestParams) -> AlterTop req.alter_auto_partitioning_settings ) - drop_consumers = req.drop_consumers if req.drop_consumers else [] return AlterTopicRequest( diff --git a/ydb/_topic_reader/topic_reader_asyncio.py b/ydb/_topic_reader/topic_reader_asyncio.py index e7bba5a7..b42c1db2 100644 --- a/ydb/_topic_reader/topic_reader_asyncio.py +++ b/ydb/_topic_reader/topic_reader_asyncio.py @@ -582,7 +582,9 @@ def _on_partition_session_stop(self, message: StreamReadMessage.StopPartitionSes ) def _on_end_partition_session(self, message: StreamReadMessage.EndPartitionSession): - logger.info(f"End partition session with id: {message.partition_session_id}, child partitions: {message.child_partition_ids}") + logger.info( + f"End partition session with id: {message.partition_session_id}, child partitions: {message.child_partition_ids}" + ) def _on_read_response(self, message: StreamReadMessage.ReadResponse): self._buffer_consume_bytes(message.bytes_size) From f7655372108868a6b4bb7c140a1a2d4c0cfda54f Mon Sep 17 00:00:00 2001 From: Oleg Ovcharuk Date: Fri, 31 Jan 2025 12:41:27 +0300 Subject: [PATCH 04/10] Fix unit --- ydb/_grpc/grpcwrapper/ydb_topic.py | 41 ++++++++++++++----- .../grpcwrapper/ydb_topic_public_types.py | 16 ++++---- ydb/_grpc/grpcwrapper/ydb_topic_test.py | 18 +++++++- ydb/topic.py | 2 + 4 files changed, 57 insertions(+), 20 deletions(-) diff --git a/ydb/_grpc/grpcwrapper/ydb_topic.py b/ydb/_grpc/grpcwrapper/ydb_topic.py index b5317273..570bbb75 100644 --- a/ydb/_grpc/grpcwrapper/ydb_topic.py +++ b/ydb/_grpc/grpcwrapper/ydb_topic.py @@ -980,11 +980,15 @@ def from_proto(msg: ydb_topic_pb2.PartitioningSettings) -> "PartitioningSettings ) def to_proto(self) -> ydb_topic_pb2.PartitioningSettings: + auto_partitioning_settings = None + if self.auto_partitioning_settings is not None: + auto_partitioning_settings = self.auto_partitioning_settings.to_proto() + return ydb_topic_pb2.PartitioningSettings( min_active_partitions=self.min_active_partitions, partition_count_limit=self.partition_count_limit, max_active_partitions=self.max_active_partitions, - auto_partitioning_settings=self.auto_partitioning_settings.to_proto(), + auto_partitioning_settings=auto_partitioning_settings, ) @@ -1041,6 +1045,9 @@ def from_public( @staticmethod def from_proto(msg: ydb_topic_pb2.AutoPartitioningSettings) -> AutoPartitioningSettings: + if msg is None: + return None + return AutoPartitioningSettings( strategy=AutoPartitioningStrategy.from_proto(msg.strategy), partition_write_speed=AutoPartitioningWriteSpeedStrategy.from_proto(msg.partition_write_speed), @@ -1074,7 +1081,12 @@ def to_proto(self): ) @staticmethod - def from_proto(msg: ydb_topic_pb2.AutoPartitioningWriteSpeedStrategy) -> AutoPartitioningWriteSpeedStrategy: + def from_proto( + msg: Optional[ydb_topic_pb2.AutoPartitioningWriteSpeedStrategy], + ) -> Optional[AutoPartitioningWriteSpeedStrategy]: + if msg is None: + return None + return AutoPartitioningWriteSpeedStrategy( stabilization_window=timedelta_from_proto_duration(msg.stabilization_window), up_utilization_percent=msg.up_utilization_percent, @@ -1090,10 +1102,15 @@ class AlterPartitioningSettings(IToProto): alter_auto_partitioning_settings: Optional[AlterAutoPartitioningSettings] def to_proto(self) -> ydb_topic_pb2.AlterPartitioningSettings: + alter_auto_partitioning_settings = None + if self.alter_auto_partitioning_settings is not None: + alter_auto_partitioning_settings = self.alter_auto_partitioning_settings.to_proto() + return ydb_topic_pb2.AlterPartitioningSettings( set_min_active_partitions=self.set_min_active_partitions, set_partition_count_limit=self.set_partition_count_limit, set_max_active_partitions=self.set_max_active_partitions, + alter_auto_partitioning_settings=alter_auto_partitioning_settings, ) @@ -1109,12 +1126,12 @@ def from_public( if not settings: return None - return AutoPartitioningSettings( - strategy=settings.set_strategy, - partition_write_speed=AlterAutoPartitioningWriteSpeedStrategy( - stabilization_window=settings.set_stabilization_window, - up_utilization_percent=settings.set_up_utilization_percent, - down_utilization_percent=settings.set_down_utilization_percent, + return AlterAutoPartitioningSettings( + set_strategy=settings.set_strategy, + set_partition_write_speed=AlterAutoPartitioningWriteSpeedStrategy( + set_stabilization_window=settings.set_stabilization_window, + set_up_utilization_percent=settings.set_up_utilization_percent, + set_down_utilization_percent=settings.set_down_utilization_percent, ), ) @@ -1185,9 +1202,13 @@ class CreateTopicRequest(IToProto, IFromPublic): metering_mode: "MeteringMode" def to_proto(self) -> ydb_topic_pb2.CreateTopicRequest: + partitioning_settings = None + if self.partitioning_settings is not None: + partitioning_settings = self.partitioning_settings.to_proto() + return ydb_topic_pb2.CreateTopicRequest( path=self.path, - partitioning_settings=self.partitioning_settings.to_proto(), + partitioning_settings=partitioning_settings, retention_period=proto_duration_from_timedelta(self.retention_period), retention_storage_mb=self.retention_storage_mb, supported_codecs=self.supported_codecs.to_proto(), @@ -1295,7 +1316,7 @@ def from_public(req: ydb_topic_public_types.AlterTopicRequestParams) -> AlterTop alter_auto_partitioning_settings = None if req.alter_auto_partitioning_settings is not None: - alter_auto_partitioning_settings = AutoPartitioningSettings.from_public( + alter_auto_partitioning_settings = AlterAutoPartitioningSettings.from_public( req.alter_auto_partitioning_settings ) diff --git a/ydb/_grpc/grpcwrapper/ydb_topic_public_types.py b/ydb/_grpc/grpcwrapper/ydb_topic_public_types.py index eebcec6d..58e1a05b 100644 --- a/ydb/_grpc/grpcwrapper/ydb_topic_public_types.py +++ b/ydb/_grpc/grpcwrapper/ydb_topic_public_types.py @@ -82,18 +82,18 @@ class PublicAutoPartitioningStrategy(IntEnum): @dataclass class PublicAutoPartitioningSettings: - strategy: Optional["PublicAutoPartitioningStrategy"] - stabilization_window: Optional[datetime.timedelta] - up_utilization_percent: Optional[int] - down_utilization_percent: Optional[int] + strategy: Optional["PublicAutoPartitioningStrategy"] = 0 + stabilization_window: Optional[datetime.timedelta] = None + up_utilization_percent: Optional[int] = None + down_utilization_percent: Optional[int] = None @dataclass class PublicAlterAutoPartitioningSettings: - set_strategy: Optional["PublicAutoPartitioningStrategy"] - set_stabilization_window: Optional[datetime.timedelta] - set_up_utilization_percent: Optional[int] - set_down_utilization_percent: Optional[int] + set_strategy: Optional["PublicAutoPartitioningStrategy"] = 0 + set_stabilization_window: Optional[datetime.timedelta] = None + set_up_utilization_percent: Optional[int] = None + set_down_utilization_percent: Optional[int] = None @dataclass diff --git a/ydb/_grpc/grpcwrapper/ydb_topic_test.py b/ydb/_grpc/grpcwrapper/ydb_topic_test.py index 53256ac1..b9e30603 100644 --- a/ydb/_grpc/grpcwrapper/ydb_topic_test.py +++ b/ydb/_grpc/grpcwrapper/ydb_topic_test.py @@ -7,6 +7,8 @@ from .ydb_topic_public_types import ( AlterTopicRequestParams, PublicAlterConsumer, + PublicAlterAutoPartitioningSettings, + PublicAutoPartitioningStrategy, PublicConsumer, PublicCodec, ) @@ -51,7 +53,11 @@ def test_alter_topic_request_from_public_to_proto(): "alter_attributes": {"key": "value"}, "set_metering_mode": 1, "set_min_active_partitions": 2, - "set_partition_count_limit": 4, + "set_max_active_partitions": 8, + "set_partition_count_limit": 10, + "alter_auto_partitioning_settings": PublicAlterAutoPartitioningSettings( + set_strategy=PublicAutoPartitioningStrategy.DISABLED, + ), } params_public = AlterTopicRequestParams(**params) @@ -62,7 +68,15 @@ def test_alter_topic_request_from_public_to_proto(): expected_dict = { "path": "topic_name", - "alter_partitioning_settings": {"set_min_active_partitions": "2", "set_partition_count_limit": "4"}, + "alter_partitioning_settings": { + "set_min_active_partitions": "2", + "set_max_active_partitions": "8", + "set_partition_count_limit": "10", + "alter_auto_partitioning_settings": { + "set_strategy": "AUTO_PARTITIONING_STRATEGY_DISABLED", + "set_partition_write_speed": {}, + }, + }, "set_retention_period": "2419200s", "set_retention_storage_mb": "4", "set_supported_codecs": {"codecs": [1, 2]}, diff --git a/ydb/topic.py b/ydb/topic.py index 6d0fceee..042d5fe9 100644 --- a/ydb/topic.py +++ b/ydb/topic.py @@ -111,6 +111,7 @@ async def create_topic( self, path: str, min_active_partitions: Optional[int] = None, + max_active_partitions: Optional[int] = None, partition_count_limit: Optional[int] = None, retention_period: Optional[datetime.timedelta] = None, retention_storage_mb: Optional[int] = None, @@ -312,6 +313,7 @@ def create_topic( self, path: str, min_active_partitions: Optional[int] = None, + max_active_partitions: Optional[int] = None, partition_count_limit: Optional[int] = None, retention_period: Optional[datetime.timedelta] = None, retention_storage_mb: Optional[int] = None, From f68af4c96c285f7ec8ceded76ac42924625f1f6c Mon Sep 17 00:00:00 2001 From: Oleg Ovcharuk Date: Tue, 4 Feb 2025 12:15:47 +0300 Subject: [PATCH 05/10] auto partitioning control plane --- tests/topics/test_control_plane.py | 34 +++++++++++++++++++ ydb/_grpc/grpcwrapper/ydb_topic.py | 4 +-- .../grpcwrapper/ydb_topic_public_types.py | 8 ++--- ydb/_topic_reader/topic_reader_asyncio.py | 6 ++++ ydb/topic.py | 3 ++ 5 files changed, 49 insertions(+), 6 deletions(-) diff --git a/tests/topics/test_control_plane.py b/tests/topics/test_control_plane.py index 25258b61..79b5976c 100644 --- a/tests/topics/test_control_plane.py +++ b/tests/topics/test_control_plane.py @@ -2,6 +2,7 @@ import pytest +import ydb from ydb import issues @@ -56,6 +57,39 @@ async def test_alter_existed_topic(self, driver, topic_path): topic_after = await client.describe_topic(topic_path) assert topic_after.min_active_partitions == target_min_active_partitions + async def test_alter_auto_partitioning_settings(self, driver, topic_path): + client = driver.topic_client + + topic_before = await client.describe_topic(topic_path) + + expected = topic_before.auto_partitioning_settings + + expected.strategy = ydb.TopicAutoPartitioningStrategy.SCALE_UP + + await client.alter_topic( + topic_path, + alter_auto_partitioning_settings=ydb.TopicAlterAutoPartitioningSettings( + set_strategy=ydb.TopicAutoPartitioningStrategy.SCALE_UP, + ), + ) + + topic_after = await client.describe_topic(topic_path) + + assert topic_after.auto_partitioning_settings == expected + + expected.up_utilization_percent = 88 + + await client.alter_topic( + topic_path, + alter_auto_partitioning_settings=ydb.TopicAlterAutoPartitioningSettings( + set_up_utilization_percent=88, + ), + ) + + topic_after = await client.describe_topic(topic_path) + + assert topic_after.auto_partitioning_settings == expected + class TestTopicClientControlPlane: def test_create_topic(self, driver_sync, database): diff --git a/ydb/_grpc/grpcwrapper/ydb_topic.py b/ydb/_grpc/grpcwrapper/ydb_topic.py index 570bbb75..4e23d370 100644 --- a/ydb/_grpc/grpcwrapper/ydb_topic.py +++ b/ydb/_grpc/grpcwrapper/ydb_topic.py @@ -1017,7 +1017,7 @@ def from_proto(code: Optional[int]) -> Optional["AutoPartitioningStrategy"]: def to_public(self) -> ydb_topic_public_types.PublicAutoPartitioningStrategy: try: - ydb_topic_public_types.PublicAutoPartitioningStrategy(int(self)) + return ydb_topic_public_types.PublicAutoPartitioningStrategy(int(self)) except KeyError: return ydb_topic_public_types.PublicAutoPartitioningStrategy.UNSPECIFIED @@ -1183,7 +1183,7 @@ def from_proto(code: Optional[int]) -> Optional["MeteringMode"]: def to_public(self) -> ydb_topic_public_types.PublicMeteringMode: try: - ydb_topic_public_types.PublicMeteringMode(int(self)) + return ydb_topic_public_types.PublicMeteringMode(int(self)) except KeyError: return ydb_topic_public_types.PublicMeteringMode.UNSPECIFIED diff --git a/ydb/_grpc/grpcwrapper/ydb_topic_public_types.py b/ydb/_grpc/grpcwrapper/ydb_topic_public_types.py index 58e1a05b..37528a2f 100644 --- a/ydb/_grpc/grpcwrapper/ydb_topic_public_types.py +++ b/ydb/_grpc/grpcwrapper/ydb_topic_public_types.py @@ -82,18 +82,18 @@ class PublicAutoPartitioningStrategy(IntEnum): @dataclass class PublicAutoPartitioningSettings: - strategy: Optional["PublicAutoPartitioningStrategy"] = 0 + strategy: Optional["PublicAutoPartitioningStrategy"] = None stabilization_window: Optional[datetime.timedelta] = None - up_utilization_percent: Optional[int] = None down_utilization_percent: Optional[int] = None + up_utilization_percent: Optional[int] = None @dataclass class PublicAlterAutoPartitioningSettings: - set_strategy: Optional["PublicAutoPartitioningStrategy"] = 0 + set_strategy: Optional["PublicAutoPartitioningStrategy"] = None set_stabilization_window: Optional[datetime.timedelta] = None - set_up_utilization_percent: Optional[int] = None set_down_utilization_percent: Optional[int] = None + set_up_utilization_percent: Optional[int] = None @dataclass diff --git a/ydb/_topic_reader/topic_reader_asyncio.py b/ydb/_topic_reader/topic_reader_asyncio.py index b42c1db2..76461959 100644 --- a/ydb/_topic_reader/topic_reader_asyncio.py +++ b/ydb/_topic_reader/topic_reader_asyncio.py @@ -335,6 +335,8 @@ async def _start(self, stream: IGrpcWrapperAsyncIO, init_message: StreamReadMess self._started = True self._stream = stream + print(init_message) + stream.write(StreamReadMessage.FromClient(client_message=init_message)) init_response = await stream.receive() # type: StreamReadMessage.FromServer if isinstance(init_response.server_message, StreamReadMessage.InitResponse): @@ -586,6 +588,10 @@ def _on_end_partition_session(self, message: StreamReadMessage.EndPartitionSessi f"End partition session with id: {message.partition_session_id}, child partitions: {message.child_partition_ids}" ) + print( + f"End partition session with id: {message.partition_session_id}, child partitions: {message.child_partition_ids}" + ) + def _on_read_response(self, message: StreamReadMessage.ReadResponse): self._buffer_consume_bytes(message.bytes_size) diff --git a/ydb/topic.py b/ydb/topic.py index 042d5fe9..f0607aca 100644 --- a/ydb/topic.py +++ b/ydb/topic.py @@ -7,6 +7,9 @@ "TopicCodec", "TopicConsumer", "TopicAlterConsumer", + "TopicAlterAutoPartitioningSettings", + "TopicAutoPartitioningSettings", + "TopicAutoPartitioningStrategy", "TopicDescription", "TopicError", "TopicMeteringMode", From 0d5fa7b110f27bf854dd5ce5b80755e96adaabe9 Mon Sep 17 00:00:00 2001 From: Oleg Ovcharuk Date: Wed, 5 Feb 2025 15:33:38 +0300 Subject: [PATCH 06/10] Stop rotation batch queue after split --- ydb/_topic_reader/datatypes.py | 11 +++++++++ ydb/_topic_reader/topic_reader_asyncio.py | 27 +++++++++++++++-------- 2 files changed, 29 insertions(+), 9 deletions(-) diff --git a/ydb/_topic_reader/datatypes.py b/ydb/_topic_reader/datatypes.py index a9c811ac..b48501af 100644 --- a/ydb/_topic_reader/datatypes.py +++ b/ydb/_topic_reader/datatypes.py @@ -121,6 +121,16 @@ def close(self): def closed(self): return self.state == PartitionSession.State.Stopped + def end(self): + if self.closed: + return + + self.state = PartitionSession.State.Ended + + @property + def ended(self): + return self.state == PartitionSession.State.Ended + def _ensure_not_closed(self): if self.state == PartitionSession.State.Stopped: raise topic_reader_asyncio.PublicTopicReaderPartitionExpiredError() @@ -129,6 +139,7 @@ class State(enum.Enum): Active = 1 GracefulShutdown = 2 Stopped = 3 + Ended = 4 @dataclass(order=True) class CommitAckWaiter: diff --git a/ydb/_topic_reader/topic_reader_asyncio.py b/ydb/_topic_reader/topic_reader_asyncio.py index 76461959..e959ac17 100644 --- a/ydb/_topic_reader/topic_reader_asyncio.py +++ b/ydb/_topic_reader/topic_reader_asyncio.py @@ -335,8 +335,6 @@ async def _start(self, stream: IGrpcWrapperAsyncIO, init_message: StreamReadMess self._started = True self._stream = stream - print(init_message) - stream.write(StreamReadMessage.FromClient(client_message=init_message)) init_response = await stream.receive() # type: StreamReadMessage.FromServer if isinstance(init_response.server_message, StreamReadMessage.InitResponse): @@ -390,6 +388,15 @@ def _get_first_batch(self) -> typing.Tuple[int, datatypes.PublicBatch]: partition_session_id, batch = self._message_batches.popitem(last=False) return partition_session_id, batch + def _return_batch_to_queue(self, part_sess_id: int, batch: datatypes.PublicBatch): + self._message_batches[part_sess_id] = batch + + # In case of auto-split we should return all parent messages ASAP + # without queue rotation to prevent child's messages before parent's. + if part_sess_id in self._partition_sessions and self._partition_sessions[part_sess_id].ended: + print(f"part_sess_id: {part_sess_id} is ended, return to beginning of queue") + self._message_batches.move_to_end(part_sess_id, last=False) + def receive_batch_nowait(self, max_messages: Optional[int] = None): if self._get_first_error(): raise self._get_first_error() @@ -405,7 +412,8 @@ def receive_batch_nowait(self, max_messages: Optional[int] = None): cutted_batch = batch._pop_batch(message_count=max_messages) - self._message_batches[part_sess_id] = batch + self._return_batch_to_queue(part_sess_id, batch) + self._buffer_release_bytes(cutted_batch._bytes_size) return cutted_batch @@ -425,7 +433,7 @@ def receive_message_nowait(self): self._buffer_release_bytes(batch._bytes_size) else: # TODO: we should somehow release bytes from single message as well - self._message_batches[part_sess_id] = batch + self._return_batch_to_queue(part_sess_id, batch) return message @@ -584,13 +592,14 @@ def _on_partition_session_stop(self, message: StreamReadMessage.StopPartitionSes ) def _on_end_partition_session(self, message: StreamReadMessage.EndPartitionSession): - logger.info( - f"End partition session with id: {message.partition_session_id}, child partitions: {message.child_partition_ids}" + logger.debug( + f"End partition session with id: {message.partition_session_id}, " + f"child partitions: {message.child_partition_ids}" ) - print( - f"End partition session with id: {message.partition_session_id}, child partitions: {message.child_partition_ids}" - ) + if message.partition_session_id in self._partition_sessions: + # Mark partition session as ended not to shuffle messages. + self._partition_sessions[message.partition_session_id].end() def _on_read_response(self, message: StreamReadMessage.ReadResponse): self._buffer_consume_bytes(message.bytes_size) From 0f20070a302d6282d56ea170aaa2779a8d4e2c0f Mon Sep 17 00:00:00 2001 From: Oleg Ovcharuk Date: Thu, 6 Feb 2025 13:32:11 +0300 Subject: [PATCH 07/10] test autosplit message order --- ydb/_topic_reader/topic_reader_asyncio.py | 1 - .../topic_reader_asyncio_test.py | 112 +++++++++++++++++- 2 files changed, 109 insertions(+), 4 deletions(-) diff --git a/ydb/_topic_reader/topic_reader_asyncio.py b/ydb/_topic_reader/topic_reader_asyncio.py index e959ac17..7061b4e4 100644 --- a/ydb/_topic_reader/topic_reader_asyncio.py +++ b/ydb/_topic_reader/topic_reader_asyncio.py @@ -394,7 +394,6 @@ def _return_batch_to_queue(self, part_sess_id: int, batch: datatypes.PublicBatch # In case of auto-split we should return all parent messages ASAP # without queue rotation to prevent child's messages before parent's. if part_sess_id in self._partition_sessions and self._partition_sessions[part_sess_id].ended: - print(f"part_sess_id: {part_sess_id} is ended, return to beginning of queue") self._message_batches.move_to_end(part_sess_id, last=False) def receive_batch_nowait(self, max_messages: Optional[int] = None): diff --git a/ydb/_topic_reader/topic_reader_asyncio_test.py b/ydb/_topic_reader/topic_reader_asyncio_test.py index 25e08029..c598d1ce 100644 --- a/ydb/_topic_reader/topic_reader_asyncio_test.py +++ b/ydb/_topic_reader/topic_reader_asyncio_test.py @@ -52,8 +52,8 @@ def default_executor(): executor.shutdown() -def stub_partition_session(id: int = 0): - return datatypes.PartitionSession( +def stub_partition_session(id: int = 0, ended: bool = False): + partition_session = datatypes.PartitionSession( id=id, state=datatypes.PartitionSession.State.Active, topic_path="asd", @@ -63,6 +63,11 @@ def stub_partition_session(id: int = 0): reader_stream_id=513, ) + if ended: + partition_session.end() + + return partition_session + def stub_message(id: int): return PublicMessage( @@ -73,7 +78,7 @@ def stub_message(id: int): offset=0, written_at=datetime.datetime(2023, 3, 18, 14, 15), producer_id="", - data=bytes(), + data=id, metadata_items={}, _partition_session=stub_partition_session(), _commit_start_offset=0, @@ -746,6 +751,31 @@ def session_count(): with pytest.raises(asyncio.QueueEmpty): stream.from_client.get_nowait() + async def test_end_partition_session(self, stream, stream_reader, partition_session): + def session_count(): + return len(stream_reader._partition_sessions) + + initial_session_count = session_count() + + stream.from_server.put_nowait( + StreamReadMessage.FromServer( + server_status=ServerStatus(ydb_status_codes_pb2.StatusIds.SUCCESS, []), + server_message=StreamReadMessage.EndPartitionSession( + partition_session_id=partition_session.id, + adjacent_partition_ids=[], + child_partition_ids=[20, 30], + ), + ) + ) + + await asyncio.sleep(0) # wait next loop + with pytest.raises(asyncio.QueueEmpty): + stream.from_client.get_nowait() + + assert session_count() == initial_session_count + assert partition_session.id in stream_reader._partition_sessions + assert partition_session.ended + @pytest.mark.parametrize( "graceful", ( @@ -1168,6 +1198,82 @@ async def test_read_message( assert mess == expected_message assert dict(stream_reader._message_batches) == batches_after + @pytest.mark.parametrize( + "batches,expected_order", + [ + ( + { + 0: PublicBatch( + messages=[stub_message(1)], + _partition_session=stub_partition_session(0, ended=True), + _bytes_size=0, + _codec=Codec.CODEC_RAW, + ) + }, + [1], + ), + ( + { + 0: PublicBatch( + messages=[stub_message(1), stub_message(2)], + _partition_session=stub_partition_session(0, ended=True), + _bytes_size=0, + _codec=Codec.CODEC_RAW, + ), + 1: PublicBatch( + messages=[stub_message(3), stub_message(4)], + _partition_session=stub_partition_session(1), + _bytes_size=0, + _codec=Codec.CODEC_RAW, + ), + }, + [1, 2, 3, 4], + ), + ( + { + 0: PublicBatch( + messages=[stub_message(1), stub_message(2)], + _partition_session=stub_partition_session(0), + _bytes_size=0, + _codec=Codec.CODEC_RAW, + ), + 1: PublicBatch( + messages=[stub_message(3), stub_message(4)], + _partition_session=stub_partition_session(1, ended=True), + _bytes_size=0, + _codec=Codec.CODEC_RAW, + ), + 2: PublicBatch( + messages=[stub_message(5)], + _partition_session=stub_partition_session(2), + _bytes_size=0, + _codec=Codec.CODEC_RAW, + ), + }, + [1, 3, 4, 5, 2], + ), + ], + ) + async def test_read_message_autosplit_order( + self, + stream_reader, + batches: typing.Dict[int, datatypes.PublicBatch], + expected_order: typing.List[int], + ): + stream_reader._message_batches = OrderedDict(batches) + + for id, batch in batches.items(): + ps = batch._partition_session + stream_reader._partition_sessions[id] = ps + + result = [] + for _ in range(len(expected_order)): + mess = stream_reader.receive_message_nowait() + result.append(mess.data) + + assert result == expected_order + assert stream_reader.receive_message_nowait() is None + @pytest.mark.parametrize( "batches_before,max_messages,actual_messages,batches_after", [ From 818acdddcd3bdf0bd603c32a13929a546e7044a0 Mon Sep 17 00:00:00 2001 From: Oleg Ovcharuk Date: Thu, 6 Feb 2025 18:22:19 +0300 Subject: [PATCH 08/10] remove default values from proto DTO --- ydb/_grpc/grpcwrapper/ydb_topic.py | 2 +- ydb/_topic_reader/topic_reader_asyncio_test.py | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/ydb/_grpc/grpcwrapper/ydb_topic.py b/ydb/_grpc/grpcwrapper/ydb_topic.py index 4e23d370..600dfb69 100644 --- a/ydb/_grpc/grpcwrapper/ydb_topic.py +++ b/ydb/_grpc/grpcwrapper/ydb_topic.py @@ -419,7 +419,7 @@ def from_proto( class InitRequest(IToProto): topics_read_settings: List["StreamReadMessage.InitRequest.TopicReadSettings"] consumer: str - auto_partitioning_support: bool = False + auto_partitioning_support: bool def to_proto(self) -> ydb_topic_pb2.StreamReadMessage.InitRequest: res = ydb_topic_pb2.StreamReadMessage.InitRequest() diff --git a/ydb/_topic_reader/topic_reader_asyncio_test.py b/ydb/_topic_reader/topic_reader_asyncio_test.py index c598d1ce..7ad5077c 100644 --- a/ydb/_topic_reader/topic_reader_asyncio_test.py +++ b/ydb/_topic_reader/topic_reader_asyncio_test.py @@ -610,6 +610,7 @@ async def test_init_reader(self, stream, default_reader_settings): read_from=None, ) ], + auto_partitioning_support=False, ) start_task = asyncio.create_task(reader._start(stream, init_message)) From f5099d337c8704c29721d4763f94d842f532bcdb Mon Sep 17 00:00:00 2001 From: Oleg Ovcharuk Date: Thu, 6 Feb 2025 18:29:43 +0300 Subject: [PATCH 09/10] make auto_partitioning_support enabled by default --- ydb/_topic_reader/topic_reader.py | 2 +- ydb/topic.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/ydb/_topic_reader/topic_reader.py b/ydb/_topic_reader/topic_reader.py index 699e2417..8bc12cc0 100644 --- a/ydb/_topic_reader/topic_reader.py +++ b/ydb/_topic_reader/topic_reader.py @@ -45,7 +45,7 @@ class PublicReaderSettings: consumer: str topic: TopicSelectorTypes buffer_size_bytes: int = 50 * 1024 * 1024 - auto_partitioning_support: bool = False + auto_partitioning_support: bool = True decoders: Union[Mapping[int, Callable[[bytes], bytes]], None] = None """decoders: map[codec_code] func(encoded_bytes)->decoded_bytes""" diff --git a/ydb/topic.py b/ydb/topic.py index f0607aca..55f4ea04 100644 --- a/ydb/topic.py +++ b/ydb/topic.py @@ -236,7 +236,7 @@ def reader( # custom decoder executor for call builtin and custom decoders. If None - use shared executor pool. # if max_worker in the executor is 1 - then decoders will be called from the thread without parallel decoder_executor: Optional[concurrent.futures.Executor] = None, - auto_partitioning_support: Optional[bool] = False, # Auto partitioning feature flag. Default - False. + auto_partitioning_support: Optional[bool] = True, # Auto partitioning feature flag. Default - True. ) -> TopicReaderAsyncIO: if not decoder_executor: @@ -446,7 +446,7 @@ def reader( # custom decoder executor for call builtin and custom decoders. If None - use shared executor pool. # if max_worker in the executor is 1 - then decoders will be called from the thread without parallel decoder_executor: Optional[concurrent.futures.Executor] = None, # default shared client executor pool - auto_partitioning_support: Optional[bool] = False, # Auto partitioning feature flag. Default - False. + auto_partitioning_support: Optional[bool] = True, # Auto partitioning feature flag. Default - True. ) -> TopicReader: if not decoder_executor: decoder_executor = self._executor From 3da64a076c7fbdea86e4f1b9267d88c4b885cf33 Mon Sep 17 00:00:00 2001 From: Oleg Ovcharuk Date: Wed, 12 Feb 2025 16:06:10 +0300 Subject: [PATCH 10/10] add autosplit example --- examples/topic/autosplit_example.py | 89 +++++++++++++++++++++++++++++ 1 file changed, 89 insertions(+) create mode 100644 examples/topic/autosplit_example.py diff --git a/examples/topic/autosplit_example.py b/examples/topic/autosplit_example.py new file mode 100644 index 00000000..1847c395 --- /dev/null +++ b/examples/topic/autosplit_example.py @@ -0,0 +1,89 @@ +import argparse +import asyncio +import datetime +import logging + +import ydb + +logger = logging.getLogger(__name__) +logger.setLevel(logging.DEBUG) + + +async def connect(endpoint: str, database: str) -> ydb.aio.Driver: + config = ydb.DriverConfig(endpoint=endpoint, database=database) + config.credentials = ydb.credentials_from_env_variables() + driver = ydb.aio.Driver(config) + await driver.wait(5, fail_fast=True) + return driver + + +async def recreate_topic(driver: ydb.aio.Driver, topic: str, consumer: str): + try: + await driver.topic_client.drop_topic(topic) + except ydb.SchemeError: + pass + + await driver.topic_client.create_topic( + topic, + consumers=[consumer], + max_active_partitions=100, + auto_partitioning_settings=ydb.TopicAutoPartitioningSettings( + strategy=ydb.TopicAutoPartitioningStrategy.SCALE_UP, + up_utilization_percent=1, + down_utilization_percent=1, + stabilization_window=datetime.timedelta(seconds=1), + ), + ) + + +async def write_messages(driver: ydb.aio.Driver, topic: str, id: int = 0): + async with driver.topic_client.writer(topic) as writer: + for i in range(100): + mess = ydb.TopicWriterMessage(data=f"[{id}] mess-{i}", metadata_items={"index": f"{i}"}) + await writer.write(mess) + await asyncio.sleep(0.01) + + +async def read_messages(driver: ydb.aio.Driver, topic: str, consumer: str): + async with driver.topic_client.reader(topic, consumer, auto_partitioning_support=True) as reader: + count = 0 + while True: + try: + mess = await asyncio.wait_for(reader.receive_message(), 5) + count += 1 + print(mess.data.decode()) + reader.commit(mess) + except asyncio.TimeoutError: + assert count == 200 + return + + +async def main(): + parser = argparse.ArgumentParser( + formatter_class=argparse.RawDescriptionHelpFormatter, + description="""YDB topic basic example.\n""", + ) + parser.add_argument("-d", "--database", default="/local", help="Name of the database to use") + parser.add_argument("-e", "--endpoint", default="grpc://localhost:2136", help="Endpoint url to use") + parser.add_argument("-p", "--path", default="test-topic", help="Topic name") + parser.add_argument("-c", "--consumer", default="consumer", help="Consumer name") + parser.add_argument("-v", "--verbose", default=True, action="store_true") + + args = parser.parse_args() + + if args.verbose: + logger.addHandler(logging.StreamHandler()) + + driver = await connect(args.endpoint, args.database) + + await recreate_topic(driver, args.path, args.consumer) + + await asyncio.gather( + write_messages(driver, args.path, 0), + write_messages(driver, args.path, 1), + read_messages(driver, args.path, args.consumer), + ) + + +if __name__ == "__main__": + asyncio.run(main())