diff --git a/tests/topics/test_control_plane.py b/tests/topics/test_control_plane.py index 2446ddcf..25258b61 100644 --- a/tests/topics/test_control_plane.py +++ b/tests/topics/test_control_plane.py @@ -39,6 +39,23 @@ async def test_describe_topic(self, driver, topic_path: str, topic_consumer): assert has_consumer + async def test_alter_not_existed_topic(self, driver, topic_path): + client = driver.topic_client + + with pytest.raises(issues.SchemeError): + await client.alter_topic(topic_path + "-not-exist") + + async def test_alter_existed_topic(self, driver, topic_path): + client = driver.topic_client + + topic_before = await client.describe_topic(topic_path) + + target_min_active_partitions = topic_before.min_active_partitions + 1 + await client.alter_topic(topic_path, set_min_active_partitions=target_min_active_partitions) + + topic_after = await client.describe_topic(topic_path) + assert topic_after.min_active_partitions == target_min_active_partitions + class TestTopicClientControlPlane: def test_create_topic(self, driver_sync, database): @@ -72,3 +89,20 @@ def test_describe_topic(self, driver_sync, topic_path: str, topic_consumer): break assert has_consumer + + def test_alter_not_existed_topic(self, driver_sync, topic_path): + client = driver_sync.topic_client + + with pytest.raises(issues.SchemeError): + client.alter_topic(topic_path + "-not-exist") + + def test_alter_existed_topic(self, driver_sync, topic_path): + client = driver_sync.topic_client + + topic_before = client.describe_topic(topic_path) + + target_min_active_partitions = topic_before.min_active_partitions + 1 + client.alter_topic(topic_path, set_min_active_partitions=target_min_active_partitions) + + topic_after = client.describe_topic(topic_path) + assert topic_after.min_active_partitions == target_min_active_partitions diff --git a/ydb/_apis.py b/ydb/_apis.py index 9a241e5c..bd455838 100644 --- a/ydb/_apis.py +++ b/ydb/_apis.py @@ -123,6 +123,7 @@ class TopicService(object): CreateTopic = "CreateTopic" DescribeTopic = "DescribeTopic" + AlterTopic = "AlterTopic" DropTopic = "DropTopic" StreamRead = "StreamRead" StreamWrite = "StreamWrite" diff --git a/ydb/_grpc/grpcwrapper/ydb_topic.py b/ydb/_grpc/grpcwrapper/ydb_topic.py index 5b5e294a..c1789b6c 100644 --- a/ydb/_grpc/grpcwrapper/ydb_topic.py +++ b/ydb/_grpc/grpcwrapper/ydb_topic.py @@ -33,7 +33,7 @@ ) -class Codec(int, IToPublic): +class Codec(int, IToPublic, IFromPublic): CODEC_UNSPECIFIED = 0 CODEC_RAW = 1 CODEC_GZIP = 2 @@ -47,9 +47,13 @@ def from_proto_iterable(codecs: typing.Iterable[int]) -> List["Codec"]: def to_public(self) -> ydb_topic_public_types.PublicCodec: return ydb_topic_public_types.PublicCodec(int(self)) + @staticmethod + def from_public(codec: Union[ydb_topic_public_types.PublicCodec, int]) -> "Codec": + return Codec(int(codec)) + @dataclass -class SupportedCodecs(IToProto, IFromProto, IToPublic): +class SupportedCodecs(IToProto, IFromProto, IToPublic, IFromPublic): codecs: List[Codec] def to_proto(self) -> ydb_topic_pb2.SupportedCodecs: @@ -69,6 +73,15 @@ def from_proto(msg: Optional[ydb_topic_pb2.SupportedCodecs]) -> "SupportedCodecs def to_public(self) -> List[ydb_topic_public_types.PublicCodec]: return list(map(Codec.to_public, self.codecs)) + @staticmethod + def from_public( + codecs: Optional[List[Union[ydb_topic_public_types.PublicCodec, int]]] + ) -> Optional["SupportedCodecs"]: + if codecs is None: + return None + + return SupportedCodecs(codecs=[Codec.from_public(codec) for codec in codecs]) + @dataclass(order=True) class OffsetsRange(IFromProto, IToProto): @@ -883,6 +896,41 @@ def from_proto( ) +@dataclass +class AlterConsumer(IToProto, IFromPublic): + name: str + set_important: Optional[bool] + set_read_from: Optional[datetime.datetime] + set_supported_codecs: Optional[SupportedCodecs] + alter_attributes: Optional[Dict[str, str]] + + def to_proto(self) -> ydb_topic_pb2.AlterConsumer: + supported_codecs = None + if self.set_supported_codecs is not None: + supported_codecs = self.set_supported_codecs.to_proto() + + return ydb_topic_pb2.AlterConsumer( + name=self.name, + set_important=self.set_important, + set_read_from=proto_timestamp_from_datetime(self.set_read_from), + set_supported_codecs=supported_codecs, + alter_attributes=self.alter_attributes, + ) + + @staticmethod + def from_public(alter_consumer: ydb_topic_public_types.PublicAlterConsumer) -> AlterConsumer: + if not alter_consumer: + return None + + return AlterConsumer( + name=alter_consumer.name, + set_important=alter_consumer.set_important, + set_read_from=alter_consumer.set_read_from, + set_supported_codecs=SupportedCodecs.from_public(alter_consumer.set_supported_codecs), + alter_attributes=alter_consumer.alter_attributes, + ) + + @dataclass class PartitioningSettings(IToProto, IFromProto): min_active_partitions: int @@ -902,6 +950,18 @@ def to_proto(self) -> ydb_topic_pb2.PartitioningSettings: ) +@dataclass +class AlterPartitioningSettings(IToProto): + set_min_active_partitions: Optional[int] + set_partition_count_limit: Optional[int] + + def to_proto(self) -> ydb_topic_pb2.AlterPartitioningSettings: + return ydb_topic_pb2.AlterPartitioningSettings( + set_min_active_partitions=self.set_min_active_partitions, + set_partition_count_limit=self.set_partition_count_limit, + ) + + class MeteringMode(int, IFromProto, IFromPublic, IToPublic): UNSPECIFIED = 0 RESERVED_CAPACITY = 1 @@ -995,6 +1055,78 @@ class CreateTopicResult: pass +@dataclass +class AlterTopicRequest(IToProto, IFromPublic): + path: str + add_consumers: Optional[List["Consumer"]] + alter_partitioning_settings: Optional[AlterPartitioningSettings] + set_retention_period: Optional[datetime.timedelta] + set_retention_storage_mb: Optional[int] + set_supported_codecs: Optional[SupportedCodecs] + set_partition_write_burst_bytes: Optional[int] + set_partition_write_speed_bytes_per_second: Optional[int] + alter_attributes: Optional[Dict[str, str]] + alter_consumers: Optional[List[AlterConsumer]] + drop_consumers: Optional[List[str]] + set_metering_mode: Optional["MeteringMode"] + + def to_proto(self) -> ydb_topic_pb2.AlterTopicRequest: + supported_codecs = None + if self.set_supported_codecs is not None: + supported_codecs = self.set_supported_codecs.to_proto() + + return ydb_topic_pb2.AlterTopicRequest( + path=self.path, + add_consumers=[consumer.to_proto() for consumer in self.add_consumers], + alter_partitioning_settings=self.alter_partitioning_settings.to_proto(), + set_retention_period=proto_duration_from_timedelta(self.set_retention_period), + set_retention_storage_mb=self.set_retention_storage_mb, + set_supported_codecs=supported_codecs, + set_partition_write_burst_bytes=self.set_partition_write_burst_bytes, + set_partition_write_speed_bytes_per_second=self.set_partition_write_speed_bytes_per_second, + alter_attributes=self.alter_attributes, + alter_consumers=[consumer.to_proto() for consumer in self.alter_consumers], + drop_consumers=list(self.drop_consumers), + set_metering_mode=self.set_metering_mode, + ) + + @staticmethod + def from_public(req: ydb_topic_public_types.AlterTopicRequestParams) -> AlterTopicRequest: + add_consumers = [] + if req.add_consumers: + for consumer in req.add_consumers: + if isinstance(consumer, str): + consumer = ydb_topic_public_types.PublicConsumer(name=consumer) + add_consumers.append(Consumer.from_public(consumer)) + + alter_consumers = [] + if req.alter_consumers: + for consumer in req.alter_consumers: + if isinstance(consumer, str): + consumer = ydb_topic_public_types.PublicAlterConsumer(name=consumer) + alter_consumers.append(AlterConsumer.from_public(consumer)) + + drop_consumers = req.drop_consumers if req.drop_consumers else [] + + return AlterTopicRequest( + path=req.path, + alter_partitioning_settings=AlterPartitioningSettings( + set_min_active_partitions=req.set_min_active_partitions, + set_partition_count_limit=req.set_partition_count_limit, + ), + add_consumers=add_consumers, + set_retention_period=req.set_retention_period, + set_retention_storage_mb=req.set_retention_storage_mb, + set_supported_codecs=SupportedCodecs.from_public(req.set_supported_codecs), + set_partition_write_burst_bytes=req.set_partition_write_burst_bytes, + set_partition_write_speed_bytes_per_second=req.set_partition_write_speed_bytes_per_second, + alter_attributes=req.alter_attributes, + alter_consumers=alter_consumers, + drop_consumers=drop_consumers, + set_metering_mode=MeteringMode.from_public(req.set_metering_mode), + ) + + @dataclass class DescribeTopicRequest: path: str diff --git a/ydb/_grpc/grpcwrapper/ydb_topic_public_types.py b/ydb/_grpc/grpcwrapper/ydb_topic_public_types.py index df280a8b..d1987546 100644 --- a/ydb/_grpc/grpcwrapper/ydb_topic_public_types.py +++ b/ydb/_grpc/grpcwrapper/ydb_topic_public_types.py @@ -30,6 +30,23 @@ class CreateTopicRequestParams: metering_mode: Optional["PublicMeteringMode"] +@dataclass +class AlterTopicRequestParams: + path: str + set_min_active_partitions: Optional[int] + set_partition_count_limit: Optional[int] + add_consumers: Optional[List[Union["PublicConsumer", str]]] + alter_consumers: Optional[List[Union["PublicAlterConsumer", str]]] + drop_consumers: Optional[List[str]] + alter_attributes: Optional[Dict[str, str]] + set_metering_mode: Optional["PublicMeteringMode"] + set_partition_write_speed_bytes_per_second: Optional[int] + set_partition_write_burst_bytes: Optional[int] + set_retention_period: Optional[datetime.timedelta] + set_retention_storage_mb: Optional[int] + set_supported_codecs: Optional[List[Union["PublicCodec", int]]] + + class PublicCodec(int): """ Codec value may contain any int number. @@ -73,6 +90,28 @@ class PublicConsumer: "Attributes of consumer" +@dataclass +class PublicAlterConsumer: + name: str + set_important: Optional[bool] = None + """ + Consumer may be marked as 'important'. It means messages for this consumer will never expire due to retention. + User should take care that such consumer never stalls, to prevent running out of disk space. + """ + + set_read_from: Optional[datetime.datetime] = None + "All messages with smaller server written_at timestamp will be skipped." + + set_supported_codecs: Optional[List[PublicCodec]] = None + """ + List of supported codecs by this consumer. + supported_codecs on topic must be contained inside this list. + """ + + alter_attributes: Optional[Dict[str, str]] = None + "Attributes of consumer" + + @dataclass class DropTopicRequestParams(IToProto): path: str diff --git a/ydb/_grpc/grpcwrapper/ydb_topic_test.py b/ydb/_grpc/grpcwrapper/ydb_topic_test.py index ff29834a..53256ac1 100644 --- a/ydb/_grpc/grpcwrapper/ydb_topic_test.py +++ b/ydb/_grpc/grpcwrapper/ydb_topic_test.py @@ -1,4 +1,15 @@ +import datetime + +from google.protobuf.json_format import MessageToDict + from ydb._grpc.grpcwrapper.ydb_topic import OffsetsRange +from .ydb_topic import AlterTopicRequest +from .ydb_topic_public_types import ( + AlterTopicRequestParams, + PublicAlterConsumer, + PublicConsumer, + PublicCodec, +) def test_offsets_range_intersected(): @@ -17,3 +28,57 @@ def test_offsets_range_intersected(): ]: assert OffsetsRange(test[0], test[1]).is_intersected_with(OffsetsRange(test[2], test[3])) assert OffsetsRange(test[2], test[3]).is_intersected_with(OffsetsRange(test[0], test[1])) + + +def test_alter_topic_request_from_public_to_proto(): + # Specify all fields with all possible input ways + params = { + "path": "topic_name", + "add_consumers": [ + "new_consumer_1", + PublicConsumer("new_consumer_2"), + ], + "alter_consumers": [ + "old_consumer_1", + PublicAlterConsumer("old_consumer_2"), + ], + "drop_consumers": ["redundant_consumer"], + "set_retention_period": datetime.timedelta(weeks=4), + "set_retention_storage_mb": 4, + "set_supported_codecs": [1, PublicCodec(2)], + "set_partition_write_burst_bytes": 8, + "set_partition_write_speed_bytes_per_second": 15, + "alter_attributes": {"key": "value"}, + "set_metering_mode": 1, + "set_min_active_partitions": 2, + "set_partition_count_limit": 4, + } + + params_public = AlterTopicRequestParams(**params) + request = AlterTopicRequest.from_public(params_public) + request_proto = request.to_proto() + + msg_dict = MessageToDict(request_proto, preserving_proto_field_name=True) + + expected_dict = { + "path": "topic_name", + "alter_partitioning_settings": {"set_min_active_partitions": "2", "set_partition_count_limit": "4"}, + "set_retention_period": "2419200s", + "set_retention_storage_mb": "4", + "set_supported_codecs": {"codecs": [1, 2]}, + "set_partition_write_speed_bytes_per_second": "15", + "set_partition_write_burst_bytes": "8", + "alter_attributes": {"key": "value"}, + "add_consumers": [ + {"name": "new_consumer_1", "supported_codecs": {}}, + {"name": "new_consumer_2", "supported_codecs": {}}, + ], + "drop_consumers": ["redundant_consumer"], + "alter_consumers": [ + {"name": "old_consumer_1"}, + {"name": "old_consumer_2"}, + ], + "set_metering_mode": "METERING_MODE_RESERVED_CAPACITY", + } + + assert msg_dict == expected_dict diff --git a/ydb/topic.py b/ydb/topic.py index 948bcff4..f0b872e2 100644 --- a/ydb/topic.py +++ b/ydb/topic.py @@ -6,6 +6,7 @@ "TopicClientSettings", "TopicCodec", "TopicConsumer", + "TopicAlterConsumer", "TopicDescription", "TopicError", "TopicMeteringMode", @@ -77,6 +78,7 @@ PublicPartitionStats as TopicPartitionStats, PublicCodec as TopicCodec, PublicConsumer as TopicConsumer, + PublicAlterConsumer as TopicAlterConsumer, PublicMeteringMode as TopicMeteringMode, ) @@ -145,6 +147,53 @@ async def create_topic( _wrap_operation, ) + async def alter_topic( + self, + path: str, + set_min_active_partitions: Optional[int] = None, + set_partition_count_limit: Optional[int] = None, + add_consumers: Optional[List[Union[TopicConsumer, str]]] = None, + alter_consumers: Optional[List[Union[TopicAlterConsumer, str]]] = None, + drop_consumers: Optional[List[str]] = None, + alter_attributes: Optional[Dict[str, str]] = None, + set_metering_mode: Optional[TopicMeteringMode] = None, + set_partition_write_speed_bytes_per_second: Optional[int] = None, + set_partition_write_burst_bytes: Optional[int] = None, + set_retention_period: Optional[datetime.timedelta] = None, + set_retention_storage_mb: Optional[int] = None, + set_supported_codecs: Optional[List[Union[TopicCodec, int]]] = None, + ): + """ + alter topic command + + :param path: full path to topic + :param set_min_active_partitions: Minimum partition count auto merge would stop working at. + :param set_partition_count_limit: Limit for total partition count, including active (open for write) + and read-only partitions. + :param add_consumers: List of consumers for this topic to add + :param alter_consumers: List of consumers for this topic to alter + :param drop_consumers: List of consumer names for this topic to drop + :param alter_attributes: User and server attributes of topic. + Server attributes starts from "_" and will be validated by server. + :param set_metering_mode: Metering mode for the topic in a serverless database + :param set_partition_write_speed_bytes_per_second: Partition write speed in bytes per second + :param set_partition_write_burst_bytes: Burst size for write in partition, in bytes + :param set_retention_period: How long data in partition should be stored + :param set_retention_storage_mb: How much data in partition should be stored + :param set_supported_codecs: List of allowed codecs for writers. Writes with codec not from this list are forbidden. + Empty list mean disable codec compatibility checks for the topic. + """ + args = locals().copy() + del args["self"] + req = _ydb_topic_public_types.AlterTopicRequestParams(**args) + req = _ydb_topic.AlterTopicRequest.from_public(req) + await self._driver( + req.to_proto(), + _apis.TopicService.Stub, + _apis.TopicService.AlterTopic, + _wrap_operation, + ) + async def describe_topic(self, path: str, include_stats: bool = False) -> TopicDescription: args = locals().copy() del args["self"] @@ -297,6 +346,55 @@ def create_topic( _wrap_operation, ) + def alter_topic( + self, + path: str, + set_min_active_partitions: Optional[int] = None, + set_partition_count_limit: Optional[int] = None, + add_consumers: Optional[List[Union[TopicConsumer, str]]] = None, + alter_consumers: Optional[List[Union[TopicAlterConsumer, str]]] = None, + drop_consumers: Optional[List[str]] = None, + alter_attributes: Optional[Dict[str, str]] = None, + set_metering_mode: Optional[TopicMeteringMode] = None, + set_partition_write_speed_bytes_per_second: Optional[int] = None, + set_partition_write_burst_bytes: Optional[int] = None, + set_retention_period: Optional[datetime.timedelta] = None, + set_retention_storage_mb: Optional[int] = None, + set_supported_codecs: Optional[List[Union[TopicCodec, int]]] = None, + ): + """ + alter topic command + + :param path: full path to topic + :param set_min_active_partitions: Minimum partition count auto merge would stop working at. + :param set_partition_count_limit: Limit for total partition count, including active (open for write) + and read-only partitions. + :param add_consumers: List of consumers for this topic to add + :param alter_consumers: List of consumers for this topic to alter + :param drop_consumers: List of consumer names for this topic to drop + :param alter_attributes: User and server attributes of topic. + Server attributes starts from "_" and will be validated by server. + :param set_metering_mode: Metering mode for the topic in a serverless database + :param set_partition_write_speed_bytes_per_second: Partition write speed in bytes per second + :param set_partition_write_burst_bytes: Burst size for write in partition, in bytes + :param set_retention_period: How long data in partition should be stored + :param set_retention_storage_mb: How much data in partition should be stored + :param set_supported_codecs: List of allowed codecs for writers. Writes with codec not from this list are forbidden. + Empty list mean disable codec compatibility checks for the topic. + """ + args = locals().copy() + del args["self"] + self._check_closed() + + req = _ydb_topic_public_types.AlterTopicRequestParams(**args) + req = _ydb_topic.AlterTopicRequest.from_public(req) + self._driver( + req.to_proto(), + _apis.TopicService.Stub, + _apis.TopicService.AlterTopic, + _wrap_operation, + ) + def describe_topic(self, path: str, include_stats: bool = False) -> TopicDescription: args = locals().copy() del args["self"]