@@ -418,12 +418,14 @@ def from_proto(
418
418
class InitRequest (IToProto ):
419
419
topics_read_settings : List ["StreamReadMessage.InitRequest.TopicReadSettings" ]
420
420
consumer : str
421
+ auto_partitioning_support : bool
421
422
422
423
def to_proto (self ) -> ydb_topic_pb2 .StreamReadMessage .InitRequest :
423
424
res = ydb_topic_pb2 .StreamReadMessage .InitRequest ()
424
425
res .consumer = self .consumer
425
426
for settings in self .topics_read_settings :
426
427
res .topics_read_settings .append (settings .to_proto ())
428
+ res .auto_partitioning_support = self .auto_partitioning_support
427
429
return res
428
430
429
431
@dataclass
@@ -695,6 +697,20 @@ def to_proto(self) -> ydb_topic_pb2.StreamReadMessage.StopPartitionSessionRespon
695
697
partition_session_id = self .partition_session_id ,
696
698
)
697
699
700
+ @dataclass
701
+ class EndPartitionSession (IFromProto ):
702
+ partition_session_id : int
703
+ adjacent_partition_ids : List [int ]
704
+ child_partition_ids : List [int ]
705
+
706
+ @staticmethod
707
+ def from_proto (msg : ydb_topic_pb2 .StreamReadMessage .EndPartitionSession ):
708
+ return StreamReadMessage .EndPartitionSession (
709
+ partition_session_id = msg .partition_session_id ,
710
+ adjacent_partition_ids = list (msg .adjacent_partition_ids ),
711
+ child_partition_ids = list (msg .child_partition_ids ),
712
+ )
713
+
698
714
@dataclass
699
715
class FromClient (IToProto ):
700
716
client_message : "ReaderMessagesFromClientToServer"
@@ -774,6 +790,13 @@ def from_proto(
774
790
msg .partition_session_status_response
775
791
),
776
792
)
793
+ elif mess_type == "end_partition_session" :
794
+ return StreamReadMessage .FromServer (
795
+ server_status = server_status ,
796
+ server_message = StreamReadMessage .EndPartitionSession .from_proto (
797
+ msg .end_partition_session ,
798
+ ),
799
+ )
777
800
else :
778
801
raise issues .UnexpectedGrpcMessage (
779
802
"Unexpected message while parse ReaderMessagesFromServerToClient: '%s'" % mess_type
@@ -798,6 +821,7 @@ def from_proto(
798
821
UpdateTokenResponse ,
799
822
StreamReadMessage .StartPartitionSessionRequest ,
800
823
StreamReadMessage .StopPartitionSessionRequest ,
824
+ StreamReadMessage .EndPartitionSession ,
801
825
]
802
826
803
827
@@ -942,30 +966,196 @@ def from_public(alter_consumer: ydb_topic_public_types.PublicAlterConsumer) -> A
942
966
class PartitioningSettings (IToProto , IFromProto ):
943
967
min_active_partitions : int
944
968
partition_count_limit : int
969
+ max_active_partitions : int
970
+ auto_partitioning_settings : AutoPartitioningSettings
945
971
946
972
@staticmethod
947
973
def from_proto (msg : ydb_topic_pb2 .PartitioningSettings ) -> "PartitioningSettings" :
948
974
return PartitioningSettings (
949
975
min_active_partitions = msg .min_active_partitions ,
950
976
partition_count_limit = msg .partition_count_limit ,
977
+ max_active_partitions = msg .max_active_partitions ,
978
+ auto_partitioning_settings = AutoPartitioningSettings .from_proto (msg .auto_partitioning_settings ),
951
979
)
952
980
953
981
def to_proto (self ) -> ydb_topic_pb2 .PartitioningSettings :
982
+ auto_partitioning_settings = None
983
+ if self .auto_partitioning_settings is not None :
984
+ auto_partitioning_settings = self .auto_partitioning_settings .to_proto ()
985
+
954
986
return ydb_topic_pb2 .PartitioningSettings (
955
987
min_active_partitions = self .min_active_partitions ,
956
988
partition_count_limit = self .partition_count_limit ,
989
+ max_active_partitions = self .max_active_partitions ,
990
+ auto_partitioning_settings = auto_partitioning_settings ,
991
+ )
992
+
993
+
994
+ class AutoPartitioningStrategy (int , IFromProto , IFromPublic , IToPublic ):
995
+ UNSPECIFIED = 0
996
+ DISABLED = 1
997
+ SCALE_UP = 2
998
+ SCALE_UP_AND_DOWN = 3
999
+ PAUSED = 4
1000
+
1001
+ @staticmethod
1002
+ def from_public (
1003
+ strategy : Optional [ydb_topic_public_types .PublicAutoPartitioningStrategy ],
1004
+ ) -> Optional ["AutoPartitioningStrategy" ]:
1005
+ if strategy is None :
1006
+ return None
1007
+
1008
+ return AutoPartitioningStrategy (strategy )
1009
+
1010
+ @staticmethod
1011
+ def from_proto (code : Optional [int ]) -> Optional ["AutoPartitioningStrategy" ]:
1012
+ if code is None :
1013
+ return None
1014
+
1015
+ return AutoPartitioningStrategy (code )
1016
+
1017
+ def to_public (self ) -> ydb_topic_public_types .PublicAutoPartitioningStrategy :
1018
+ try :
1019
+ return ydb_topic_public_types .PublicAutoPartitioningStrategy (int (self ))
1020
+ except KeyError :
1021
+ return ydb_topic_public_types .PublicAutoPartitioningStrategy .UNSPECIFIED
1022
+
1023
+
1024
+ @dataclass
1025
+ class AutoPartitioningSettings (IToProto , IFromProto , IFromPublic , IToPublic ):
1026
+ strategy : AutoPartitioningStrategy
1027
+ partition_write_speed : AutoPartitioningWriteSpeedStrategy
1028
+
1029
+ @staticmethod
1030
+ def from_public (
1031
+ settings : Optional [ydb_topic_public_types .PublicAutoPartitioningSettings ],
1032
+ ) -> Optional [AutoPartitioningSettings ]:
1033
+ if not settings :
1034
+ return None
1035
+
1036
+ return AutoPartitioningSettings (
1037
+ strategy = settings .strategy ,
1038
+ partition_write_speed = AutoPartitioningWriteSpeedStrategy (
1039
+ stabilization_window = settings .stabilization_window ,
1040
+ up_utilization_percent = settings .up_utilization_percent ,
1041
+ down_utilization_percent = settings .down_utilization_percent ,
1042
+ ),
1043
+ )
1044
+
1045
+ @staticmethod
1046
+ def from_proto (msg : ydb_topic_pb2 .AutoPartitioningSettings ) -> AutoPartitioningSettings :
1047
+ if msg is None :
1048
+ return None
1049
+
1050
+ return AutoPartitioningSettings (
1051
+ strategy = AutoPartitioningStrategy .from_proto (msg .strategy ),
1052
+ partition_write_speed = AutoPartitioningWriteSpeedStrategy .from_proto (msg .partition_write_speed ),
1053
+ )
1054
+
1055
+ def to_proto (self ) -> ydb_topic_pb2 .AutoPartitioningSettings :
1056
+ return ydb_topic_pb2 .AutoPartitioningSettings (
1057
+ strategy = self .strategy , partition_write_speed = self .partition_write_speed .to_proto ()
1058
+ )
1059
+
1060
+ def to_public (self ) -> ydb_topic_public_types .PublicAutoPartitioningSettings :
1061
+ return ydb_topic_public_types .PublicAutoPartitioningSettings (
1062
+ strategy = self .strategy .to_public (),
1063
+ stabilization_window = self .partition_write_speed .stabilization_window ,
1064
+ up_utilization_percent = self .partition_write_speed .up_utilization_percent ,
1065
+ down_utilization_percent = self .partition_write_speed .down_utilization_percent ,
1066
+ )
1067
+
1068
+
1069
+ @dataclass
1070
+ class AutoPartitioningWriteSpeedStrategy (IToProto , IFromProto ):
1071
+ stabilization_window : Optional [datetime .timedelta ]
1072
+ up_utilization_percent : Optional [int ]
1073
+ down_utilization_percent : Optional [int ]
1074
+
1075
+ def to_proto (self ):
1076
+ return ydb_topic_pb2 .AutoPartitioningWriteSpeedStrategy (
1077
+ stabilization_window = proto_duration_from_timedelta (self .stabilization_window ),
1078
+ up_utilization_percent = self .up_utilization_percent ,
1079
+ down_utilization_percent = self .down_utilization_percent ,
1080
+ )
1081
+
1082
+ @staticmethod
1083
+ def from_proto (
1084
+ msg : Optional [ydb_topic_pb2 .AutoPartitioningWriteSpeedStrategy ],
1085
+ ) -> Optional [AutoPartitioningWriteSpeedStrategy ]:
1086
+ if msg is None :
1087
+ return None
1088
+
1089
+ return AutoPartitioningWriteSpeedStrategy (
1090
+ stabilization_window = timedelta_from_proto_duration (msg .stabilization_window ),
1091
+ up_utilization_percent = msg .up_utilization_percent ,
1092
+ down_utilization_percent = msg .down_utilization_percent ,
957
1093
)
958
1094
959
1095
960
1096
@dataclass
961
1097
class AlterPartitioningSettings (IToProto ):
962
1098
set_min_active_partitions : Optional [int ]
963
1099
set_partition_count_limit : Optional [int ]
1100
+ set_max_active_partitions : Optional [int ]
1101
+ alter_auto_partitioning_settings : Optional [AlterAutoPartitioningSettings ]
964
1102
965
1103
def to_proto (self ) -> ydb_topic_pb2 .AlterPartitioningSettings :
1104
+ alter_auto_partitioning_settings = None
1105
+ if self .alter_auto_partitioning_settings is not None :
1106
+ alter_auto_partitioning_settings = self .alter_auto_partitioning_settings .to_proto ()
1107
+
966
1108
return ydb_topic_pb2 .AlterPartitioningSettings (
967
1109
set_min_active_partitions = self .set_min_active_partitions ,
968
1110
set_partition_count_limit = self .set_partition_count_limit ,
1111
+ set_max_active_partitions = self .set_max_active_partitions ,
1112
+ alter_auto_partitioning_settings = alter_auto_partitioning_settings ,
1113
+ )
1114
+
1115
+
1116
+ @dataclass
1117
+ class AlterAutoPartitioningSettings (IToProto , IFromPublic ):
1118
+ set_strategy : Optional [AutoPartitioningStrategy ]
1119
+ set_partition_write_speed : Optional [AlterAutoPartitioningWriteSpeedStrategy ]
1120
+
1121
+ @staticmethod
1122
+ def from_public (
1123
+ settings : Optional [ydb_topic_public_types .PublicAlterAutoPartitioningSettings ],
1124
+ ) -> Optional [AlterAutoPartitioningSettings ]:
1125
+ if not settings :
1126
+ return None
1127
+
1128
+ return AlterAutoPartitioningSettings (
1129
+ set_strategy = settings .set_strategy ,
1130
+ set_partition_write_speed = AlterAutoPartitioningWriteSpeedStrategy (
1131
+ set_stabilization_window = settings .set_stabilization_window ,
1132
+ set_up_utilization_percent = settings .set_up_utilization_percent ,
1133
+ set_down_utilization_percent = settings .set_down_utilization_percent ,
1134
+ ),
1135
+ )
1136
+
1137
+ def to_proto (self ) -> ydb_topic_pb2 .AlterAutoPartitioningSettings :
1138
+ set_partition_write_speed = None
1139
+ if self .set_partition_write_speed :
1140
+ set_partition_write_speed = self .set_partition_write_speed .to_proto ()
1141
+
1142
+ return ydb_topic_pb2 .AlterAutoPartitioningSettings (
1143
+ set_strategy = self .set_strategy ,
1144
+ set_partition_write_speed = set_partition_write_speed ,
1145
+ )
1146
+
1147
+
1148
+ @dataclass
1149
+ class AlterAutoPartitioningWriteSpeedStrategy (IToProto ):
1150
+ set_stabilization_window : Optional [datetime .timedelta ]
1151
+ set_up_utilization_percent : Optional [int ]
1152
+ set_down_utilization_percent : Optional [int ]
1153
+
1154
+ def to_proto (self ) -> ydb_topic_pb2 .AlterAutoPartitioningWriteSpeedStrategy :
1155
+ return ydb_topic_pb2 .AlterAutoPartitioningWriteSpeedStrategy (
1156
+ set_stabilization_window = proto_duration_from_timedelta (self .set_stabilization_window ),
1157
+ set_up_utilization_percent = self .set_up_utilization_percent ,
1158
+ set_down_utilization_percent = self .set_down_utilization_percent ,
969
1159
)
970
1160
971
1161
@@ -992,7 +1182,7 @@ def from_proto(code: Optional[int]) -> Optional["MeteringMode"]:
992
1182
993
1183
def to_public (self ) -> ydb_topic_public_types .PublicMeteringMode :
994
1184
try :
995
- ydb_topic_public_types .PublicMeteringMode (int (self ))
1185
+ return ydb_topic_public_types .PublicMeteringMode (int (self ))
996
1186
except KeyError :
997
1187
return ydb_topic_public_types .PublicMeteringMode .UNSPECIFIED
998
1188
@@ -1011,9 +1201,13 @@ class CreateTopicRequest(IToProto, IFromPublic):
1011
1201
metering_mode : "MeteringMode"
1012
1202
1013
1203
def to_proto (self ) -> ydb_topic_pb2 .CreateTopicRequest :
1204
+ partitioning_settings = None
1205
+ if self .partitioning_settings is not None :
1206
+ partitioning_settings = self .partitioning_settings .to_proto ()
1207
+
1014
1208
return ydb_topic_pb2 .CreateTopicRequest (
1015
1209
path = self .path ,
1016
- partitioning_settings = self . partitioning_settings . to_proto () ,
1210
+ partitioning_settings = partitioning_settings ,
1017
1211
retention_period = proto_duration_from_timedelta (self .retention_period ),
1018
1212
retention_storage_mb = self .retention_storage_mb ,
1019
1213
supported_codecs = self .supported_codecs .to_proto (),
@@ -1038,11 +1232,17 @@ def from_public(req: ydb_topic_public_types.CreateTopicRequestParams):
1038
1232
consumer = ydb_topic_public_types .PublicConsumer (name = consumer )
1039
1233
consumers .append (Consumer .from_public (consumer ))
1040
1234
1235
+ auto_partitioning_settings = None
1236
+ if req .auto_partitioning_settings is not None :
1237
+ auto_partitioning_settings = AutoPartitioningSettings .from_public (req .auto_partitioning_settings )
1238
+
1041
1239
return CreateTopicRequest (
1042
1240
path = req .path ,
1043
1241
partitioning_settings = PartitioningSettings (
1044
1242
min_active_partitions = req .min_active_partitions ,
1045
1243
partition_count_limit = req .partition_count_limit ,
1244
+ max_active_partitions = req .max_active_partitions ,
1245
+ auto_partitioning_settings = auto_partitioning_settings ,
1046
1246
),
1047
1247
retention_period = req .retention_period ,
1048
1248
retention_storage_mb = req .retention_storage_mb ,
@@ -1113,13 +1313,21 @@ def from_public(req: ydb_topic_public_types.AlterTopicRequestParams) -> AlterTop
1113
1313
consumer = ydb_topic_public_types .PublicAlterConsumer (name = consumer )
1114
1314
alter_consumers .append (AlterConsumer .from_public (consumer ))
1115
1315
1316
+ alter_auto_partitioning_settings = None
1317
+ if req .alter_auto_partitioning_settings is not None :
1318
+ alter_auto_partitioning_settings = AlterAutoPartitioningSettings .from_public (
1319
+ req .alter_auto_partitioning_settings
1320
+ )
1321
+
1116
1322
drop_consumers = req .drop_consumers if req .drop_consumers else []
1117
1323
1118
1324
return AlterTopicRequest (
1119
1325
path = req .path ,
1120
1326
alter_partitioning_settings = AlterPartitioningSettings (
1121
1327
set_min_active_partitions = req .set_min_active_partitions ,
1122
1328
set_partition_count_limit = req .set_partition_count_limit ,
1329
+ set_max_active_partitions = req .set_max_active_partitions ,
1330
+ alter_auto_partitioning_settings = alter_auto_partitioning_settings ,
1123
1331
),
1124
1332
add_consumers = add_consumers ,
1125
1333
set_retention_period = req .set_retention_period ,
@@ -1180,6 +1388,8 @@ def to_public(self) -> ydb_topic_public_types.PublicDescribeTopicResult:
1180
1388
return ydb_topic_public_types .PublicDescribeTopicResult (
1181
1389
self = scheme ._wrap_scheme_entry (self .self_proto ),
1182
1390
min_active_partitions = self .partitioning_settings .min_active_partitions ,
1391
+ max_active_partitions = self .partitioning_settings .max_active_partitions ,
1392
+ auto_partitioning_settings = self .partitioning_settings .auto_partitioning_settings .to_public (),
1183
1393
partition_count_limit = self .partitioning_settings .partition_count_limit ,
1184
1394
partitions = list (map (DescribeTopicResult .PartitionInfo .to_public , self .partitions )),
1185
1395
retention_period = self .retention_period ,
0 commit comments