Skip to content

Commit 398fb41

Browse files
authored
Correct Rows count in Block Output Channels (#11893)
1 parent b83d01c commit 398fb41

File tree

5 files changed

+63
-21
lines changed

5 files changed

+63
-21
lines changed

ydb/library/yql/dq/common/dq_serialized_batch.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,11 +65,13 @@ TChunkedBuffer SaveForSpilling(TDqSerializedBatch&& batch) {
6565

6666
ui32 transportversion = batch.Proto.GetTransportVersion();
6767
ui32 rowCount = batch.Proto.GetRows();
68+
ui32 chunkCount = batch.Proto.GetChunks();
6869

6970
TChunkedBuffer protoPayload(std::move(*batch.Proto.MutableRaw()));
7071

7172
AppendNumber(result, transportversion);
7273
AppendNumber(result, rowCount);
74+
AppendNumber(result, chunkCount);
7375
AppendNumber(result, protoPayload.Size());
7476
result.Append(std::move(protoPayload));
7577
AppendNumber(result, batch.Payload.Size());
@@ -85,6 +87,7 @@ TDqSerializedBatch LoadSpilled(TBuffer&& blob) {
8587
TDqSerializedBatch result;
8688
result.Proto.SetTransportVersion(ReadNumber<ui32>(source));
8789
result.Proto.SetRows(ReadNumber<ui32>(source));
90+
result.Proto.SetChunks(ReadNumber<ui32>(source));
8891

8992
size_t protoSize = ReadNumber<size_t>(source);
9093
YQL_ENSURE(source.size() >= protoSize, "Premature end of spilled data");

ydb/library/yql/dq/common/dq_serialized_batch.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,10 @@ struct TDqSerializedBatch {
2828
return Proto.GetRows();
2929
}
3030

31+
ui32 ChunkCount() const {
32+
return Proto.GetChunks();
33+
}
34+
3135
void Clear() {
3236
Payload.Clear();
3337
Proto.Clear();

ydb/library/yql/dq/proto/dq_transport.proto

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ enum EDataTransportVersion {
1616
message TData {
1717
uint32 TransportVersion = 1;
1818
bytes Raw = 2;
19-
uint32 Rows = 3;
19+
uint32 Rows = 5;
20+
uint32 Chunks = 3;
2021
optional uint32 PayloadId = 4;
2122
}

ydb/library/yql/dq/runtime/dq_input_channel.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ class TDqInputChannel : public IDqInputChannel {
4747

4848
void PushImpl(TDqSerializedBatch&& data) {
4949
const i64 space = data.Size();
50-
const size_t rowCount = data.RowCount();
50+
const size_t chunkCount = data.ChunkCount();
5151
auto inputType = Impl.GetInputType();
5252
NKikimr::NMiniKQL::TUnboxedValueBatch batch(inputType);
5353
if (Y_UNLIKELY(PushStats.CollectProfile())) {
@@ -58,7 +58,8 @@ class TDqInputChannel : public IDqInputChannel {
5858
DataSerializer.Deserialize(std::move(data), inputType, batch);
5959
}
6060

61-
YQL_ENSURE(batch.RowCount() == rowCount);
61+
// single batch row is chunk and may be Arrow block
62+
YQL_ENSURE(batch.RowCount() == chunkCount);
6263
Impl.AddBatch(std::move(batch), space);
6364
}
6465

ydb/library/yql/dq/runtime/dq_output_channel.cpp

Lines changed: 51 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ class TDqOutputChannel : public IDqOutputChannel {
5858
}
5959

6060
ui64 GetValuesCount() const override {
61-
return SpilledRowCount + PackedRowCount + ChunkRowCount;
61+
return SpilledRowCount + PackedRowCount + PackerCurrentRowCount;
6262
}
6363

6464
const TDqOutputStats& GetPushStats() const override {
@@ -95,8 +95,12 @@ class TDqOutputChannel : public IDqOutputChannel {
9595
return;
9696
}
9797

98+
ui32 rows = Packer.IsBlock() ?
99+
NKikimr::NMiniKQL::TArrowBlock::From(values[width - 1]).GetDatum().scalar_as<arrow::UInt64Scalar>().value
100+
: 1;
101+
98102
if (PushStats.CollectBasic()) {
99-
PushStats.Rows++;
103+
PushStats.Rows += rows;
100104
PushStats.Chunks++;
101105
PushStats.Resume();
102106
}
@@ -110,7 +114,8 @@ class TDqOutputChannel : public IDqOutputChannel {
110114
values[i] = {};
111115
}
112116

113-
ChunkRowCount++;
117+
PackerCurrentRowCount += rows;
118+
PackerCurrentChunkCount++;
114119

115120
size_t packerSize = Packer.PackedSizeEstimate();
116121
if (packerSize >= MaxChunkBytes) {
@@ -120,9 +125,12 @@ class TDqOutputChannel : public IDqOutputChannel {
120125
PushStats.Bytes += Data.back().Buffer.Size();
121126
}
122127
PackedDataSize += Data.back().Buffer.Size();
123-
PackedRowCount += ChunkRowCount;
124-
Data.back().RowCount = ChunkRowCount;
125-
ChunkRowCount = 0;
128+
PackedRowCount += PackerCurrentRowCount;
129+
PackedChunkCount += PackerCurrentChunkCount;
130+
Data.back().RowCount = PackerCurrentRowCount;
131+
Data.back().ChunkCount = PackerCurrentChunkCount;
132+
PackerCurrentRowCount = 0;
133+
PackerCurrentChunkCount = 0;
126134
packerSize = 0;
127135
}
128136

@@ -134,11 +142,13 @@ class TDqOutputChannel : public IDqOutputChannel {
134142
TDqSerializedBatch data;
135143
data.Proto.SetTransportVersion(TransportVersion);
136144
data.Proto.SetRows(head.RowCount);
145+
data.Proto.SetChunks(head.ChunkCount);
137146
data.SetPayload(std::move(head.Buffer));
138147
Storage->Put(NextStoredId++, SaveForSpilling(std::move(data)));
139148

140149
PackedDataSize -= bufSize;
141150
PackedRowCount -= head.RowCount;
151+
PackedChunkCount -= head.ChunkCount;
142152

143153
SpilledRowCount += head.RowCount;
144154

@@ -199,22 +209,29 @@ class TDqOutputChannel : public IDqOutputChannel {
199209
} else if (!Data.empty()) {
200210
auto& packed = Data.front();
201211
PackedRowCount -= packed.RowCount;
212+
PackedChunkCount -= packed.ChunkCount;
202213
PackedDataSize -= packed.Buffer.Size();
203214
data.Proto.SetRows(packed.RowCount);
215+
data.Proto.SetChunks(packed.ChunkCount);
204216
data.SetPayload(std::move(packed.Buffer));
205217
Data.pop_front();
206218
} else {
207-
data.Proto.SetRows(ChunkRowCount);
219+
data.Proto.SetRows(PackerCurrentRowCount);
220+
data.Proto.SetChunks(PackerCurrentChunkCount);
208221
data.SetPayload(FinishPackAndCheckSize());
209-
ChunkRowCount = 0;
222+
if (PushStats.CollectBasic()) {
223+
PushStats.Bytes += data.Payload.Size();
224+
}
225+
PackerCurrentRowCount = 0;
226+
PackerCurrentChunkCount = 0;
210227
}
211228

212229
DLOG("Took " << data.RowCount() << " rows");
213230

214231
if (PopStats.CollectBasic()) {
215232
PopStats.Bytes += data.Size();
216-
PopStats.Rows += data.RowCount();
217-
PopStats.Chunks++;
233+
PopStats.Rows += data.RowCount();
234+
PopStats.Chunks++; // pop chunks do not match push chunks
218235
if (!IsFull() || FirstStoredId == NextStoredId) {
219236
PopStats.Resume();
220237
}
@@ -256,20 +273,31 @@ class TDqOutputChannel : public IDqOutputChannel {
256273
data.Clear();
257274
data.Proto.SetTransportVersion(TransportVersion);
258275
if (SpilledRowCount == 0 && PackedRowCount == 0) {
259-
data.Proto.SetRows(ChunkRowCount);
276+
data.Proto.SetRows(PackerCurrentRowCount);
277+
data.Proto.SetChunks(PackerCurrentChunkCount);
260278
data.SetPayload(FinishPackAndCheckSize());
261-
ChunkRowCount = 0;
279+
if (PushStats.CollectBasic()) {
280+
PushStats.Bytes += data.Payload.Size();
281+
}
282+
PackerCurrentRowCount = 0;
283+
PackerCurrentChunkCount = 0;
262284
return true;
263285
}
264286

265287
// Repack all - thats why PopAll should never be used
266-
if (ChunkRowCount) {
288+
if (PackerCurrentRowCount) {
267289
Data.emplace_back();
268290
Data.back().Buffer = FinishPackAndCheckSize();
291+
if (PushStats.CollectBasic()) {
292+
PushStats.Bytes += Data.back().Buffer.Size();
293+
}
269294
PackedDataSize += Data.back().Buffer.Size();
270-
PackedRowCount += ChunkRowCount;
271-
Data.back().RowCount = ChunkRowCount;
272-
ChunkRowCount = 0;
295+
PackedRowCount += PackerCurrentRowCount;
296+
PackedChunkCount += PackerCurrentChunkCount;
297+
Data.back().RowCount = PackerCurrentRowCount;
298+
Data.back().ChunkCount = PackerCurrentChunkCount;
299+
PackerCurrentRowCount = 0;
300+
PackerCurrentChunkCount = 0;
273301
}
274302

275303
NKikimr::NMiniKQL::TUnboxedValueBatch rows(OutputType);
@@ -332,7 +360,9 @@ class TDqOutputChannel : public IDqOutputChannel {
332360
ui64 rows = GetValuesCount();
333361
Data.clear();
334362
Packer.Clear();
335-
SpilledRowCount = PackedDataSize = PackedRowCount = ChunkRowCount = 0;
363+
PackedDataSize = 0;
364+
SpilledRowCount = PackedRowCount = PackerCurrentRowCount = 0;
365+
PackedChunkCount = PackerCurrentChunkCount = 0;
336366
FirstStoredId = NextStoredId;
337367
return rows;
338368
}
@@ -359,6 +389,7 @@ class TDqOutputChannel : public IDqOutputChannel {
359389
struct TSerializedBatch {
360390
TChunkedBuffer Buffer;
361391
ui64 RowCount = 0;
392+
ui64 ChunkCount = 0;
362393
};
363394
std::deque<TSerializedBatch> Data;
364395

@@ -368,8 +399,10 @@ class TDqOutputChannel : public IDqOutputChannel {
368399

369400
size_t PackedDataSize = 0;
370401
size_t PackedRowCount = 0;
402+
size_t PackedChunkCount = 0;
371403

372-
size_t ChunkRowCount = 0;
404+
size_t PackerCurrentRowCount = 0;
405+
size_t PackerCurrentChunkCount = 0;
373406

374407
bool Finished = false;
375408

0 commit comments

Comments
 (0)