@@ -218,6 +218,15 @@ void TRawPartitionStreamEventQueue<UseMigrationProtocol>::DeleteNotReadyTail(TDe
218
218
swap (ready, NotReady);
219
219
}
220
220
221
+ // //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
222
+ // TDecompressionQueueItem
223
+
224
+ template <bool UseMigrationProtocol>
225
+ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::TDecompressionQueueItem::OnDestroyReadSession()
226
+ {
227
+ BatchInfo->OnDestroyReadSession ();
228
+ }
229
+
221
230
// //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
222
231
// TSingleClusterReadSessionImpl
223
232
@@ -226,6 +235,10 @@ TSingleClusterReadSessionImpl<UseMigrationProtocol>::~TSingleClusterReadSessionI
226
235
for (auto && [_, partitionStream] : PartitionStreams) {
227
236
partitionStream->ClearQueue ();
228
237
}
238
+
239
+ for (auto & e : DecompressionQueue) {
240
+ e.OnDestroyReadSession ();
241
+ }
229
242
}
230
243
231
244
template <bool UseMigrationProtocol>
@@ -1565,6 +1578,7 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::OnDecompressionInfoDes
1565
1578
1566
1579
template <bool UseMigrationProtocol>
1567
1580
void TSingleClusterReadSessionImpl<UseMigrationProtocol>::OnDataDecompressed (i64 sourceSize, i64 estimatedDecompressedSize, i64 decompressedSize, size_t messagesCount, i64 serverBytesSize) {
1581
+
1568
1582
TDeferredActions<UseMigrationProtocol> deferred;
1569
1583
1570
1584
Y_ABORT_UNLESS (DecompressionTasksInflight > 0 );
@@ -2512,6 +2526,14 @@ void TDataDecompressionInfo<UseMigrationProtocol>::PlanDecompressionTasks(double
2512
2526
}
2513
2527
}
2514
2528
2529
+ template <bool UseMigrationProtocol>
2530
+ void TDataDecompressionInfo<UseMigrationProtocol>::OnDestroyReadSession ()
2531
+ {
2532
+ for (auto & task : Tasks) {
2533
+ task.ClearParent ();
2534
+ }
2535
+ }
2536
+
2515
2537
template <bool UseMigrationProtocol>
2516
2538
void TDataDecompressionEvent<UseMigrationProtocol>::TakeData (TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream,
2517
2539
std::vector<typename TADataReceivedEvent<UseMigrationProtocol>::TMessage>& messages,
@@ -2661,19 +2683,23 @@ TDataDecompressionInfo<UseMigrationProtocol>::TDecompressionTask::TDecompression
2661
2683
2662
2684
template <bool UseMigrationProtocol>
2663
2685
void TDataDecompressionInfo<UseMigrationProtocol>::TDecompressionTask::operator ()() {
2686
+ auto parent = Parent;
2687
+ if (!parent) {
2688
+ return ;
2689
+ }
2664
2690
i64 minOffset = Max<i64 >();
2665
2691
i64 maxOffset = 0 ;
2666
- const i64 partition_id = [this ](){
2692
+ const i64 partition_id = [parent ](){
2667
2693
if constexpr (UseMigrationProtocol) {
2668
- return Parent ->ServerMessage .partition ();
2694
+ return parent ->ServerMessage .partition ();
2669
2695
} else {
2670
- return Parent ->ServerMessage .partition_session_id ();
2696
+ return parent ->ServerMessage .partition_session_id ();
2671
2697
}
2672
2698
}();
2673
2699
i64 dataProcessed = 0 ;
2674
2700
size_t messagesProcessed = 0 ;
2675
2701
for (const TMessageRange& messages : Messages) {
2676
- auto & batch = *Parent ->ServerMessage .mutable_batches (messages.Batch );
2702
+ auto & batch = *parent ->ServerMessage .mutable_batches (messages.Batch );
2677
2703
for (size_t i = messages.MessageRange .first ; i < messages.MessageRange .second ; ++i) {
2678
2704
auto & data = *batch.mutable_message_data (i);
2679
2705
@@ -2684,7 +2710,7 @@ void TDataDecompressionInfo<UseMigrationProtocol>::TDecompressionTask::operator(
2684
2710
2685
2711
try {
2686
2712
if constexpr (UseMigrationProtocol) {
2687
- if (Parent ->DoDecompress
2713
+ if (parent ->DoDecompress
2688
2714
&& data.codec () != Ydb::PersQueue::V1::CODEC_RAW
2689
2715
&& data.codec () != Ydb::PersQueue::V1::CODEC_UNSPECIFIED
2690
2716
) {
@@ -2694,7 +2720,7 @@ void TDataDecompressionInfo<UseMigrationProtocol>::TDecompressionTask::operator(
2694
2720
data.set_codec (Ydb::PersQueue::V1::CODEC_RAW);
2695
2721
}
2696
2722
} else {
2697
- if (Parent ->DoDecompress
2723
+ if (parent ->DoDecompress
2698
2724
&& static_cast <Ydb::Topic::Codec>(batch.codec ()) != Ydb::Topic::CODEC_RAW
2699
2725
&& static_cast <Ydb::Topic::Codec>(batch.codec ()) != Ydb::Topic::CODEC_UNSPECIFIED
2700
2726
) {
@@ -2706,32 +2732,38 @@ void TDataDecompressionInfo<UseMigrationProtocol>::TDecompressionTask::operator(
2706
2732
2707
2733
DecompressedSize += data.data ().size ();
2708
2734
} catch (...) {
2709
- Parent ->PutDecompressionError (std::current_exception (), messages.Batch , i);
2735
+ parent ->PutDecompressionError (std::current_exception (), messages.Batch , i);
2710
2736
data.clear_data (); // Free memory, because we don't count it.
2711
2737
2712
- if (auto session = Parent ->CbContext ->LockShared ()) {
2738
+ if (auto session = parent ->CbContext ->LockShared ()) {
2713
2739
session->GetLog () << TLOG_INFO << " Error decompressing data: " << CurrentExceptionMessage ();
2714
2740
}
2715
2741
}
2716
2742
}
2717
2743
}
2718
- if (auto session = Parent ->CbContext ->LockShared ()) {
2744
+ if (auto session = parent ->CbContext ->LockShared ()) {
2719
2745
LOG_LAZY (session->GetLog (), TLOG_DEBUG, TStringBuilder () << " Decompression task done. Partition/PartitionSessionId: "
2720
2746
<< partition_id << " (" << minOffset << " -"
2721
2747
<< maxOffset << " )" );
2722
2748
}
2723
2749
Y_ASSERT (dataProcessed == SourceDataSize);
2724
2750
2725
- Parent ->OnDataDecompressed (SourceDataSize, EstimatedDecompressedSize, DecompressedSize, messagesProcessed);
2751
+ parent ->OnDataDecompressed (SourceDataSize, EstimatedDecompressedSize, DecompressedSize, messagesProcessed);
2726
2752
2727
- Parent ->SourceDataNotProcessed -= dataProcessed;
2753
+ parent ->SourceDataNotProcessed -= dataProcessed;
2728
2754
Ready->Ready = true ;
2729
2755
2730
- if (auto session = Parent ->CbContext ->LockShared ()) {
2756
+ if (auto session = parent ->CbContext ->LockShared ()) {
2731
2757
session->GetEventsQueue ()->SignalReadyEvents (PartitionStream);
2732
2758
}
2733
2759
}
2734
2760
2761
+ template <bool UseMigrationProtocol>
2762
+ void TDataDecompressionInfo<UseMigrationProtocol>::TDecompressionTask::ClearParent ()
2763
+ {
2764
+ Parent = nullptr ;
2765
+ }
2766
+
2735
2767
// //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
2736
2768
// TUserRetrievedEventsInfoAccumulator
2737
2769
@@ -2769,7 +2801,7 @@ void TDeferredActions<UseMigrationProtocol>::DeferReadFromProcessor(const typena
2769
2801
}
2770
2802
2771
2803
template <bool UseMigrationProtocol>
2772
- void TDeferredActions<UseMigrationProtocol>::DeferStartExecutorTask (const typename IAExecutor<UseMigrationProtocol>::TPtr& executor, typename IAExecutor<UseMigrationProtocol>::TFunction task) {
2804
+ void TDeferredActions<UseMigrationProtocol>::DeferStartExecutorTask (const typename IAExecutor<UseMigrationProtocol>::TPtr& executor, typename IAExecutor<UseMigrationProtocol>::TFunction&& task) {
2773
2805
ExecutorsTasks.emplace_back (executor, std::move (task));
2774
2806
}
2775
2807
0 commit comments