@@ -18,9 +18,6 @@ void TScanHead::OnSourceReady(const std::shared_ptr<IDataSource>& source, std::s
18
18
Context->GetCommonContext ()->GetCounters ().OnSourceFinished (
19
19
source->GetRecordsCount (), source->GetUsedRawBytes (), tableExt ? tableExt->num_rows () : 0 );
20
20
21
- if ((!tableExt || !tableExt->num_rows ()) && Context->GetCommonContext ()->GetReadMetadata ()->HasLimit () && InFlightLimit < MaxInFlight) {
22
- InFlightLimit = 2 * InFlightLimit;
23
- }
24
21
source->MutableStageResult ().SetResultChunk (std::move (tableExt), startIndex, recordsCount);
25
22
while (FetchingSources.size ()) {
26
23
auto frontSource = FetchingSources.front ();
@@ -59,13 +56,18 @@ void TScanHead::OnSourceReady(const std::shared_ptr<IDataSource>& source, std::s
59
56
AFL_VERIFY (FetchingSourcesByIdx.erase (frontSource->GetSourceIdx ()));
60
57
FetchingSources.pop_front ();
61
58
frontSource->ClearResult ();
62
- if (Context->GetCommonContext ()->GetReadMetadata ()->HasLimit () && FetchingSources .size () && frontSource->GetResultRecordsCount ()) {
63
- FinishedSources. emplace (frontSource);
64
- while (FinishedSources.size () && (*FinishedSources. begin ())-> GetFinish () < FetchingSources. front ()-> GetStart ()) {
65
- auto fetchingSource = FetchingSources .front ();
59
+ if (Context->GetCommonContext ()->GetReadMetadata ()->HasLimit () && SortedSources .size () && frontSource->GetResultRecordsCount ()) {
60
+ AFL_VERIFY (FetchingInFlightSources. erase (frontSource) );
61
+ AFL_VERIFY (FinishedSources.emplace (frontSource). second );
62
+ while (FinishedSources. size () && (*FinishedSources. begin ())-> GetFinish () < SortedSources .front ()-> GetStart ()) {
66
63
auto finishedSource = *FinishedSources.begin ();
64
+ if (!finishedSource->GetResultRecordsCount () && Context->GetCommonContext ()->GetReadMetadata ()->HasLimit () &&
65
+ InFlightLimit < MaxInFlight) {
66
+ InFlightLimit = 2 * InFlightLimit;
67
+ }
67
68
FetchedCount += finishedSource->GetResultRecordsCount ();
68
69
FinishedSources.erase (FinishedSources.begin ());
70
+ --IntervalsInFlightCount;
69
71
AFL_DEBUG (NKikimrServices::TX_COLUMNSHARD)(" event" , " source_finished" )(" source_id" , finishedSource->GetSourceId ())(
70
72
" source_idx" , finishedSource->GetSourceIdx ())(" limit" , Context->GetCommonContext ()->GetReadMetadata ()->GetLimitRobust ())(
71
73
" fetched" , finishedSource->GetResultRecordsCount ());
@@ -119,14 +121,41 @@ TConclusion<bool> TScanHead::BuildNextInterval() {
119
121
if (!Context->IsActive ()) {
120
122
return false ;
121
123
}
124
+ if (InFlightLimit <= IntervalsInFlightCount) {
125
+ return false ;
126
+ }
127
+ if (SortedSources.size () == 0 ) {
128
+ return false ;
129
+ }
122
130
bool changed = false ;
123
- while (SortedSources.size () && FetchingSources.size () < InFlightLimit) {
131
+ ui32 inFlightCountLocal = 0 ;
132
+ if (SortedSources.size ()) {
133
+ for (auto it = FetchingInFlightSources.begin (); it != FetchingInFlightSources.end (); ++it) {
134
+ if ((*it)->GetFinish () < SortedSources.front ()->GetStart ()) {
135
+ ++inFlightCountLocal;
136
+ }
137
+ }
138
+ }
139
+ AFL_VERIFY (IntervalsInFlightCount == inFlightCountLocal)(" count_global" , IntervalsInFlightCount)(" count_local" , inFlightCountLocal);
140
+ while (SortedSources.size () && inFlightCountLocal < InFlightLimit) {
124
141
SortedSources.front ()->StartProcessing (SortedSources.front ());
125
142
FetchingSources.emplace_back (SortedSources.front ());
126
143
FetchingSourcesByIdx.emplace (SortedSources.front ()->GetSourceIdx (), SortedSources.front ());
144
+ AFL_VERIFY (FetchingInFlightSources.emplace (SortedSources.front ()).second );
127
145
SortedSources.pop_front ();
146
+ if (SortedSources.size ()) {
147
+ ui32 inFlightCountLocalNew = 0 ;
148
+ for (auto it = FetchingInFlightSources.begin (); it != FetchingInFlightSources.end (); ++it) {
149
+ if ((*it)->GetFinish () < SortedSources.front ()->GetStart ()) {
150
+ ++inFlightCountLocalNew;
151
+ }
152
+ }
153
+ AFL_VERIFY (inFlightCountLocal <= inFlightCountLocalNew);
154
+ inFlightCountLocal = inFlightCountLocalNew;
155
+ }
128
156
changed = true ;
129
157
}
158
+ IntervalsInFlightCount = inFlightCountLocal;
130
159
return changed;
131
160
}
132
161
0 commit comments