@@ -150,8 +150,10 @@ class TDqPqRdReadActor : public NActors::TActor<TDqPqRdReadActor>, public NYql::
150
150
const TTxId& txId,
151
151
const NActors::TActorId selfId,
152
152
TActorId rowDispatcherActorId,
153
+ ui64 partitionId,
153
154
ui64 eventQueueId)
154
- : RowDispatcherActorId(rowDispatcherActorId) {
155
+ : RowDispatcherActorId(rowDispatcherActorId)
156
+ , PartitionId(partitionId) {
155
157
EventsQueue.Init (txId, selfId, selfId, eventQueueId, /* KeepAlive */ true );
156
158
EventsQueue.OnNewRecipientId (rowDispatcherActorId);
157
159
}
@@ -160,12 +162,15 @@ class TDqPqRdReadActor : public NActors::TActor<TDqPqRdReadActor>, public NYql::
160
162
ui64 NextOffset = 0 ;
161
163
bool IsWaitingRowDispatcherResponse = false ;
162
164
NYql::NDq::TRetryEventsQueue EventsQueue;
163
- bool NewDataArrived = false ;
165
+ bool HasPendingData = false ;
164
166
TActorId RowDispatcherActorId;
167
+ ui64 PartitionId;
165
168
};
166
169
167
170
TMap<ui64, SessionInfo> Sessions;
168
171
const THolderFactory& HolderFactory;
172
+ const i64 MaxBufferSize;
173
+ i64 ReadyBufferSizeBytes = 0 ;
169
174
170
175
public:
171
176
TDqPqRdReadActor (
@@ -179,7 +184,8 @@ class TDqPqRdReadActor : public NActors::TActor<TDqPqRdReadActor>, public NYql::
179
184
const NActors::TActorId& computeActorId,
180
185
const NActors::TActorId& localRowDispatcherActorId,
181
186
const TString& token,
182
- const ::NMonitoring::TDynamicCounterPtr& counters);
187
+ const ::NMonitoring::TDynamicCounterPtr& counters,
188
+ i64 bufferSize);
183
189
184
190
void Handle (NFq::TEvRowDispatcher::TEvCoordinatorChanged::TPtr& ev);
185
191
void Handle (NFq::TEvRowDispatcher::TEvCoordinatorResult::TPtr& ev);
@@ -233,6 +239,7 @@ class TDqPqRdReadActor : public NActors::TActor<TDqPqRdReadActor>, public NYql::
233
239
void StopSessions ();
234
240
void ReInit (const TString& reason);
235
241
void PrintInternalState ();
242
+ void TrySendGetNextBatch (SessionInfo& sessionInfo);
236
243
};
237
244
238
245
TDqPqRdReadActor::TDqPqRdReadActor (
@@ -246,13 +253,15 @@ TDqPqRdReadActor::TDqPqRdReadActor(
246
253
const NActors::TActorId& computeActorId,
247
254
const NActors::TActorId& localRowDispatcherActorId,
248
255
const TString& token,
249
- const ::NMonitoring::TDynamicCounterPtr& counters)
256
+ const ::NMonitoring::TDynamicCounterPtr& counters,
257
+ i64 bufferSize)
250
258
: TActor<TDqPqRdReadActor>(&TDqPqRdReadActor::StateFunc)
251
259
, TDqPqReadActorBase (inputIndex, taskId, this ->SelfId (), txId, std::move (sourceParams), std::move (readParams), computeActorId)
252
260
, Token (token)
253
261
, LocalRowDispatcherActorId (localRowDispatcherActorId)
254
262
, Metrics (txId, taskId, counters)
255
263
, HolderFactory (holderFactory)
264
+ , MaxBufferSize (bufferSize)
256
265
{
257
266
MetadataFields.reserve (SourceParams.MetadataFieldsSize ());
258
267
TPqMetaExtractor fieldsExtractor;
@@ -375,23 +384,28 @@ i64 TDqPqRdReadActor::GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueBatch& b
375
384
buffer.clear ();
376
385
do {
377
386
auto & readyBatch = ReadyBuffer.front ();
378
- SRC_LOG_T (" Return " << readyBatch.Data .size () << " items" );
379
387
380
388
for (const auto & message : readyBatch.Data ) {
381
389
auto [item, size] = CreateItem (message);
382
390
buffer.push_back (std::move (item));
383
391
}
384
392
usedSpace += readyBatch.UsedSpace ;
385
393
freeSpace -= readyBatch.UsedSpace ;
386
- SRC_LOG_T (" usedSpace " << usedSpace);
387
- SRC_LOG_T (" freeSpace " << freeSpace);
388
-
389
394
TPartitionKey partitionKey{TString{}, readyBatch.PartitionId };
390
395
PartitionToOffset[partitionKey] = readyBatch.NextOffset ;
391
396
SRC_LOG_T (" NextOffset " << readyBatch.NextOffset );
392
397
ReadyBuffer.pop ();
393
398
} while (freeSpace > 0 && !ReadyBuffer.empty ());
394
399
400
+ ReadyBufferSizeBytes -= usedSpace;
401
+ SRC_LOG_T (" Return " << buffer.RowCount () << " rows, buffer size " << ReadyBufferSizeBytes << " , free space " << freeSpace << " , result size " << usedSpace);
402
+
403
+ if (!ReadyBuffer.empty ()) {
404
+ Send (ComputeActorId, new TEvNewAsyncInputDataArrived (InputIndex));
405
+ }
406
+ for (auto & [partitionId, sessionInfo] : Sessions) {
407
+ TrySendGetNextBatch (sessionInfo);
408
+ }
395
409
ProcessState ();
396
410
return usedSpace;
397
411
}
@@ -490,11 +504,8 @@ void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvNewDataArrived::TPtr& ev
490
504
SRC_LOG_W (" Wrong seq num ignore message, seqNo " << meta.GetSeqNo ());
491
505
return ;
492
506
}
493
- sessionInfo.NewDataArrived = true ;
494
- Metrics.InFlyGetNextBatch ->Inc ();
495
- auto event = std::make_unique<NFq::TEvRowDispatcher::TEvGetNextBatch>();
496
- event->Record .SetPartitionId (partitionId);
497
- sessionInfo.EventsQueue .Send (event.release ());
507
+ sessionInfo.HasPendingData = true ;
508
+ TrySendGetNextBatch (sessionInfo);
498
509
}
499
510
500
511
void TDqPqRdReadActor::Handle (const NYql::NDq::TEvRetryQueuePrivate::TEvRetry::TPtr& ev) {
@@ -578,7 +589,7 @@ void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvCoordinatorResult::TPtr&
578
589
Sessions.emplace (
579
590
std::piecewise_construct,
580
591
std::forward_as_tuple (partitionId),
581
- std::forward_as_tuple (TxId, SelfId (), rowDispatcherActorId, partitionId));
592
+ std::forward_as_tuple (TxId, SelfId (), rowDispatcherActorId, partitionId, partitionId ));
582
593
}
583
594
}
584
595
}
@@ -637,11 +648,12 @@ void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvMessageBatch::TPtr& ev)
637
648
for (const auto & message : ev->Get ()->Record .GetMessages ()) {
638
649
SRC_LOG_T (" Json: " << message.GetJson ());
639
650
activeBatch.Data .emplace_back (message.GetJson ());
640
- activeBatch.UsedSpace += message.GetJson ().size ();
641
651
sessionInfo.NextOffset = message.GetOffset () + 1 ;
642
652
bytes += message.GetJson ().size ();
643
653
SRC_LOG_T (" TEvMessageBatch NextOffset " << sessionInfo.NextOffset );
644
654
}
655
+ activeBatch.UsedSpace = bytes;
656
+ ReadyBufferSizeBytes += bytes;
645
657
IngressStats.Bytes += bytes;
646
658
IngressStats.Chunks ++;
647
659
activeBatch.NextOffset = ev->Get ()->Record .GetNextMessageOffset ();
@@ -697,6 +709,20 @@ void TDqPqRdReadActor::Handle(TEvPrivate::TEvProcessState::TPtr&) {
697
709
ProcessState ();
698
710
}
699
711
712
+ void TDqPqRdReadActor::TrySendGetNextBatch (SessionInfo& sessionInfo) {
713
+ if (!sessionInfo.HasPendingData ) {
714
+ return ;
715
+ }
716
+ if (ReadyBufferSizeBytes > MaxBufferSize) {
717
+ return ;
718
+ }
719
+ Metrics.InFlyGetNextBatch ->Inc ();
720
+ auto event = std::make_unique<NFq::TEvRowDispatcher::TEvGetNextBatch>();
721
+ sessionInfo.HasPendingData = false ;
722
+ event->Record .SetPartitionId (sessionInfo.PartitionId );
723
+ sessionInfo.EventsQueue .Send (event.release ());
724
+ }
725
+
700
726
std::pair<IDqComputeActorAsyncInput*, NActors::IActor*> CreateDqPqRdReadActor (
701
727
NPq::NProto::TDqPqTopicSource&& settings,
702
728
ui64 inputIndex,
@@ -709,7 +735,7 @@ std::pair<IDqComputeActorAsyncInput*, NActors::IActor*> CreateDqPqRdReadActor(
709
735
const NActors::TActorId& localRowDispatcherActorId,
710
736
const NKikimr::NMiniKQL::THolderFactory& holderFactory,
711
737
const ::NMonitoring::TDynamicCounterPtr& counters,
712
- i64 /* bufferSize*/ ) // TODO
738
+ i64 bufferSize)
713
739
{
714
740
auto taskParamsIt = taskParams.find (" pq" );
715
741
YQL_ENSURE (taskParamsIt != taskParams.end (), " Failed to get pq task params" );
@@ -731,7 +757,8 @@ std::pair<IDqComputeActorAsyncInput*, NActors::IActor*> CreateDqPqRdReadActor(
731
757
computeActorId,
732
758
localRowDispatcherActorId,
733
759
token,
734
- counters
760
+ counters,
761
+ bufferSize
735
762
);
736
763
737
764
return {actor, actor};
0 commit comments