Skip to content

Commit 25add43

Browse files
nshestakovGazizonoki
authored andcommitted
Moved "
Send EndPartitionSession control message" commit from ydb repo
1 parent c83fae3 commit 25add43

File tree

11 files changed

+293
-5
lines changed

11 files changed

+293
-5
lines changed

include/ydb-cpp-sdk/client/federated_topic/federated_topic.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,7 @@ struct TReadSessionEvent {
134134
using TCommitOffsetAcknowledgementEvent = TFederated<NTopic::TReadSessionEvent::TCommitOffsetAcknowledgementEvent>;
135135
using TStartPartitionSessionEvent = TFederated<NTopic::TReadSessionEvent::TStartPartitionSessionEvent>;
136136
using TStopPartitionSessionEvent = TFederated<NTopic::TReadSessionEvent::TStopPartitionSessionEvent>;
137+
using TEndPartitionSessionEvent = TFederated<NTopic::TReadSessionEvent::TEndPartitionSessionEvent>;
137138
using TPartitionSessionStatusEvent = TFederated<NTopic::TReadSessionEvent::TPartitionSessionStatusEvent>;
138139
using TPartitionSessionClosedEvent = TFederated<NTopic::TReadSessionEvent::TPartitionSessionClosedEvent>;
139140

@@ -202,6 +203,7 @@ struct TReadSessionEvent {
202203
TCommitOffsetAcknowledgementEvent,
203204
TStartPartitionSessionEvent,
204205
TStopPartitionSessionEvent,
206+
TEndPartitionSessionEvent,
205207
TPartitionSessionStatusEvent,
206208
TPartitionSessionClosedEvent,
207209
TSessionClosedEvent>;
@@ -352,6 +354,13 @@ struct TFederatedReadSessionSettings: public NTopic::TReadSessionSettings {
352354
FLUENT_SETTING(std::function<void(TReadSessionEvent::TStopPartitionSessionEvent&)>,
353355
StopPartitionSessionHandler);
354356

357+
//! Function to handle end partition session events.
358+
//! If this handler is set, end partition session events will be handled by handler,
359+
//! otherwise sent to TReadSession::GetEvent().
360+
//! Default value is empty function (not set).
361+
FLUENT_SETTING(std::function<void(TReadSessionEvent::TEndPartitionSessionEvent&)>,
362+
EndPartitionSessionHandler);
363+
355364
//! Function to handle partition session status events.
356365
//! If this handler is set, partition session status events will be handled by handler,
357366
//! otherwise sent to TReadSession::GetEvent().
@@ -535,6 +544,8 @@ void TPrintable<NFederatedTopic::TReadSessionEvent::TFederated<NFederatedTopic::
535544
template<>
536545
void TPrintable<NFederatedTopic::TReadSessionEvent::TFederated<NFederatedTopic::TReadSessionEvent::TStopPartitionSessionEvent>>::DebugString(TStringBuilder& res, bool) const;
537546
template<>
547+
void TPrintable<NFederatedTopic::TReadSessionEvent::TFederated<NFederatedTopic::TReadSessionEvent::TEndPartitionSessionEvent>>::DebugString(TStringBuilder& res, bool) const;
548+
template<>
538549
void TPrintable<NFederatedTopic::TReadSessionEvent::TFederated<NFederatedTopic::TReadSessionEvent::TPartitionSessionStatusEvent>>::DebugString(TStringBuilder& res, bool) const;
539550
template<>
540551
void TPrintable<NFederatedTopic::TReadSessionEvent::TFederated<NFederatedTopic::TReadSessionEvent::TPartitionSessionClosedEvent>>::DebugString(TStringBuilder& res, bool) const;

include/ydb-cpp-sdk/client/topic/read_events.h

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,26 @@ struct TReadSessionEvent {
282282
ui64 CommittedOffset;
283283
};
284284

285+
//! Server command for ending partition session.
286+
//! This is a hint that all messages from the partition have been read and will no longer appear, and that the client must commit offsets.
287+
struct TEndPartitionSessionEvent: public TPartitionSessionAccessor, public TPrintable<TEndPartitionSessionEvent> {
288+
TEndPartitionSessionEvent(TPartitionSession::TPtr partitionSession, std::vector<ui32>&& adjacentPartitionIds, std::vector<ui32>&& childPartitionIds);
289+
290+
//! A list of the partition IDs that also participated in the partition's merge.
291+
const std::vector<ui32> GetAdjacentPartitionIds() const {
292+
return AdjacentPartitionIds;
293+
}
294+
295+
//! A list of partition IDs that were obtained as a result of merging or splitting this partition.
296+
const std::vector<ui32> GetChildPartitionIds() const {
297+
return ChildPartitionIds;
298+
}
299+
300+
private:
301+
std::vector<ui32> AdjacentPartitionIds;
302+
std::vector<ui32> ChildPartitionIds;
303+
};
304+
285305
//! Status for partition session requested via TPartitionSession::RequestStatus()
286306
struct TPartitionSessionStatusEvent: public TPartitionSessionAccessor,
287307
public TPrintable<TPartitionSessionStatusEvent> {
@@ -343,6 +363,7 @@ struct TReadSessionEvent {
343363
TCommitOffsetAcknowledgementEvent,
344364
TStartPartitionSessionEvent,
345365
TStopPartitionSessionEvent,
366+
TEndPartitionSessionEvent,
346367
TPartitionSessionStatusEvent,
347368
TPartitionSessionClosedEvent,
348369
TSessionClosedEvent>;

include/ydb-cpp-sdk/client/topic/read_session.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,13 @@ struct TReadSessionSettings: public TRequestSettings<TReadSessionSettings> {
104104
FLUENT_SETTING(std::function<void(TReadSessionEvent::TStopPartitionSessionEvent&)>,
105105
StopPartitionSessionHandler);
106106

107+
//! Function to handle end partition session events.
108+
//! If this handler is set, end partition session events will be handled by handler,
109+
//! otherwise sent to TReadSession::GetEvent().
110+
//! Default value is empty function (not set).
111+
FLUENT_SETTING(std::function<void(TReadSessionEvent::TEndPartitionSessionEvent&)>,
112+
EndPartitionSessionHandler);
113+
107114
//! Function to handle partition session status events.
108115
//! If this handler is set, partition session status events will be handled by handler,
109116
//! otherwise sent to TReadSession::GetEvent().
@@ -186,6 +193,9 @@ struct TReadSessionSettings: public TRequestSettings<TReadSessionSettings> {
186193

187194
FLUENT_SETTING_DEFAULT(TDuration, ConnectTimeout, TDuration::Seconds(30));
188195

196+
//! AutoscalingSupport.
197+
FLUENT_SETTING_OPTIONAL(bool, AutoscalingSupport);
198+
189199
//! Log.
190200
FLUENT_SETTING_OPTIONAL(TLog, Log);
191201
};

src/api/protos/ydb_persqueue_v1.proto

Lines changed: 55 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ import "src/api/protos/ydb_status_codes.proto";
55
import "src/api/protos/ydb_issue_message.proto";
66
import "src/api/protos/annotations/validation.proto";
77

8+
import "google/protobuf/duration.proto";
9+
810
package Ydb.PersQueue.V1;
911

1012
option java_package = "com.yandex.ydb.persqueue";
@@ -1079,6 +1081,18 @@ message Credentials {
10791081
}
10801082
}
10811083

1084+
enum AutoscalingStrategy {
1085+
// The autoscaling algorithm is not specified. The default value will be used.
1086+
AUTOSCALING_STRATEGY_UNSPECIFIED = 0;
1087+
// The autoscaling is disabled.
1088+
AUTOSCALING_STRATEGY_DISABLED = 1;
1089+
// The autoscaling algorithm will increase partitions count depending on the load characteristics.
1090+
// The autoscaling algorithm will never decrease the number of partitions.
1091+
AUTOSCALING_STRATEGY_SCALE_UP = 2;
1092+
// The autoscaling algorithm will both increase and decrease partitions count depending on the load characteristics.
1093+
AUTOSCALING_STRATEGY_SCALE_UP_AND_DOWN = 3;
1094+
}
1095+
10821096
/**
10831097
* Message for describing topic internals.
10841098
*/
@@ -1088,8 +1102,13 @@ message TopicSettings {
10881102
FORMAT_BASE = 1;
10891103
}
10901104

1091-
// How many partitions in topic. Must less than database limit. Default limit - 10.
1092-
int32 partitions_count = 1 [(value) = "> 0"];
1105+
oneof partitioning {
1106+
// How many partitions in topic. Must less than database limit. Default limit - 10.
1107+
int32 partitions_count = 1 [(value) = "> 0"];
1108+
// Settings for the partitions count autoscaling.
1109+
AutoscalingSettings autoscaling_settings = 15;
1110+
};
1111+
10931112
oneof retention {
10941113
// How long data in partition should be stored. Must be greater than 0 and less than limit for this database.
10951114
// Default limit - 36 hours.
@@ -1164,6 +1183,40 @@ message TopicSettings {
11641183
RemoteMirrorRule remote_mirror_rule = 11;
11651184
}
11661185

1186+
message AutoscalingSettings {
1187+
// Strategy of autoscaling.
1188+
AutoscalingStrategy strategy = 1;
1189+
1190+
// Auto merge would stop working when the partitions count reaches min_active_partitions.
1191+
// Zero value means default - 1.
1192+
int64 min_active_partitions = 2 [(Ydb.value) = ">= 0"];
1193+
// Auto split would stop working when the partitions count reaches max_active_partitions.
1194+
// Zero value means default - 1.
1195+
int64 max_active_partitions = 3 [(Ydb.value) = ">= 0"];
1196+
// Limit for total partition count, including active (open for write) and read-only partitions.
1197+
// Zero value means default - 100.
1198+
int64 partition_count_limit = 4 [(Ydb.value) = ">= 0", deprecated = true];
1199+
1200+
// Partition write speed autoscaling options.
1201+
AutoscalingPartitionWriteSpeedStrategy partition_write_speed = 5;
1202+
}
1203+
1204+
message AutoscalingPartitionWriteSpeedStrategy {
1205+
//Partition will be autoscaled up (divided into 2 partitions)
1206+
//after write speed to the partition exceeds scale_up_threshold_percent (in percentage of maximum write speed to the partition) for the period of time threshold_time_seconds
1207+
1208+
//Partition will become a candidate to the autoscaling down
1209+
//after write speed doesn’t reach scale_down_threshold_percent (in percentage of maximum write speed to the partition) for the period of time threshold_time_seconds
1210+
//This candidate partition will be autoscaled down when other neighbour partition will become a candidate to the autoscaling down and not earlier than a retention period.
1211+
1212+
// Zero value means default - 300.
1213+
google.protobuf.Duration threshold_time = 1;
1214+
// Zero value means default - 90.
1215+
int32 scale_up_threshold_percent = 2 [(Ydb.value) = ">= 0"];
1216+
// Zero value means default - 30.
1217+
int32 scale_down_threshold_percent = 3 [(Ydb.value) = ">= 0"];
1218+
}
1219+
11671220
/**
11681221
* Create topic request sent from client to server.
11691222
*/

src/api/protos/ydb_topic.proto

Lines changed: 89 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -306,6 +306,7 @@ message StreamReadMessage {
306306
StopPartitionSessionRequest stop_partition_session_request = 9;
307307

308308
UpdatePartitionSession update_partition_session = 10;
309+
EndPartitionSession end_partition_session = 11;
309310
}
310311
}
311312

@@ -320,6 +321,8 @@ message StreamReadMessage {
320321
string reader_name = 3;
321322
// Direct reading from a partition node.
322323
bool direct_read = 4;
324+
// Indicates that the SDK supports autoscaling.
325+
bool autoscaling_support = 5;
323326

324327
message TopicReadSettings {
325328
// Topic path.
@@ -558,6 +561,20 @@ message StreamReadMessage {
558561
int64 direct_read_id = 2;
559562
}
560563

564+
// Signal from server that client has finished reading the partition and all messages have been read.
565+
// Once a partition has been finished no further messages will ever arrive to that partition.
566+
// This command is a hint to the client to commit offsets, after which the child partitions will be balanced independently in different reading sessions.
567+
// Unlike StopPartitionSessionRequest, the client does not have to close the reading session.
568+
// Client should not send a response to the command.
569+
message EndPartitionSession {
570+
// Partition session identifier.
571+
int64 partition_session_id = 1;
572+
573+
// Ids of partitions which were merged with the ended partition.
574+
repeated int64 adjacent_partition_ids = 2;
575+
// Ids of partitions which was formed when the ended partition was split or merged.
576+
repeated int64 child_partition_ids = 3;
577+
}
561578
}
562579

563580
// Messages for bidirectional streaming rpc StreamDirectRead
@@ -796,24 +813,93 @@ message AlterConsumer {
796813
map<string, string> alter_attributes = 6;
797814
}
798815

816+
enum AutoscalingStrategy {
817+
// The autoscaling algorithm is not specified. The default value will be used.
818+
AUTOSCALING_STRATEGY_UNSPECIFIED = 0;
819+
// The autoscaling is disabled.
820+
AUTOSCALING_STRATEGY_DISABLED = 1;
821+
// The autoscaling algorithm will increase partitions count depending on the load characteristics.
822+
// The autoscaling algorithm will never decrease the number of partitions.
823+
AUTOSCALING_STRATEGY_SCALE_UP = 2;
824+
// The autoscaling algorithm will both increase and decrease partitions count depending on the load characteristics.
825+
AUTOSCALING_STRATEGY_SCALE_UP_AND_DOWN = 3;
826+
}
827+
799828
// Partitioning settings for topic.
800829
message PartitioningSettings {
801-
// Minimum partition count auto merge would stop working at.
830+
// Auto merge would stop working when the partitions count reaches min_active_partitions
802831
// Zero value means default - 1.
803832
int64 min_active_partitions = 1 [(Ydb.value) = ">= 0"];
833+
// Auto split would stop working when the partitions count reaches max_active_partitions
834+
// Zero value means default - 1.
835+
int64 max_active_partitions = 3 [(Ydb.value) = ">= 0"];
804836
// Limit for total partition count, including active (open for write) and read-only partitions.
805837
// Zero value means default - 100.
806-
int64 partition_count_limit = 2 [(Ydb.value) = ">= 0"];
838+
// Use max_active_partitions
839+
int64 partition_count_limit = 2 [(Ydb.value) = ">= 0", deprecated = true];
840+
// Settings for the partitions count autoscaling.
841+
AutoscalingSettings autoscaling_settings = 4;
842+
}
843+
844+
message AutoscalingSettings {
845+
// Strategy of autoscaling.
846+
AutoscalingStrategy strategy = 1;
847+
// Partition write speed autoscaling options.
848+
AutoscalingPartitionWriteSpeedStrategy partition_write_speed = 2;
849+
}
850+
851+
message AutoscalingPartitionWriteSpeedStrategy {
852+
//Partition will be autoscaled up (divided into 2 partitions)
853+
//after write speed to the partition exceeds scale_up_threshold_percent (in percentage of maximum write speed to the partition) for the period of time threshold_time_seconds
854+
855+
//Partition will become a candidate to the autoscaling down
856+
//after write speed doesn’t reach scale_down_threshold_percent (in percentage of maximum write speed to the partition) for the period of time threshold_time_seconds
857+
//This candidate partition will be autoscaled down when other neighbour partition will become a candidate to the autoscaling down and not earlier than a retention period.
858+
859+
// Zero value means default - 300.
860+
google.protobuf.Duration threshold_time = 1;
861+
// Zero value means default - 90.
862+
int32 scale_up_threshold_percent = 2 [(Ydb.value) = ">= 0"];
863+
// Zero value means default - 30.
864+
int32 scale_down_threshold_percent = 3 [(Ydb.value) = ">= 0"];
807865
}
808866

809867
// Partitioning settings for topic.
810868
message AlterPartitioningSettings {
811869
// Minimum partition count auto merge would stop working at.
812870
// Zero value means default - 1.
813871
optional int64 set_min_active_partitions = 1 [(Ydb.value) = ">= 0"];
872+
// Maximum partition count auto merge would stop working at.
873+
// Zero value means default - 1.
874+
optional int64 max_active_partitions = 3 [(Ydb.value) = ">= 0"];
814875
// Limit for total partition count, including active (open for write) and read-only partitions.
815876
// Zero value means default - 100.
816-
optional int64 set_partition_count_limit = 2 [(Ydb.value) = ">= 0"];
877+
// Use set_max_active_partitions
878+
optional int64 set_partition_count_limit = 2 [(Ydb.value) = ">= 0", deprecated = true];
879+
// Settings for autoscaling the partition number
880+
optional AlterAutoscalingSettings alter_autoscaling_settings = 4;
881+
}
882+
883+
message AlterAutoscalingSettings {
884+
// Strategy of autoscaling
885+
optional AutoscalingStrategy set_strategy = 1;
886+
// Autoscaling partition write speed options.
887+
optional AlterAutoscalingPartitionWriteSpeedStrategy set_partition_write_speed = 2;
888+
}
889+
890+
message AlterAutoscalingPartitionWriteSpeedStrategy {
891+
// The time of exceeding the threshold value, after which the partition will be
892+
// autoscaling.
893+
// Zero value means default - 300.
894+
optional google.protobuf.Duration set_threshold_time = 1;
895+
// The threshold value of the write speed to the partition as a percentage, when exceeded,
896+
// the partition will be auto split.
897+
// Zero value means default - 90.
898+
optional int32 set_scale_up_threshold_percent = 2 [(Ydb.value) = ">= 0"];
899+
// The threshold value of the write speed to the partition as a percentage, if it is not reached,
900+
// the partition will be auto merged.
901+
// Zero value means default - 30.
902+
optional int32 set_scale_down_threshold_percent = 3 [(Ydb.value) = ">= 0"];
817903
}
818904

819905
// Metering mode specifies the method used to determine consumption of resources by the topic.

src/client/federated_topic/impl/federated_read_session.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ NTopic::TReadSessionSettings FromFederated(const TFederatedReadSessionSettings&
5050
MAYBE_CONVERT_HANDLER(TReadSessionEvent::TCommitOffsetAcknowledgementEvent, CommitOffsetAcknowledgementHandler);
5151
MAYBE_CONVERT_HANDLER(TReadSessionEvent::TStartPartitionSessionEvent, StartPartitionSessionHandler);
5252
MAYBE_CONVERT_HANDLER(TReadSessionEvent::TStopPartitionSessionEvent, StopPartitionSessionHandler);
53+
MAYBE_CONVERT_HANDLER(TReadSessionEvent::TEndPartitionSessionEvent, EndPartitionSessionHandler);
5354
MAYBE_CONVERT_HANDLER(TReadSessionEvent::TPartitionSessionStatusEvent, PartitionSessionStatusHandler);
5455
MAYBE_CONVERT_HANDLER(TReadSessionEvent::TPartitionSessionClosedEvent, PartitionSessionClosedHandler);
5556
MAYBE_CONVERT_HANDLER(TReadSessionEvent::TEvent, CommonHandler);

src/client/federated_topic/impl/federated_read_session_event.cpp

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ using namespace NFederatedTopic;
1111
using TCommitOffsetAcknowledgementEvent = NFederatedTopic::TReadSessionEvent::TFederated<NTopic::TReadSessionEvent::TCommitOffsetAcknowledgementEvent>;
1212
using TStartPartitionSessionEvent = NFederatedTopic::TReadSessionEvent::TFederated<NTopic::TReadSessionEvent::TStartPartitionSessionEvent>;
1313
using TStopPartitionSessionEvent = NFederatedTopic::TReadSessionEvent::TFederated<NTopic::TReadSessionEvent::TStopPartitionSessionEvent>;
14+
using TEndPartitionSessionEvent = NFederatedTopic::TReadSessionEvent::TFederated<NTopic::TReadSessionEvent::TEndPartitionSessionEvent>;
1415
using TPartitionSessionStatusEvent = NFederatedTopic::TReadSessionEvent::TFederated<NTopic::TReadSessionEvent::TPartitionSessionStatusEvent>;
1516
using TPartitionSessionClosedEvent = NFederatedTopic::TReadSessionEvent::TFederated<NTopic::TReadSessionEvent::TPartitionSessionClosedEvent>;
1617
using TMessage = NFederatedTopic::TReadSessionEvent::TFederated<NTopic::TReadSessionEvent::TDataReceivedEvent::TMessage>;
@@ -78,6 +79,20 @@ void TPrintable<TStopPartitionSessionEvent>::DebugString(TStringBuilder& ret, bo
7879
<< " }";
7980
}
8081

82+
void JoinIds(TStringBuilder& ret, const std::vector<ui32> ids);
83+
84+
template<>
85+
void TPrintable<TEndPartitionSessionEvent>::DebugString(TStringBuilder& ret, bool) const {
86+
const auto* self = static_cast<const TEndPartitionSessionEvent*>(this);
87+
ret << "EndPartitionSession {";
88+
self->GetFederatedPartitionSession()->DebugString(ret);
89+
ret << " AdjacentPartitionIds: ";
90+
JoinIds(ret, self->GetAdjacentPartitionIds());
91+
ret << " ChildPartitionIds: ";
92+
JoinIds(ret, self->GetChildPartitionIds());
93+
ret << " }";
94+
}
95+
8196
template<>
8297
void TPrintable<TPartitionSessionStatusEvent>::DebugString(TStringBuilder& ret, bool) const {
8398
const auto* self = static_cast<const TPartitionSessionStatusEvent*>(this);

0 commit comments

Comments
 (0)