From a4588fb5002cf49ce98a3d0740b7ef55a680251e Mon Sep 17 00:00:00 2001 From: Oleg Ovcharuk Date: Thu, 27 Jun 2024 18:35:39 +0300 Subject: [PATCH 1/3] Alter topic feature --- tests/topics/test_control_plane.py | 16 +++ ydb/_apis.py | 1 + ydb/_grpc/grpcwrapper/ydb_topic.py | 129 ++++++++++++++++++ .../grpcwrapper/ydb_topic_public_types.py | 39 ++++++ ydb/topic.py | 98 +++++++++++++ 5 files changed, 283 insertions(+) diff --git a/tests/topics/test_control_plane.py b/tests/topics/test_control_plane.py index 2446ddcf..384c9249 100644 --- a/tests/topics/test_control_plane.py +++ b/tests/topics/test_control_plane.py @@ -39,6 +39,14 @@ async def test_describe_topic(self, driver, topic_path: str, topic_consumer): assert has_consumer + async def test_alter_topic(self, driver, topic_path): + client = driver.topic_client + + await client.alter_topic(topic_path) + + with pytest.raises(issues.SchemeError): + await client.alter_topic(topic_path + "-not-exist") + class TestTopicClientControlPlane: def test_create_topic(self, driver_sync, database): @@ -72,3 +80,11 @@ def test_describe_topic(self, driver_sync, topic_path: str, topic_consumer): break assert has_consumer + + def test_alter_topic(self, driver_sync, topic_path): + client = driver_sync.topic_client + + client.alter_topic(topic_path) + + with pytest.raises(issues.SchemeError): + client.alter_topic(topic_path + "-not-exist") diff --git a/ydb/_apis.py b/ydb/_apis.py index 9a241e5c..bd455838 100644 --- a/ydb/_apis.py +++ b/ydb/_apis.py @@ -123,6 +123,7 @@ class TopicService(object): CreateTopic = "CreateTopic" DescribeTopic = "DescribeTopic" + AlterTopic = "AlterTopic" DropTopic = "DropTopic" StreamRead = "StreamRead" StreamWrite = "StreamWrite" diff --git a/ydb/_grpc/grpcwrapper/ydb_topic.py b/ydb/_grpc/grpcwrapper/ydb_topic.py index 5b5e294a..12c6b4cd 100644 --- a/ydb/_grpc/grpcwrapper/ydb_topic.py +++ b/ydb/_grpc/grpcwrapper/ydb_topic.py @@ -883,6 +883,39 @@ def from_proto( ) +@dataclass +class AlterConsumer(IToProto, IFromPublic): + name: str + set_important: bool + set_read_from: datetime.datetime + set_supported_codecs: SupportedCodecs + alter_attributes: Dict[str, str] + + def to_proto(self) -> ydb_topic_pb2.AlterConsumer: + return ydb_topic_pb2.AlterConsumer( + name=self.name, + set_important=self.set_important, + set_read_from=proto_timestamp_from_datetime(self.set_read_from), + set_supported_codecs=self.set_supported_codecs.to_proto(), + alter_attributes=self.alter_attributes, + ) + + @staticmethod + def from_public(alter_consumer: ydb_topic_public_types.PublicAlterConsumer) -> AlterConsumer: + if not alter_consumer: + return None + + supported_codecs = alter_consumer.set_supported_codecs if alter_consumer.set_supported_codecs else [] + + return AlterConsumer( + name=alter_consumer.name, + set_important=alter_consumer.set_important, + set_read_from=alter_consumer.set_read_from, + set_supported_codecs=SupportedCodecs(codecs=supported_codecs), + alter_attributes=alter_consumer.alter_attributes, + ) + + @dataclass class PartitioningSettings(IToProto, IFromProto): min_active_partitions: int @@ -902,6 +935,25 @@ def to_proto(self) -> ydb_topic_pb2.PartitioningSettings: ) +@dataclass +class AlterPartitioningSettings(IToProto, IFromProto): + set_min_active_partitions: int + set_partition_count_limit: int + + @staticmethod + def from_proto(msg: ydb_topic_pb2.AlterPartitioningSettings) -> "AlterPartitioningSettings": + return AlterPartitioningSettings( + set_min_active_partitions=msg.set_min_active_partitions, + set_partition_count_limit=msg.set_partition_count_limit, + ) + + def to_proto(self) -> ydb_topic_pb2.AlterPartitioningSettings: + return ydb_topic_pb2.AlterPartitioningSettings( + set_min_active_partitions=self.set_min_active_partitions, + set_partition_count_limit=self.set_partition_count_limit, + ) + + class MeteringMode(int, IFromProto, IFromPublic, IToPublic): UNSPECIFIED = 0 RESERVED_CAPACITY = 1 @@ -995,6 +1047,83 @@ class CreateTopicResult: pass +@dataclass +class AlterTopicRequest(IToProto, IFromPublic): + path: str + add_consumers: List["Consumer"] + alter_partitioning_settings: AlterPartitioningSettings + set_retention_period: datetime.timedelta + set_retention_storage_mb: int + set_supported_codecs: SupportedCodecs + set_partition_write_burst_bytes: typing.Optional[int] + set_partition_write_speed_bytes_per_second: typing.Optional[int] + alter_attributes: Dict[str, str] + alter_consumers: List[AlterConsumer] + drop_consumers: List[str] + set_metering_mode: "MeteringMode" + + def to_proto(self) -> ydb_topic_pb2.AlterTopicRequest: + return ydb_topic_pb2.AlterTopicRequest( + path=self.path, + add_consumers=[consumer.to_proto() for consumer in self.add_consumers], + alter_partitioning_settings=self.alter_partitioning_settings.to_proto(), + set_retention_period=proto_duration_from_timedelta(self.set_retention_period), + set_retention_storage_mb=self.set_retention_storage_mb, + set_supported_codecs=self.set_supported_codecs.to_proto(), + set_partition_write_burst_bytes=self.set_partition_write_burst_bytes, + set_partition_write_speed_bytes_per_second=self.set_partition_write_speed_bytes_per_second, + alter_attributes=self.alter_attributes, + alter_consumers=[consumer.to_proto() for consumer in self.alter_consumers], + drop_consumers=list(self.drop_consumers), + set_metering_mode=self.set_metering_mode, + ) + + @staticmethod + def from_public(req: ydb_topic_public_types.AlterTopicRequestParams) -> AlterTopicRequest: + add_consumers = [] + if req.add_consumers: + for consumer in req.add_consumers: + if isinstance(consumer, str): + consumer = ydb_topic_public_types.PublicConsumer(name=consumer) + add_consumers.append(Consumer.from_public(consumer)) + + alter_consumers = [] + if req.alter_consumers: + for consumer in req.alter_consumers: + if isinstance(consumer, str): + consumer = ydb_topic_public_types.PublicAlterConsumer(name=consumer) + alter_consumers.append(AlterConsumer.from_public(consumer)) + + drop_consumers = req.drop_consumers if req.drop_consumers else [] + + supported_codecs = req.set_supported_codecs if req.set_supported_codecs else [] + + return AlterTopicRequest( + path=req.path, + alter_partitioning_settings=AlterPartitioningSettings( + set_min_active_partitions=req.set_min_active_partitions, + set_partition_count_limit=req.set_partition_count_limit, + ), + add_consumers=add_consumers, + set_retention_period=req.set_retention_period, + set_retention_storage_mb=req.set_retention_storage_mb, + set_supported_codecs=SupportedCodecs( + codecs=supported_codecs, + ), + set_partition_write_burst_bytes=req.set_partition_write_burst_bytes, + set_partition_write_speed_bytes_per_second=req.set_partition_write_speed_bytes_per_second, + alter_attributes=req.alter_attributes, + alter_consumers=alter_consumers, + drop_consumers=drop_consumers, + set_metering_mode=MeteringMode.from_public(req.set_metering_mode), + ) + + +@dataclass +class AlterTopicResult: + pass + + @dataclass class DescribeTopicRequest: path: str diff --git a/ydb/_grpc/grpcwrapper/ydb_topic_public_types.py b/ydb/_grpc/grpcwrapper/ydb_topic_public_types.py index df280a8b..06ec8aaa 100644 --- a/ydb/_grpc/grpcwrapper/ydb_topic_public_types.py +++ b/ydb/_grpc/grpcwrapper/ydb_topic_public_types.py @@ -30,6 +30,23 @@ class CreateTopicRequestParams: metering_mode: Optional["PublicMeteringMode"] +@dataclass +class AlterTopicRequestParams: + path: str + set_min_active_partitions: Optional[int] + set_partition_count_limit: Optional[int] + add_consumers: Optional[List[Union["PublicConsumer", str]]] + alter_consumers: Optional[List[Union["PublicAlterConsumer", str]]] + drop_consumers: Optional[List[str]] # TODO: clarify + alter_attributes: Optional[Dict[str, str]] + set_metering_mode: Optional["PublicMeteringMode"] + set_partition_write_speed_bytes_per_second: Optional[int] + set_partition_write_burst_bytes: Optional[int] + set_retention_period: Optional[datetime.timedelta] + set_retention_storage_mb: Optional[int] + set_supported_codecs: Optional[List[Union["PublicCodec", int]]] + + class PublicCodec(int): """ Codec value may contain any int number. @@ -73,6 +90,28 @@ class PublicConsumer: "Attributes of consumer" +@dataclass +class PublicAlterConsumer: + name: str + set_important: bool = False + """ + Consumer may be marked as 'important'. It means messages for this consumer will never expire due to retention. + User should take care that such consumer never stalls, to prevent running out of disk space. + """ + + set_read_from: Optional[datetime.datetime] = None + "All messages with smaller server written_at timestamp will be skipped." + + set_supported_codecs: List[PublicCodec] = field(default_factory=lambda: list()) + """ + List of supported codecs by this consumer. + supported_codecs on topic must be contained inside this list. + """ + + alter_attributes: Dict[str, str] = field(default_factory=lambda: dict()) + "Attributes of consumer" + + @dataclass class DropTopicRequestParams(IToProto): path: str diff --git a/ydb/topic.py b/ydb/topic.py index 948bcff4..f0b872e2 100644 --- a/ydb/topic.py +++ b/ydb/topic.py @@ -6,6 +6,7 @@ "TopicClientSettings", "TopicCodec", "TopicConsumer", + "TopicAlterConsumer", "TopicDescription", "TopicError", "TopicMeteringMode", @@ -77,6 +78,7 @@ PublicPartitionStats as TopicPartitionStats, PublicCodec as TopicCodec, PublicConsumer as TopicConsumer, + PublicAlterConsumer as TopicAlterConsumer, PublicMeteringMode as TopicMeteringMode, ) @@ -145,6 +147,53 @@ async def create_topic( _wrap_operation, ) + async def alter_topic( + self, + path: str, + set_min_active_partitions: Optional[int] = None, + set_partition_count_limit: Optional[int] = None, + add_consumers: Optional[List[Union[TopicConsumer, str]]] = None, + alter_consumers: Optional[List[Union[TopicAlterConsumer, str]]] = None, + drop_consumers: Optional[List[str]] = None, + alter_attributes: Optional[Dict[str, str]] = None, + set_metering_mode: Optional[TopicMeteringMode] = None, + set_partition_write_speed_bytes_per_second: Optional[int] = None, + set_partition_write_burst_bytes: Optional[int] = None, + set_retention_period: Optional[datetime.timedelta] = None, + set_retention_storage_mb: Optional[int] = None, + set_supported_codecs: Optional[List[Union[TopicCodec, int]]] = None, + ): + """ + alter topic command + + :param path: full path to topic + :param set_min_active_partitions: Minimum partition count auto merge would stop working at. + :param set_partition_count_limit: Limit for total partition count, including active (open for write) + and read-only partitions. + :param add_consumers: List of consumers for this topic to add + :param alter_consumers: List of consumers for this topic to alter + :param drop_consumers: List of consumer names for this topic to drop + :param alter_attributes: User and server attributes of topic. + Server attributes starts from "_" and will be validated by server. + :param set_metering_mode: Metering mode for the topic in a serverless database + :param set_partition_write_speed_bytes_per_second: Partition write speed in bytes per second + :param set_partition_write_burst_bytes: Burst size for write in partition, in bytes + :param set_retention_period: How long data in partition should be stored + :param set_retention_storage_mb: How much data in partition should be stored + :param set_supported_codecs: List of allowed codecs for writers. Writes with codec not from this list are forbidden. + Empty list mean disable codec compatibility checks for the topic. + """ + args = locals().copy() + del args["self"] + req = _ydb_topic_public_types.AlterTopicRequestParams(**args) + req = _ydb_topic.AlterTopicRequest.from_public(req) + await self._driver( + req.to_proto(), + _apis.TopicService.Stub, + _apis.TopicService.AlterTopic, + _wrap_operation, + ) + async def describe_topic(self, path: str, include_stats: bool = False) -> TopicDescription: args = locals().copy() del args["self"] @@ -297,6 +346,55 @@ def create_topic( _wrap_operation, ) + def alter_topic( + self, + path: str, + set_min_active_partitions: Optional[int] = None, + set_partition_count_limit: Optional[int] = None, + add_consumers: Optional[List[Union[TopicConsumer, str]]] = None, + alter_consumers: Optional[List[Union[TopicAlterConsumer, str]]] = None, + drop_consumers: Optional[List[str]] = None, + alter_attributes: Optional[Dict[str, str]] = None, + set_metering_mode: Optional[TopicMeteringMode] = None, + set_partition_write_speed_bytes_per_second: Optional[int] = None, + set_partition_write_burst_bytes: Optional[int] = None, + set_retention_period: Optional[datetime.timedelta] = None, + set_retention_storage_mb: Optional[int] = None, + set_supported_codecs: Optional[List[Union[TopicCodec, int]]] = None, + ): + """ + alter topic command + + :param path: full path to topic + :param set_min_active_partitions: Minimum partition count auto merge would stop working at. + :param set_partition_count_limit: Limit for total partition count, including active (open for write) + and read-only partitions. + :param add_consumers: List of consumers for this topic to add + :param alter_consumers: List of consumers for this topic to alter + :param drop_consumers: List of consumer names for this topic to drop + :param alter_attributes: User and server attributes of topic. + Server attributes starts from "_" and will be validated by server. + :param set_metering_mode: Metering mode for the topic in a serverless database + :param set_partition_write_speed_bytes_per_second: Partition write speed in bytes per second + :param set_partition_write_burst_bytes: Burst size for write in partition, in bytes + :param set_retention_period: How long data in partition should be stored + :param set_retention_storage_mb: How much data in partition should be stored + :param set_supported_codecs: List of allowed codecs for writers. Writes with codec not from this list are forbidden. + Empty list mean disable codec compatibility checks for the topic. + """ + args = locals().copy() + del args["self"] + self._check_closed() + + req = _ydb_topic_public_types.AlterTopicRequestParams(**args) + req = _ydb_topic.AlterTopicRequest.from_public(req) + self._driver( + req.to_proto(), + _apis.TopicService.Stub, + _apis.TopicService.AlterTopic, + _wrap_operation, + ) + def describe_topic(self, path: str, include_stats: bool = False) -> TopicDescription: args = locals().copy() del args["self"] From badb27a4fb90e62bc7f82de828fac3211b2043ee Mon Sep 17 00:00:00 2001 From: Oleg Ovcharuk Date: Sat, 29 Jun 2024 23:06:26 +0300 Subject: [PATCH 2/3] tests --- tests/topics/test_control_plane.py | 30 +++++++-- ydb/_grpc/grpcwrapper/ydb_topic.py | 4 +- .../grpcwrapper/ydb_topic_public_types.py | 2 +- ydb/_grpc/grpcwrapper/ydb_topic_test.py | 67 +++++++++++++++++++ 4 files changed, 95 insertions(+), 8 deletions(-) diff --git a/tests/topics/test_control_plane.py b/tests/topics/test_control_plane.py index 384c9249..25258b61 100644 --- a/tests/topics/test_control_plane.py +++ b/tests/topics/test_control_plane.py @@ -39,14 +39,23 @@ async def test_describe_topic(self, driver, topic_path: str, topic_consumer): assert has_consumer - async def test_alter_topic(self, driver, topic_path): + async def test_alter_not_existed_topic(self, driver, topic_path): client = driver.topic_client - await client.alter_topic(topic_path) - with pytest.raises(issues.SchemeError): await client.alter_topic(topic_path + "-not-exist") + async def test_alter_existed_topic(self, driver, topic_path): + client = driver.topic_client + + topic_before = await client.describe_topic(topic_path) + + target_min_active_partitions = topic_before.min_active_partitions + 1 + await client.alter_topic(topic_path, set_min_active_partitions=target_min_active_partitions) + + topic_after = await client.describe_topic(topic_path) + assert topic_after.min_active_partitions == target_min_active_partitions + class TestTopicClientControlPlane: def test_create_topic(self, driver_sync, database): @@ -81,10 +90,19 @@ def test_describe_topic(self, driver_sync, topic_path: str, topic_consumer): assert has_consumer - def test_alter_topic(self, driver_sync, topic_path): + def test_alter_not_existed_topic(self, driver_sync, topic_path): client = driver_sync.topic_client - client.alter_topic(topic_path) - with pytest.raises(issues.SchemeError): client.alter_topic(topic_path + "-not-exist") + + def test_alter_existed_topic(self, driver_sync, topic_path): + client = driver_sync.topic_client + + topic_before = client.describe_topic(topic_path) + + target_min_active_partitions = topic_before.min_active_partitions + 1 + client.alter_topic(topic_path, set_min_active_partitions=target_min_active_partitions) + + topic_after = client.describe_topic(topic_path) + assert topic_after.min_active_partitions == target_min_active_partitions diff --git a/ydb/_grpc/grpcwrapper/ydb_topic.py b/ydb/_grpc/grpcwrapper/ydb_topic.py index 12c6b4cd..d803e048 100644 --- a/ydb/_grpc/grpcwrapper/ydb_topic.py +++ b/ydb/_grpc/grpcwrapper/ydb_topic.py @@ -1063,13 +1063,15 @@ class AlterTopicRequest(IToProto, IFromPublic): set_metering_mode: "MeteringMode" def to_proto(self) -> ydb_topic_pb2.AlterTopicRequest: + supported_codecs = self.set_supported_codecs.to_proto() if self.set_supported_codecs.codecs else None + return ydb_topic_pb2.AlterTopicRequest( path=self.path, add_consumers=[consumer.to_proto() for consumer in self.add_consumers], alter_partitioning_settings=self.alter_partitioning_settings.to_proto(), set_retention_period=proto_duration_from_timedelta(self.set_retention_period), set_retention_storage_mb=self.set_retention_storage_mb, - set_supported_codecs=self.set_supported_codecs.to_proto(), + set_supported_codecs=supported_codecs, set_partition_write_burst_bytes=self.set_partition_write_burst_bytes, set_partition_write_speed_bytes_per_second=self.set_partition_write_speed_bytes_per_second, alter_attributes=self.alter_attributes, diff --git a/ydb/_grpc/grpcwrapper/ydb_topic_public_types.py b/ydb/_grpc/grpcwrapper/ydb_topic_public_types.py index 06ec8aaa..e829c4ee 100644 --- a/ydb/_grpc/grpcwrapper/ydb_topic_public_types.py +++ b/ydb/_grpc/grpcwrapper/ydb_topic_public_types.py @@ -37,7 +37,7 @@ class AlterTopicRequestParams: set_partition_count_limit: Optional[int] add_consumers: Optional[List[Union["PublicConsumer", str]]] alter_consumers: Optional[List[Union["PublicAlterConsumer", str]]] - drop_consumers: Optional[List[str]] # TODO: clarify + drop_consumers: Optional[List[str]] alter_attributes: Optional[Dict[str, str]] set_metering_mode: Optional["PublicMeteringMode"] set_partition_write_speed_bytes_per_second: Optional[int] diff --git a/ydb/_grpc/grpcwrapper/ydb_topic_test.py b/ydb/_grpc/grpcwrapper/ydb_topic_test.py index ff29834a..ab888eb0 100644 --- a/ydb/_grpc/grpcwrapper/ydb_topic_test.py +++ b/ydb/_grpc/grpcwrapper/ydb_topic_test.py @@ -1,4 +1,15 @@ +import datetime + +from google.protobuf.json_format import MessageToDict + from ydb._grpc.grpcwrapper.ydb_topic import OffsetsRange +from .ydb_topic import AlterTopicRequest +from .ydb_topic_public_types import ( + AlterTopicRequestParams, + PublicAlterConsumer, + PublicConsumer, + PublicCodec, +) def test_offsets_range_intersected(): @@ -17,3 +28,59 @@ def test_offsets_range_intersected(): ]: assert OffsetsRange(test[0], test[1]).is_intersected_with(OffsetsRange(test[2], test[3])) assert OffsetsRange(test[2], test[3]).is_intersected_with(OffsetsRange(test[0], test[1])) + + +def test_alter_topic_request_from_public_to_proto(): + # Specify all fields with all possible input ways + params = { + "path": "topic_name", + "add_consumers": [ + "new_consumer_1", + PublicConsumer("new_consumer_2"), + ], + "alter_consumers": [ + "old_consumer_1", + PublicAlterConsumer("old_consumer_2"), + ], + "drop_consumers": ["redundant_consumer"], + "set_retention_period": datetime.timedelta(weeks=4), + "set_retention_storage_mb": 4, + "set_supported_codecs": [1, PublicCodec(2)], + "set_partition_write_burst_bytes": 8, + "set_partition_write_speed_bytes_per_second": 15, + "alter_attributes": {"key": "value"}, + "set_metering_mode": 1, + "set_min_active_partitions": 2, + "set_partition_count_limit": 4, + } + + params_public = AlterTopicRequestParams(**params) + request = AlterTopicRequest.from_public(params_public) + request_proto = request.to_proto() + + msg_dict = MessageToDict(request_proto, preserving_proto_field_name=True) + + assert msg_dict["path"] == params["path"] + assert len(msg_dict["add_consumers"]) == len(params["add_consumers"]) + assert len(msg_dict["alter_consumers"]) == len(params["alter_consumers"]) + assert len(msg_dict["drop_consumers"]) == len(params["drop_consumers"]) + assert msg_dict["alter_attributes"] == params["alter_attributes"] + + assert ( + int(msg_dict["alter_partitioning_settings"]["set_min_active_partitions"]) == params["set_min_active_partitions"] + ) + assert ( + int(msg_dict["alter_partitioning_settings"]["set_partition_count_limit"]) == params["set_partition_count_limit"] + ) + + assert int(msg_dict["set_partition_write_burst_bytes"]) == params["set_partition_write_burst_bytes"] + assert ( + int(msg_dict["set_partition_write_speed_bytes_per_second"]) + == params["set_partition_write_speed_bytes_per_second"] + ) + assert msg_dict["set_retention_period"] == str(int(params["set_retention_period"].total_seconds())) + "s" + assert int(msg_dict["set_retention_storage_mb"]) == params["set_retention_storage_mb"] + + assert msg_dict["set_metering_mode"] == "METERING_MODE_RESERVED_CAPACITY" + + assert msg_dict["set_supported_codecs"]["codecs"] == params["set_supported_codecs"] From e8e1272061257366748139046a3c7138227a9060 Mon Sep 17 00:00:00 2001 From: Oleg Ovcharuk Date: Tue, 2 Jul 2024 19:21:54 +0300 Subject: [PATCH 3/3] fix review comments --- ydb/_grpc/grpcwrapper/ydb_topic.py | 85 ++++++++++--------- .../grpcwrapper/ydb_topic_public_types.py | 6 +- ydb/_grpc/grpcwrapper/ydb_topic_test.py | 44 +++++----- 3 files changed, 67 insertions(+), 68 deletions(-) diff --git a/ydb/_grpc/grpcwrapper/ydb_topic.py b/ydb/_grpc/grpcwrapper/ydb_topic.py index d803e048..c1789b6c 100644 --- a/ydb/_grpc/grpcwrapper/ydb_topic.py +++ b/ydb/_grpc/grpcwrapper/ydb_topic.py @@ -33,7 +33,7 @@ ) -class Codec(int, IToPublic): +class Codec(int, IToPublic, IFromPublic): CODEC_UNSPECIFIED = 0 CODEC_RAW = 1 CODEC_GZIP = 2 @@ -47,9 +47,13 @@ def from_proto_iterable(codecs: typing.Iterable[int]) -> List["Codec"]: def to_public(self) -> ydb_topic_public_types.PublicCodec: return ydb_topic_public_types.PublicCodec(int(self)) + @staticmethod + def from_public(codec: Union[ydb_topic_public_types.PublicCodec, int]) -> "Codec": + return Codec(int(codec)) + @dataclass -class SupportedCodecs(IToProto, IFromProto, IToPublic): +class SupportedCodecs(IToProto, IFromProto, IToPublic, IFromPublic): codecs: List[Codec] def to_proto(self) -> ydb_topic_pb2.SupportedCodecs: @@ -69,6 +73,15 @@ def from_proto(msg: Optional[ydb_topic_pb2.SupportedCodecs]) -> "SupportedCodecs def to_public(self) -> List[ydb_topic_public_types.PublicCodec]: return list(map(Codec.to_public, self.codecs)) + @staticmethod + def from_public( + codecs: Optional[List[Union[ydb_topic_public_types.PublicCodec, int]]] + ) -> Optional["SupportedCodecs"]: + if codecs is None: + return None + + return SupportedCodecs(codecs=[Codec.from_public(codec) for codec in codecs]) + @dataclass(order=True) class OffsetsRange(IFromProto, IToProto): @@ -886,17 +899,21 @@ def from_proto( @dataclass class AlterConsumer(IToProto, IFromPublic): name: str - set_important: bool - set_read_from: datetime.datetime - set_supported_codecs: SupportedCodecs - alter_attributes: Dict[str, str] + set_important: Optional[bool] + set_read_from: Optional[datetime.datetime] + set_supported_codecs: Optional[SupportedCodecs] + alter_attributes: Optional[Dict[str, str]] def to_proto(self) -> ydb_topic_pb2.AlterConsumer: + supported_codecs = None + if self.set_supported_codecs is not None: + supported_codecs = self.set_supported_codecs.to_proto() + return ydb_topic_pb2.AlterConsumer( name=self.name, set_important=self.set_important, set_read_from=proto_timestamp_from_datetime(self.set_read_from), - set_supported_codecs=self.set_supported_codecs.to_proto(), + set_supported_codecs=supported_codecs, alter_attributes=self.alter_attributes, ) @@ -905,13 +922,11 @@ def from_public(alter_consumer: ydb_topic_public_types.PublicAlterConsumer) -> A if not alter_consumer: return None - supported_codecs = alter_consumer.set_supported_codecs if alter_consumer.set_supported_codecs else [] - return AlterConsumer( name=alter_consumer.name, set_important=alter_consumer.set_important, set_read_from=alter_consumer.set_read_from, - set_supported_codecs=SupportedCodecs(codecs=supported_codecs), + set_supported_codecs=SupportedCodecs.from_public(alter_consumer.set_supported_codecs), alter_attributes=alter_consumer.alter_attributes, ) @@ -936,16 +951,9 @@ def to_proto(self) -> ydb_topic_pb2.PartitioningSettings: @dataclass -class AlterPartitioningSettings(IToProto, IFromProto): - set_min_active_partitions: int - set_partition_count_limit: int - - @staticmethod - def from_proto(msg: ydb_topic_pb2.AlterPartitioningSettings) -> "AlterPartitioningSettings": - return AlterPartitioningSettings( - set_min_active_partitions=msg.set_min_active_partitions, - set_partition_count_limit=msg.set_partition_count_limit, - ) +class AlterPartitioningSettings(IToProto): + set_min_active_partitions: Optional[int] + set_partition_count_limit: Optional[int] def to_proto(self) -> ydb_topic_pb2.AlterPartitioningSettings: return ydb_topic_pb2.AlterPartitioningSettings( @@ -1050,20 +1058,22 @@ class CreateTopicResult: @dataclass class AlterTopicRequest(IToProto, IFromPublic): path: str - add_consumers: List["Consumer"] - alter_partitioning_settings: AlterPartitioningSettings - set_retention_period: datetime.timedelta - set_retention_storage_mb: int - set_supported_codecs: SupportedCodecs - set_partition_write_burst_bytes: typing.Optional[int] - set_partition_write_speed_bytes_per_second: typing.Optional[int] - alter_attributes: Dict[str, str] - alter_consumers: List[AlterConsumer] - drop_consumers: List[str] - set_metering_mode: "MeteringMode" + add_consumers: Optional[List["Consumer"]] + alter_partitioning_settings: Optional[AlterPartitioningSettings] + set_retention_period: Optional[datetime.timedelta] + set_retention_storage_mb: Optional[int] + set_supported_codecs: Optional[SupportedCodecs] + set_partition_write_burst_bytes: Optional[int] + set_partition_write_speed_bytes_per_second: Optional[int] + alter_attributes: Optional[Dict[str, str]] + alter_consumers: Optional[List[AlterConsumer]] + drop_consumers: Optional[List[str]] + set_metering_mode: Optional["MeteringMode"] def to_proto(self) -> ydb_topic_pb2.AlterTopicRequest: - supported_codecs = self.set_supported_codecs.to_proto() if self.set_supported_codecs.codecs else None + supported_codecs = None + if self.set_supported_codecs is not None: + supported_codecs = self.set_supported_codecs.to_proto() return ydb_topic_pb2.AlterTopicRequest( path=self.path, @@ -1098,8 +1108,6 @@ def from_public(req: ydb_topic_public_types.AlterTopicRequestParams) -> AlterTop drop_consumers = req.drop_consumers if req.drop_consumers else [] - supported_codecs = req.set_supported_codecs if req.set_supported_codecs else [] - return AlterTopicRequest( path=req.path, alter_partitioning_settings=AlterPartitioningSettings( @@ -1109,9 +1117,7 @@ def from_public(req: ydb_topic_public_types.AlterTopicRequestParams) -> AlterTop add_consumers=add_consumers, set_retention_period=req.set_retention_period, set_retention_storage_mb=req.set_retention_storage_mb, - set_supported_codecs=SupportedCodecs( - codecs=supported_codecs, - ), + set_supported_codecs=SupportedCodecs.from_public(req.set_supported_codecs), set_partition_write_burst_bytes=req.set_partition_write_burst_bytes, set_partition_write_speed_bytes_per_second=req.set_partition_write_speed_bytes_per_second, alter_attributes=req.alter_attributes, @@ -1121,11 +1127,6 @@ def from_public(req: ydb_topic_public_types.AlterTopicRequestParams) -> AlterTop ) -@dataclass -class AlterTopicResult: - pass - - @dataclass class DescribeTopicRequest: path: str diff --git a/ydb/_grpc/grpcwrapper/ydb_topic_public_types.py b/ydb/_grpc/grpcwrapper/ydb_topic_public_types.py index e829c4ee..d1987546 100644 --- a/ydb/_grpc/grpcwrapper/ydb_topic_public_types.py +++ b/ydb/_grpc/grpcwrapper/ydb_topic_public_types.py @@ -93,7 +93,7 @@ class PublicConsumer: @dataclass class PublicAlterConsumer: name: str - set_important: bool = False + set_important: Optional[bool] = None """ Consumer may be marked as 'important'. It means messages for this consumer will never expire due to retention. User should take care that such consumer never stalls, to prevent running out of disk space. @@ -102,13 +102,13 @@ class PublicAlterConsumer: set_read_from: Optional[datetime.datetime] = None "All messages with smaller server written_at timestamp will be skipped." - set_supported_codecs: List[PublicCodec] = field(default_factory=lambda: list()) + set_supported_codecs: Optional[List[PublicCodec]] = None """ List of supported codecs by this consumer. supported_codecs on topic must be contained inside this list. """ - alter_attributes: Dict[str, str] = field(default_factory=lambda: dict()) + alter_attributes: Optional[Dict[str, str]] = None "Attributes of consumer" diff --git a/ydb/_grpc/grpcwrapper/ydb_topic_test.py b/ydb/_grpc/grpcwrapper/ydb_topic_test.py index ab888eb0..53256ac1 100644 --- a/ydb/_grpc/grpcwrapper/ydb_topic_test.py +++ b/ydb/_grpc/grpcwrapper/ydb_topic_test.py @@ -60,27 +60,25 @@ def test_alter_topic_request_from_public_to_proto(): msg_dict = MessageToDict(request_proto, preserving_proto_field_name=True) - assert msg_dict["path"] == params["path"] - assert len(msg_dict["add_consumers"]) == len(params["add_consumers"]) - assert len(msg_dict["alter_consumers"]) == len(params["alter_consumers"]) - assert len(msg_dict["drop_consumers"]) == len(params["drop_consumers"]) - assert msg_dict["alter_attributes"] == params["alter_attributes"] - - assert ( - int(msg_dict["alter_partitioning_settings"]["set_min_active_partitions"]) == params["set_min_active_partitions"] - ) - assert ( - int(msg_dict["alter_partitioning_settings"]["set_partition_count_limit"]) == params["set_partition_count_limit"] - ) - - assert int(msg_dict["set_partition_write_burst_bytes"]) == params["set_partition_write_burst_bytes"] - assert ( - int(msg_dict["set_partition_write_speed_bytes_per_second"]) - == params["set_partition_write_speed_bytes_per_second"] - ) - assert msg_dict["set_retention_period"] == str(int(params["set_retention_period"].total_seconds())) + "s" - assert int(msg_dict["set_retention_storage_mb"]) == params["set_retention_storage_mb"] - - assert msg_dict["set_metering_mode"] == "METERING_MODE_RESERVED_CAPACITY" + expected_dict = { + "path": "topic_name", + "alter_partitioning_settings": {"set_min_active_partitions": "2", "set_partition_count_limit": "4"}, + "set_retention_period": "2419200s", + "set_retention_storage_mb": "4", + "set_supported_codecs": {"codecs": [1, 2]}, + "set_partition_write_speed_bytes_per_second": "15", + "set_partition_write_burst_bytes": "8", + "alter_attributes": {"key": "value"}, + "add_consumers": [ + {"name": "new_consumer_1", "supported_codecs": {}}, + {"name": "new_consumer_2", "supported_codecs": {}}, + ], + "drop_consumers": ["redundant_consumer"], + "alter_consumers": [ + {"name": "old_consumer_1"}, + {"name": "old_consumer_2"}, + ], + "set_metering_mode": "METERING_MODE_RESERVED_CAPACITY", + } - assert msg_dict["set_supported_codecs"]["codecs"] == params["set_supported_codecs"] + assert msg_dict == expected_dict