Skip to content

Commit ccc3576

Browse files
authored
Use soft and hard limits for output [channel] backpressure (#16997)
1 parent 42da2fe commit ccc3576

13 files changed

+730
-147
lines changed

ydb/core/kqp/runtime/kqp_effects.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,8 @@ class TKqpApplyEffectsConsumer : public IDqOutputConsumer {
1515
TKqpApplyEffectsConsumer(NUdf::IApplyContext* applyCtx)
1616
: ApplyCtx(applyCtx) {}
1717

18-
bool IsFull() const override {
19-
return false;
18+
EDqFillLevel GetFillLevel() const override {
19+
return NoLimit;
2020
}
2121

2222
void Consume(NUdf::TUnboxedValue&& value) final {

ydb/core/kqp/runtime/kqp_output_stream.cpp

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,15 @@ class TKqpOutputRangePartitionConsumer : public IDqOutputConsumer {
3232
MKQL_ENSURE_S(KeyColumnTypes.size() == KeyColumnIndices.size());
3333

3434
SortPartitions(Partitions, KeyColumnTypes, [](const auto& partition) { return partition.Range; });
35+
36+
Aggregator = std::make_shared<TDqFillAggregator>();
37+
for (auto output : Outputs) {
38+
output->SetFillAggregator(Aggregator);
39+
}
3540
}
3641

37-
bool IsFull() const override {
38-
return AnyOf(Outputs, [](const auto& output) { return output->IsFull(); });
42+
EDqFillLevel GetFillLevel() const override {
43+
return Aggregator->GetFillLevel();
3944
}
4045

4146
void Consume(TUnboxedValue&& value) final {
@@ -67,6 +72,7 @@ class TKqpOutputRangePartitionConsumer : public IDqOutputConsumer {
6772
TVector<TKqpRangePartition> Partitions;
6873
TVector<NScheme::TTypeInfo> KeyColumnTypes;
6974
TVector<ui32> KeyColumnIndices;
75+
std::shared_ptr<TDqFillAggregator> Aggregator;
7076
};
7177

7278
} // namespace

ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -367,7 +367,7 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
367367
if (channel) {
368368
html << "DqOutputChannel.ChannelId: " << channel->GetChannelId() << "<br />";
369369
html << "DqOutputChannel.ValuesCount: " << channel->GetValuesCount() << "<br />";
370-
html << "DqOutputChannel.IsFull: " << channel->IsFull() << "<br />";
370+
html << "DqOutputChannel.FillLevel: " << static_cast<ui32>(channel->GetFillLevel()) << "<br />";
371371
html << "DqOutputChannel.HasData: " << channel->HasData() << "<br />";
372372
html << "DqOutputChannel.IsFinished: " << channel->IsFinished() << "<br />";
373373
html << "DqInputChannel.OutputType: " << (channel->GetOutputType() ? channel->GetOutputType()->GetKindAsStr() : TString{"unknown"}) << "<br />";
@@ -399,7 +399,7 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
399399
if (info.Buffer || TaskRunnerStats.GetSink(id)) {
400400
const auto& buffer = info.Buffer ? *info.Buffer : *TaskRunnerStats.GetSink(id);
401401
html << "DqOutputBuffer.OutputIndex: " << buffer.GetOutputIndex() << "<br />";
402-
html << "DqOutputBuffer.IsFull: " << buffer.IsFull() << "<br />";
402+
html << "DqOutputBuffer.FillLevel: " << static_cast<ui32>(buffer.GetFillLevel()) << "<br />";
403403
html << "DqOutputBuffer.OutputType: " << (buffer.GetOutputType() ? buffer.GetOutputType()->GetKindAsStr() : TString{"unknown"}) << "<br />";
404404
html << "DqOutputBuffer.IsFinished: " << buffer.IsFinished() << "<br />";
405405
html << "DqOutputBuffer.HasData: " << buffer.HasData() << "<br />";

ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,17 @@ class TDqSyncComputeActorBase: public TDqComputeActorBase<TDerived, TComputeActo
5454
}
5555

5656
void DoTerminateImpl() override {
57+
// we want to log debug output info only for long running (OLAP) tasks
58+
if (TaskRunner && TBase::State == NDqProto::COMPUTE_STATE_FAILURE && TBase::RuntimeSettings.CollectFull()) {
59+
auto& stats = *TaskRunner->GetStats();
60+
if (stats.StartTs && TInstant::Now() - stats.StartTs > TDuration::Seconds(60)) {
61+
auto taskRunnerDebugString = TaskRunner->GetOutputDebugString();
62+
if (taskRunnerDebugString) {
63+
CA_LOG_E("TaskRunner->Output Debug String: " << taskRunnerDebugString);
64+
}
65+
}
66+
}
67+
5768
TaskRunner.Reset();
5869
}
5970

ydb/library/yql/dq/runtime/dq_async_output.cpp

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -70,13 +70,29 @@ class TDqAsyncOutputBuffer : public IDqAsyncOutputBuffer {
7070
const TDqOutputStats& GetPushStats() const override {
7171
return PushStats;
7272
}
73-
73+
7474
const TDqAsyncOutputBufferStats& GetPopStats() const override {
7575
return PopStats;
7676
}
7777

78-
bool IsFull() const override {
79-
return EstimatedStoredBytes >= MaxStoredBytes;
78+
EDqFillLevel GetFillLevel() const override {
79+
return FillLevel;
80+
}
81+
82+
EDqFillLevel UpdateFillLevel() override {
83+
auto result = EstimatedStoredBytes >= MaxStoredBytes ? HardLimit : NoLimit;
84+
if (FillLevel != result) {
85+
if (Aggregator) {
86+
Aggregator->UpdateCount(FillLevel, result);
87+
}
88+
FillLevel = result;
89+
}
90+
return result;
91+
}
92+
93+
void SetFillAggregator(std::shared_ptr<TDqFillAggregator> aggregator) override {
94+
Aggregator = aggregator;
95+
Aggregator->AddCount(FillLevel);
8096
}
8197

8298
void Push(NUdf::TUnboxedValue&& value) override {
@@ -265,22 +281,25 @@ class TDqAsyncOutputBuffer : public IDqAsyncOutputBuffer {
265281
PushStats.Resume();
266282
}
267283

268-
if (IsFull()) {
269-
PopStats.TryPause();
270-
}
284+
auto fillLevel = UpdateFillLevel();
271285

272286
if (PopStats.CollectFull()) {
287+
if (fillLevel != NoLimit) {
288+
PopStats.TryPause();
289+
}
273290
PopStats.MaxMemoryUsage = std::max(PopStats.MaxMemoryUsage, EstimatedStoredBytes);
274291
PopStats.MaxRowsInMemory = std::max(PopStats.MaxRowsInMemory, Values.size());
275292
}
276293
}
277294

278295
void ReportChunkOut(ui64 rows, ui64 bytes) {
296+
auto fillLevel = UpdateFillLevel();
297+
279298
if (PopStats.CollectBasic()) {
280299
PopStats.Bytes += bytes;
281300
PopStats.Rows += rows;
282301
PopStats.Chunks++;
283-
if (!IsFull()) {
302+
if (fillLevel == NoLimit) {
284303
PopStats.Resume();
285304
}
286305
}
@@ -318,6 +337,8 @@ class TDqAsyncOutputBuffer : public IDqAsyncOutputBuffer {
318337
bool Finished = false;
319338
std::deque<TValueDesc> Values;
320339
ui64 EstimatedRowBytes = 0;
340+
std::shared_ptr<TDqFillAggregator> Aggregator;
341+
EDqFillLevel FillLevel = NoLimit;
321342
};
322343

323344
} // namespace

ydb/library/yql/dq/runtime/dq_output.h

Lines changed: 57 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,12 @@
22

33
#include <ydb/library/yql/dq/actors/protos/dq_events.pb.h>
44
#include <yql/essentials/minikql/mkql_node.h>
5+
#include <yql/essentials/utils/yql_panic.h>
56

67
#include <util/datetime/base.h>
78
#include <util/generic/ptr.h>
89

9-
#include "dq_async_stats.h"
10+
#include "dq_async_stats.h"
1011

1112
namespace NYql {
1213
namespace NDqProto {
@@ -27,6 +28,58 @@ struct TDqOutputStats : public TDqAsyncStats {
2728
ui64 MaxRowsInMemory = 0;
2829
};
2930

31+
enum EDqFillLevel {
32+
NoLimit,
33+
SoftLimit,
34+
HardLimit
35+
};
36+
37+
const constexpr ui32 FILL_COUNTERS_SIZE = 4u;
38+
39+
struct TDqFillAggregator {
40+
41+
alignas(64) std::array<std::atomic<ui64>, FILL_COUNTERS_SIZE> Counts;
42+
43+
ui64 GetCount(EDqFillLevel level) {
44+
ui32 index = static_cast<ui32>(level);
45+
YQL_ENSURE(index < FILL_COUNTERS_SIZE);
46+
return Counts[index].load();
47+
}
48+
49+
void AddCount(EDqFillLevel level) {
50+
ui32 index = static_cast<ui32>(level);
51+
YQL_ENSURE(index < FILL_COUNTERS_SIZE);
52+
Counts[index]++;
53+
}
54+
55+
void UpdateCount(EDqFillLevel prevLevel, EDqFillLevel level) {
56+
ui32 index1 = static_cast<ui32>(prevLevel);
57+
ui32 index2 = static_cast<ui32>(level);
58+
YQL_ENSURE(index1 < FILL_COUNTERS_SIZE && index2 < FILL_COUNTERS_SIZE);
59+
if (index1 != index2) {
60+
Counts[index2]++;
61+
Counts[index1]--;
62+
}
63+
}
64+
65+
EDqFillLevel GetFillLevel() const {
66+
if (Counts[static_cast<ui32>(HardLimit)].load()) {
67+
return HardLimit;
68+
}
69+
if (Counts[static_cast<ui32>(NoLimit)].load()) {
70+
return NoLimit;
71+
}
72+
return Counts[static_cast<ui32>(SoftLimit)].load() ? SoftLimit : NoLimit;
73+
}
74+
75+
TString DebugString() {
76+
return TStringBuilder() << "TDqFillAggregator { N=" << Counts[static_cast<ui32>(NoLimit)].load()
77+
<< " S=" << Counts[static_cast<ui32>(SoftLimit)].load()
78+
<< " H=" << Counts[static_cast<ui32>(HardLimit)].load()
79+
<< " }";
80+
}
81+
};
82+
3083
class IDqOutput : public TSimpleRefCount<IDqOutput> {
3184
public:
3285
using TPtr = TIntrusivePtr<IDqOutput>;
@@ -36,8 +89,9 @@ class IDqOutput : public TSimpleRefCount<IDqOutput> {
3689
virtual const TDqOutputStats& GetPushStats() const = 0;
3790

3891
// <| producer methods
39-
[[nodiscard]]
40-
virtual bool IsFull() const = 0;
92+
virtual EDqFillLevel GetFillLevel() const = 0;
93+
virtual EDqFillLevel UpdateFillLevel() = 0;
94+
virtual void SetFillAggregator(std::shared_ptr<TDqFillAggregator> aggregator) = 0;
4195
// can throw TDqChannelStorageException
4296
virtual void Push(NUdf::TUnboxedValue&& value) = 0;
4397
virtual void WidePush(NUdf::TUnboxedValue* values, ui32 count) = 0;

ydb/library/yql/dq/runtime/dq_output_channel.cpp

Lines changed: 41 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -72,28 +72,48 @@ class TDqOutputChannel : public IDqOutputChannel {
7272
return PopStats;
7373
}
7474

75-
[[nodiscard]]
76-
bool IsFull() const override {
77-
if (!Storage) {
78-
return PackedDataSize + Packer.PackedSizeEstimate() >= MaxStoredBytes;
75+
EDqFillLevel CalcFillLevel() const {
76+
if (Storage) {
77+
return FirstStoredId < NextStoredId ? (Storage->IsFull() ? HardLimit : SoftLimit) : NoLimit;
78+
} else {
79+
return PackedDataSize + Packer.PackedSizeEstimate() >= MaxStoredBytes ? HardLimit : NoLimit;
80+
}
81+
}
82+
83+
EDqFillLevel GetFillLevel() const override {
84+
return FillLevel;
85+
}
86+
87+
EDqFillLevel UpdateFillLevel() override {
88+
auto result = CalcFillLevel();
89+
if (FillLevel != result) {
90+
if (Aggregator) {
91+
Aggregator->UpdateCount(FillLevel, result);
92+
}
93+
FillLevel = result;
7994
}
80-
return Storage->IsFull();
95+
return result;
96+
}
97+
98+
void SetFillAggregator(std::shared_ptr<TDqFillAggregator> aggregator) override {
99+
Aggregator = aggregator;
100+
Aggregator->AddCount(FillLevel);
81101
}
82102

83-
virtual void Push(NUdf::TUnboxedValue&& value) override {
103+
void Push(NUdf::TUnboxedValue&& value) override {
84104
YQL_ENSURE(!OutputType->IsMulti());
85105
DoPushSafe(&value, 1);
86106
}
87107

88-
virtual void WidePush(NUdf::TUnboxedValue* values, ui32 width) override {
108+
void WidePush(NUdf::TUnboxedValue* values, ui32 width) override {
89109
YQL_ENSURE(OutputType->IsMulti());
90110
YQL_ENSURE(Width == width);
91111
DoPushSafe(values, width);
92112
}
93113

94114
// Try to split data before push to fulfill ChunkSizeLimit
95115
void DoPushSafe(NUdf::TUnboxedValue* values, ui32 width) {
96-
YQL_ENSURE(!IsFull());
116+
YQL_ENSURE(GetFillLevel() != HardLimit);
97117

98118
if (Finished) {
99119
return;
@@ -194,11 +214,12 @@ class TDqOutputChannel : public IDqOutputChannel {
194214
LOG("Data spilled. Total rows spilled: " << SpilledChunkCount << ", bytesInMemory: " << (PackedDataSize + packerSize)); // FIXME with RowCount
195215
}
196216

197-
if (IsFull() || FirstStoredId < NextStoredId) {
198-
PopStats.TryPause();
199-
}
217+
UpdateFillLevel();
200218

201219
if (PopStats.CollectFull()) {
220+
if (FillLevel != NoLimit) {
221+
PopStats.TryPause();
222+
}
202223
PopStats.MaxMemoryUsage = std::max(PopStats.MaxMemoryUsage, PackedDataSize + packerSize);
203224
PopStats.MaxRowsInMemory = std::max(PopStats.MaxRowsInMemory, PackedChunkCount);
204225
}
@@ -257,11 +278,13 @@ class TDqOutputChannel : public IDqOutputChannel {
257278

258279
DLOG("Took " << data.RowCount() << " rows");
259280

281+
UpdateFillLevel();
282+
260283
if (PopStats.CollectBasic()) {
261284
PopStats.Bytes += data.Size();
262285
PopStats.Rows += data.RowCount();
263286
PopStats.Chunks++; // pop chunks do not match push chunks
264-
if (!IsFull() || FirstStoredId == NextStoredId) {
287+
if (FillLevel == NoLimit) {
265288
PopStats.Resume();
266289
}
267290
}
@@ -359,7 +382,7 @@ class TDqOutputChannel : public IDqOutputChannel {
359382
PopStats.Bytes += data.Size();
360383
PopStats.Rows += data.RowCount();
361384
PopStats.Chunks++;
362-
if (!IsFull() || FirstStoredId == NextStoredId) {
385+
if (UpdateFillLevel() == NoLimit) {
363386
PopStats.Resume();
364387
}
365388
}
@@ -391,7 +414,7 @@ class TDqOutputChannel : public IDqOutputChannel {
391414
}
392415

393416
ui64 Drop() override { // Drop channel data because channel was finished. Leave checkpoint because checkpoints keep going through channel after finishing channel data transfer.
394-
ui64 rows = GetValuesCount();
417+
ui64 chunks = GetValuesCount();
395418
Data.clear();
396419
Packer.Clear();
397420
PackedDataSize = 0;
@@ -401,7 +424,8 @@ class TDqOutputChannel : public IDqOutputChannel {
401424
PackerCurrentChunkCount = 0;
402425
PackerCurrentRowCount = 0;
403426
FirstStoredId = NextStoredId;
404-
return rows;
427+
UpdateFillLevel();
428+
return chunks;
405429
}
406430

407431
NKikimr::NMiniKQL::TType* GetOutputType() const override {
@@ -455,6 +479,8 @@ class TDqOutputChannel : public IDqOutputChannel {
455479

456480
TMaybe<NDqProto::TWatermark> Watermark;
457481
TMaybe<NDqProto::TCheckpoint> Checkpoint;
482+
std::shared_ptr<TDqFillAggregator> Aggregator;
483+
EDqFillLevel FillLevel = NoLimit;
458484
};
459485

460486
} // anonymous namespace

0 commit comments

Comments
 (0)