@@ -488,7 +488,9 @@ struct TRawPartitionStreamEvent {
488
488
template <bool UseMigrationProtocol>
489
489
class TRawPartitionStreamEventQueue {
490
490
public:
491
- TRawPartitionStreamEventQueue () = default ;
491
+ TRawPartitionStreamEventQueue (TCallbackContextPtr<UseMigrationProtocol> cbContext)
492
+ : CbContext(cbContext) {
493
+ };
492
494
493
495
template <class ... Ts>
494
496
TRawPartitionStreamEvent<UseMigrationProtocol>& emplace_back (Ts&&... event)
@@ -549,6 +551,7 @@ class TRawPartitionStreamEventQueue {
549
551
std::deque<TRawPartitionStreamEvent<UseMigrationProtocol>>& queue);
550
552
551
553
private:
554
+ TCallbackContextPtr<UseMigrationProtocol> CbContext;
552
555
std::deque<TRawPartitionStreamEvent<UseMigrationProtocol>> Ready;
553
556
std::deque<TRawPartitionStreamEvent<UseMigrationProtocol>> NotReady;
554
557
};
@@ -582,6 +585,7 @@ class TPartitionStreamImpl : public TAPartitionStream<UseMigrationProtocol> {
582
585
, AssignId(assignId)
583
586
, FirstNotReadOffset(readOffset)
584
587
, CbContext(std::move(cbContext))
588
+ , EventsQueue(CbContext)
585
589
{
586
590
TAPartitionStream<true >::PartitionStreamId = partitionStreamId;
587
591
TAPartitionStream<true >::TopicPath = std::move (topicPath);
@@ -602,6 +606,7 @@ class TPartitionStreamImpl : public TAPartitionStream<UseMigrationProtocol> {
602
606
, AssignId(static_cast <ui64>(assignId))
603
607
, FirstNotReadOffset(static_cast <ui64>(readOffset))
604
608
, CbContext(std::move(cbContext))
609
+ , EventsQueue(CbContext)
605
610
{
606
611
TAPartitionStream<false >::PartitionSessionId = partitionStreamId;
607
612
TAPartitionStream<false >::TopicPath = std::move (topicPath);
@@ -624,6 +629,7 @@ class TPartitionStreamImpl : public TAPartitionStream<UseMigrationProtocol> {
624
629
625
630
void ConfirmCreate (std::optional<ui64> readOffset, std::optional<ui64> commitOffset);
626
631
void ConfirmDestroy ();
632
+ void ConfirmEnd (const std::vector<ui32>& childIds);
627
633
628
634
void StopReading () /* override*/ ;
629
635
void ResumeReading () /* override*/ ;
@@ -763,6 +769,7 @@ class TPartitionStreamImpl : public TAPartitionStream<UseMigrationProtocol> {
763
769
std::mutex Lock;
764
770
};
765
771
772
+
766
773
template <bool UseMigrationProtocol>
767
774
class TReadSessionEventsQueue : public TBaseSessionEventsQueue <TAReadSessionSettings<UseMigrationProtocol>,
768
775
typename TAReadSessionEvent<UseMigrationProtocol>::TEvent,
@@ -822,7 +829,8 @@ class TReadSessionEventsQueue: public TBaseSessionEventsQueue<TAReadSessionSetti
822
829
}
823
830
824
831
bool TryApplyCallbackToEventImpl (typename TParent::TEvent& event,
825
- TDeferredActions<UseMigrationProtocol>& deferred);
832
+ TDeferredActions<UseMigrationProtocol>& deferred,
833
+ TCallbackContextPtr<UseMigrationProtocol>& cbContext);
826
834
bool HasDataEventCallback () const ;
827
835
void ApplyCallbackToEventImpl (TADataReceivedEvent<UseMigrationProtocol>& event,
828
836
TUserRetrievedEventsInfoAccumulator<UseMigrationProtocol>&& eventsInfo,
@@ -858,13 +866,19 @@ class TReadSessionEventsQueue: public TBaseSessionEventsQueue<TAReadSessionSetti
858
866
859
867
void ClearAllEvents ();
860
868
869
+ void SetCallbackContext (TCallbackContextPtr<UseMigrationProtocol>& ctx) {
870
+ CbContext = ctx;
871
+ }
872
+
861
873
private:
862
874
struct THandlersVisitor : public TParent ::TBaseHandlersVisitor {
863
875
THandlersVisitor (const TAReadSessionSettings<UseMigrationProtocol>& settings,
864
876
typename TParent::TEvent& event,
865
- TDeferredActions<UseMigrationProtocol>& deferred)
877
+ TDeferredActions<UseMigrationProtocol>& deferred,
878
+ TCallbackContextPtr<UseMigrationProtocol>& cbContext)
866
879
: TParent::TBaseHandlersVisitor(settings, event)
867
- , Deferred(deferred) {
880
+ , Deferred(deferred)
881
+ , CbContext(cbContext) {
868
882
}
869
883
870
884
#define DECLARE_HANDLER (type, handler, answer ) \
@@ -879,7 +893,7 @@ class TReadSessionEventsQueue: public TBaseSessionEventsQueue<TAReadSessionSetti
879
893
} \
880
894
/* */
881
895
882
- #define DECLARE_TEMPLATE_HANDLER (type_true, type_false, handler_true, handler_false, answer ) \
896
+ #define DECLARE_TEMPLATE_HANDLER (type_true, type_false, handler_true, handler_false ) \
883
897
bool operator ()(std::conditional_t <UseMigrationProtocol, type_true, type_false>&) { \
884
898
if (this ->template PushHandler <std::conditional_t <UseMigrationProtocol, type_true, type_false>>( \
885
899
std::move (TParent::TBaseHandlersVisitor::Event), \
@@ -891,7 +905,7 @@ class TReadSessionEventsQueue: public TBaseSessionEventsQueue<TAReadSessionSetti
891
905
} \
892
906
}(), \
893
907
this ->Settings .EventHandlers_ .CommonHandler_ )) { \
894
- return answer; \
908
+ return true ; \
895
909
} \
896
910
return false ; \
897
911
} \
@@ -900,50 +914,81 @@ class TReadSessionEventsQueue: public TBaseSessionEventsQueue<TAReadSessionSetti
900
914
DECLARE_TEMPLATE_HANDLER (typename TAReadSessionEvent<true >::TDataReceivedEvent,
901
915
typename TAReadSessionEvent<false >::TDataReceivedEvent,
902
916
DataReceivedHandler_,
903
- DataReceivedHandler_,
904
- true );
917
+ DataReceivedHandler_);
905
918
DECLARE_TEMPLATE_HANDLER (typename TAReadSessionEvent<true >::TCommitAcknowledgementEvent,
906
919
typename TAReadSessionEvent<false >::TCommitOffsetAcknowledgementEvent,
907
920
CommitAcknowledgementHandler_,
908
- CommitOffsetAcknowledgementHandler_,
909
- true );
921
+ CommitOffsetAcknowledgementHandler_);
910
922
DECLARE_TEMPLATE_HANDLER (typename TAReadSessionEvent<true >::TCreatePartitionStreamEvent,
911
923
typename TAReadSessionEvent<false >::TStartPartitionSessionEvent,
912
924
CreatePartitionStreamHandler_,
913
- StartPartitionSessionHandler_,
914
- true );
925
+ StartPartitionSessionHandler_);
915
926
DECLARE_TEMPLATE_HANDLER (typename TAReadSessionEvent<true >::TDestroyPartitionStreamEvent,
916
927
typename TAReadSessionEvent<false >::TStopPartitionSessionEvent,
917
928
DestroyPartitionStreamHandler_,
918
- StopPartitionSessionHandler_,
919
- true );
929
+ StopPartitionSessionHandler_);
920
930
DECLARE_TEMPLATE_HANDLER (typename TAReadSessionEvent<true >::TPartitionStreamStatusEvent,
921
931
typename TAReadSessionEvent<false >::TPartitionSessionStatusEvent,
922
932
PartitionStreamStatusHandler_,
923
- PartitionSessionStatusHandler_,
924
- true );
925
- DECLARE_TEMPLATE_HANDLER (typename TAReadSessionEvent<true >::TPartitionStreamClosedEvent,
926
- typename TAReadSessionEvent<false >::TPartitionSessionClosedEvent,
927
- PartitionStreamClosedHandler_,
928
- PartitionSessionClosedHandler_,
929
- true );
933
+ PartitionSessionStatusHandler_);
930
934
DECLARE_HANDLER (TASessionClosedEvent<UseMigrationProtocol>, SessionClosedHandler_, false ); // Not applied
931
935
932
936
#undef DECLARE_HANDLER
933
937
#undef DECLARE_TEMPLATE_HANDLER
934
938
939
+ bool operator ()(std::conditional_t <UseMigrationProtocol, typename TAReadSessionEvent<true >::TPartitionStreamClosedEvent, typename TAReadSessionEvent<false >::TPartitionSessionClosedEvent>&) {
940
+ auto specific = [this ]() {
941
+ if constexpr (UseMigrationProtocol) {
942
+ return this ->Settings .EventHandlers_ .PartitionStreamClosedHandler_ ;
943
+ } else {
944
+ return this ->Settings .EventHandlers_ .PartitionSessionClosedHandler_ ;
945
+ }
946
+ }();
947
+
948
+ if (!specific && !this ->Settings .EventHandlers_ .CommonHandler_ ) {
949
+ return false ;
950
+ }
951
+
952
+ this ->template PushCommonHandler <>(
953
+ std::move (TParent::TBaseHandlersVisitor::Event),
954
+ [specific = specific,
955
+ common = this ->Settings .EventHandlers_ .CommonHandler_ ,
956
+ cbContext = CbContext](auto & event) {
957
+ auto & e = std::get<std::conditional_t <UseMigrationProtocol, typename TAReadSessionEvent<true >::TPartitionStreamClosedEvent, typename TAReadSessionEvent<false >::TPartitionSessionClosedEvent>>(event);
958
+ if (specific) {
959
+ specific (e);
960
+ } else if (common) {
961
+ common (event);
962
+ }
963
+ if constexpr (!UseMigrationProtocol) {
964
+ if (auto session = cbContext->LockShared ()) {
965
+ session->UnregisterPartition (e.GetPartitionSession ()->GetPartitionId (), e.GetPartitionSession ()->GetPartitionSessionId ());
966
+ }
967
+ }
968
+ });
969
+
970
+ return true ;
971
+ }
972
+
935
973
template <bool E = !UseMigrationProtocol>
936
974
constexpr std::enable_if_t <E, bool >
937
975
operator ()(typename TAReadSessionEvent<false >::TEndPartitionSessionEvent&) {
938
- if (this ->template PushHandler <typename TAReadSessionEvent<false >::TEndPartitionSessionEvent>(
939
- std::move (TParent::TBaseHandlersVisitor::Event),
940
- [this ](){
941
- return this ->Settings .EventHandlers_ .EndPartitionSessionHandler_ ;
942
- }(),
943
- this ->Settings .EventHandlers_ .CommonHandler_ )) {
976
+ if (!this ->Settings .EventHandlers_ .EndPartitionSessionHandler_ && !this ->Settings .EventHandlers_ .CommonHandler_ ) {
944
977
return false ;
945
978
}
946
- return false ;
979
+ this ->template PushCommonHandler <>(
980
+ std::move (TParent::TBaseHandlersVisitor::Event),
981
+ [specific = this ->Settings .EventHandlers_ .EndPartitionSessionHandler_ ,
982
+ common = this ->Settings .EventHandlers_ .CommonHandler_ ,
983
+ cbContext = CbContext](TReadSessionEvent::TEvent& event) {
984
+ auto & e = std::get<TReadSessionEvent::TEndPartitionSessionEvent>(event);
985
+ if (specific) {
986
+ specific (e);
987
+ } else if (common) {
988
+ common (event);
989
+ }
990
+ });
991
+ return true ;
947
992
}
948
993
949
994
bool Visit () {
@@ -955,6 +1000,7 @@ class TReadSessionEventsQueue: public TBaseSessionEventsQueue<TAReadSessionSetti
955
1000
}
956
1001
957
1002
TDeferredActions<UseMigrationProtocol>& Deferred;
1003
+ TCallbackContextPtr<UseMigrationProtocol> CbContext;
958
1004
};
959
1005
960
1006
TADataReceivedEvent<UseMigrationProtocol>
@@ -963,12 +1009,13 @@ class TReadSessionEventsQueue: public TBaseSessionEventsQueue<TAReadSessionSetti
963
1009
TUserRetrievedEventsInfoAccumulator<UseMigrationProtocol>& accumulator); // Assumes that we're under lock.
964
1010
965
1011
bool ApplyHandler (TReadSessionEventInfo<UseMigrationProtocol>& eventInfo, TDeferredActions<UseMigrationProtocol>& deferred) {
966
- THandlersVisitor visitor (this ->Settings , eventInfo.GetEvent (), deferred);
1012
+ THandlersVisitor visitor (this ->Settings , eventInfo.GetEvent (), deferred, CbContext );
967
1013
return visitor.Visit ();
968
1014
}
969
1015
970
1016
private:
971
1017
bool HasEventCallbacks;
1018
+ TCallbackContextPtr<UseMigrationProtocol> CbContext;
972
1019
};
973
1020
974
1021
} // namespace NYdb::NTopic
@@ -1044,6 +1091,7 @@ class TSingleClusterReadSessionImpl : public TEnableSelfContext<TSingleClusterRe
1044
1091
void Start ();
1045
1092
void ConfirmPartitionStreamCreate (const TPartitionStreamImpl<UseMigrationProtocol>* partitionStream, std::optional<ui64> readOffset, std::optional<ui64> commitOffset);
1046
1093
void ConfirmPartitionStreamDestroy (TPartitionStreamImpl<UseMigrationProtocol>* partitionStream);
1094
+ void ConfirmPartitionStreamEnd (TPartitionStreamImpl<UseMigrationProtocol>* partitionStream, const std::vector<ui32>& childIds);
1047
1095
void RequestPartitionStreamStatus (const TPartitionStreamImpl<UseMigrationProtocol>* partitionStream);
1048
1096
void Commit (const TPartitionStreamImpl<UseMigrationProtocol>* partitionStream, ui64 startOffset, ui64 endOffset);
1049
1097
@@ -1091,6 +1139,16 @@ class TSingleClusterReadSessionImpl : public TEnableSelfContext<TSingleClusterRe
1091
1139
return Log;
1092
1140
}
1093
1141
1142
+ void RegisterParentPartition (ui32 partitionId, ui32 parentPartitionId, ui64 parentPartitionSessionId);
1143
+ void UnregisterPartition (ui32 partitionId, ui64 partitionSessionId);
1144
+ std::vector<ui64> GetParentPartitionSessions (ui32 partitionId, ui64 partitionSessionId);
1145
+ bool AllParentSessionsHasBeenRead (ui32 partitionId, ui64 partitionSessionId);
1146
+
1147
+ void SetSelfContext (TPtr ptr) {
1148
+ TEnableSelfContext<TSingleClusterReadSessionImpl<UseMigrationProtocol>>::SetSelfContext (std::move (ptr));
1149
+ EventsQueue->SetCallbackContext (TEnableSelfContext<TSingleClusterReadSessionImpl<UseMigrationProtocol>>::SelfContext);
1150
+ }
1151
+
1094
1152
private:
1095
1153
void BreakConnectionAndReconnectImpl (TPlainStatus&& status, TDeferredActions<UseMigrationProtocol>& deferred);
1096
1154
@@ -1272,6 +1330,14 @@ class TSingleClusterReadSessionImpl : public TEnableSelfContext<TSingleClusterRe
1272
1330
std::atomic<int > DecompressionTasksInflight = 0 ;
1273
1331
i64 ReadSizeBudget;
1274
1332
i64 ReadSizeServerDelta = 0 ;
1333
+
1334
+ struct TParentInfo {
1335
+ ui32 PartitionId;
1336
+ ui64 PartitionSessionId;
1337
+ };
1338
+
1339
+ std::unordered_map<ui32, std::vector<TParentInfo>> HierarchyData;
1340
+ std::unordered_set<ui64> ReadingFinishedData;
1275
1341
};
1276
1342
1277
1343
} // namespace NYdb::NTopic
0 commit comments