Skip to content

Commit ee8a9c6

Browse files
authored
Revert "Correct Rows count in Block Output Channels (#11893)" (#13285)
1 parent dd668c0 commit ee8a9c6

File tree

5 files changed

+21
-63
lines changed

5 files changed

+21
-63
lines changed

ydb/library/yql/dq/common/dq_serialized_batch.cpp

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -65,13 +65,11 @@ TChunkedBuffer SaveForSpilling(TDqSerializedBatch&& batch) {
6565

6666
ui32 transportversion = batch.Proto.GetTransportVersion();
6767
ui32 rowCount = batch.Proto.GetRows();
68-
ui32 chunkCount = batch.Proto.GetChunks();
6968

7069
TChunkedBuffer protoPayload(std::move(*batch.Proto.MutableRaw()));
7170

7271
AppendNumber(result, transportversion);
7372
AppendNumber(result, rowCount);
74-
AppendNumber(result, chunkCount);
7573
AppendNumber(result, protoPayload.Size());
7674
result.Append(std::move(protoPayload));
7775
AppendNumber(result, batch.Payload.Size());
@@ -87,7 +85,6 @@ TDqSerializedBatch LoadSpilled(TBuffer&& blob) {
8785
TDqSerializedBatch result;
8886
result.Proto.SetTransportVersion(ReadNumber<ui32>(source));
8987
result.Proto.SetRows(ReadNumber<ui32>(source));
90-
result.Proto.SetChunks(ReadNumber<ui32>(source));
9188

9289
size_t protoSize = ReadNumber<size_t>(source);
9390
YQL_ENSURE(source.size() >= protoSize, "Premature end of spilled data");

ydb/library/yql/dq/common/dq_serialized_batch.h

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,6 @@ struct TDqSerializedBatch {
2828
return Proto.GetRows();
2929
}
3030

31-
ui32 ChunkCount() const {
32-
return Proto.GetChunks();
33-
}
34-
3531
void Clear() {
3632
Payload.Clear();
3733
Proto.Clear();

ydb/library/yql/dq/proto/dq_transport.proto

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ enum EDataTransportVersion {
1616
message TData {
1717
uint32 TransportVersion = 1;
1818
bytes Raw = 2;
19-
uint32 Rows = 5;
20-
uint32 Chunks = 3;
19+
uint32 Rows = 3;
2120
optional uint32 PayloadId = 4;
2221
}

ydb/library/yql/dq/runtime/dq_input_channel.cpp

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ class TDqInputChannel : public IDqInputChannel {
4747

4848
void PushImpl(TDqSerializedBatch&& data) {
4949
const i64 space = data.Size();
50-
const size_t chunkCount = data.ChunkCount();
50+
const size_t rowCount = data.RowCount();
5151
auto inputType = Impl.GetInputType();
5252
NKikimr::NMiniKQL::TUnboxedValueBatch batch(inputType);
5353
if (Y_UNLIKELY(PushStats.CollectProfile())) {
@@ -58,8 +58,7 @@ class TDqInputChannel : public IDqInputChannel {
5858
DataSerializer.Deserialize(std::move(data), inputType, batch);
5959
}
6060

61-
// single batch row is chunk and may be Arrow block
62-
YQL_ENSURE(batch.RowCount() == chunkCount);
61+
YQL_ENSURE(batch.RowCount() == rowCount);
6362
Impl.AddBatch(std::move(batch), space);
6463
}
6564

ydb/library/yql/dq/runtime/dq_output_channel.cpp

Lines changed: 18 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ class TDqOutputChannel : public IDqOutputChannel {
5858
}
5959

6060
ui64 GetValuesCount() const override {
61-
return SpilledRowCount + PackedRowCount + PackerCurrentRowCount;
61+
return SpilledRowCount + PackedRowCount + ChunkRowCount;
6262
}
6363

6464
const TDqOutputStats& GetPushStats() const override {
@@ -95,12 +95,8 @@ class TDqOutputChannel : public IDqOutputChannel {
9595
return;
9696
}
9797

98-
ui32 rows = Packer.IsBlock() ?
99-
NKikimr::NMiniKQL::TArrowBlock::From(values[width - 1]).GetDatum().scalar_as<arrow::UInt64Scalar>().value
100-
: 1;
101-
10298
if (PushStats.CollectBasic()) {
103-
PushStats.Rows += rows;
99+
PushStats.Rows++;
104100
PushStats.Chunks++;
105101
PushStats.Resume();
106102
}
@@ -114,8 +110,7 @@ class TDqOutputChannel : public IDqOutputChannel {
114110
values[i] = {};
115111
}
116112

117-
PackerCurrentRowCount += rows;
118-
PackerCurrentChunkCount++;
113+
ChunkRowCount++;
119114

120115
size_t packerSize = Packer.PackedSizeEstimate();
121116
if (packerSize >= MaxChunkBytes) {
@@ -125,12 +120,9 @@ class TDqOutputChannel : public IDqOutputChannel {
125120
PushStats.Bytes += Data.back().Buffer.Size();
126121
}
127122
PackedDataSize += Data.back().Buffer.Size();
128-
PackedRowCount += PackerCurrentRowCount;
129-
PackedChunkCount += PackerCurrentChunkCount;
130-
Data.back().RowCount = PackerCurrentRowCount;
131-
Data.back().ChunkCount = PackerCurrentChunkCount;
132-
PackerCurrentRowCount = 0;
133-
PackerCurrentChunkCount = 0;
123+
PackedRowCount += ChunkRowCount;
124+
Data.back().RowCount = ChunkRowCount;
125+
ChunkRowCount = 0;
134126
packerSize = 0;
135127
}
136128

@@ -142,13 +134,11 @@ class TDqOutputChannel : public IDqOutputChannel {
142134
TDqSerializedBatch data;
143135
data.Proto.SetTransportVersion(TransportVersion);
144136
data.Proto.SetRows(head.RowCount);
145-
data.Proto.SetChunks(head.ChunkCount);
146137
data.SetPayload(std::move(head.Buffer));
147138
Storage->Put(NextStoredId++, SaveForSpilling(std::move(data)));
148139

149140
PackedDataSize -= bufSize;
150141
PackedRowCount -= head.RowCount;
151-
PackedChunkCount -= head.ChunkCount;
152142

153143
SpilledRowCount += head.RowCount;
154144

@@ -209,29 +199,22 @@ class TDqOutputChannel : public IDqOutputChannel {
209199
} else if (!Data.empty()) {
210200
auto& packed = Data.front();
211201
PackedRowCount -= packed.RowCount;
212-
PackedChunkCount -= packed.ChunkCount;
213202
PackedDataSize -= packed.Buffer.Size();
214203
data.Proto.SetRows(packed.RowCount);
215-
data.Proto.SetChunks(packed.ChunkCount);
216204
data.SetPayload(std::move(packed.Buffer));
217205
Data.pop_front();
218206
} else {
219-
data.Proto.SetRows(PackerCurrentRowCount);
220-
data.Proto.SetChunks(PackerCurrentChunkCount);
207+
data.Proto.SetRows(ChunkRowCount);
221208
data.SetPayload(FinishPackAndCheckSize());
222-
if (PushStats.CollectBasic()) {
223-
PushStats.Bytes += data.Payload.Size();
224-
}
225-
PackerCurrentRowCount = 0;
226-
PackerCurrentChunkCount = 0;
209+
ChunkRowCount = 0;
227210
}
228211

229212
DLOG("Took " << data.RowCount() << " rows");
230213

231214
if (PopStats.CollectBasic()) {
232215
PopStats.Bytes += data.Size();
233-
PopStats.Rows += data.RowCount();
234-
PopStats.Chunks++; // pop chunks do not match push chunks
216+
PopStats.Rows += data.RowCount();
217+
PopStats.Chunks++;
235218
if (!IsFull() || FirstStoredId == NextStoredId) {
236219
PopStats.Resume();
237220
}
@@ -273,31 +256,20 @@ class TDqOutputChannel : public IDqOutputChannel {
273256
data.Clear();
274257
data.Proto.SetTransportVersion(TransportVersion);
275258
if (SpilledRowCount == 0 && PackedRowCount == 0) {
276-
data.Proto.SetRows(PackerCurrentRowCount);
277-
data.Proto.SetChunks(PackerCurrentChunkCount);
259+
data.Proto.SetRows(ChunkRowCount);
278260
data.SetPayload(FinishPackAndCheckSize());
279-
if (PushStats.CollectBasic()) {
280-
PushStats.Bytes += data.Payload.Size();
281-
}
282-
PackerCurrentRowCount = 0;
283-
PackerCurrentChunkCount = 0;
261+
ChunkRowCount = 0;
284262
return true;
285263
}
286264

287265
// Repack all - thats why PopAll should never be used
288-
if (PackerCurrentRowCount) {
266+
if (ChunkRowCount) {
289267
Data.emplace_back();
290268
Data.back().Buffer = FinishPackAndCheckSize();
291-
if (PushStats.CollectBasic()) {
292-
PushStats.Bytes += Data.back().Buffer.Size();
293-
}
294269
PackedDataSize += Data.back().Buffer.Size();
295-
PackedRowCount += PackerCurrentRowCount;
296-
PackedChunkCount += PackerCurrentChunkCount;
297-
Data.back().RowCount = PackerCurrentRowCount;
298-
Data.back().ChunkCount = PackerCurrentChunkCount;
299-
PackerCurrentRowCount = 0;
300-
PackerCurrentChunkCount = 0;
270+
PackedRowCount += ChunkRowCount;
271+
Data.back().RowCount = ChunkRowCount;
272+
ChunkRowCount = 0;
301273
}
302274

303275
NKikimr::NMiniKQL::TUnboxedValueBatch rows(OutputType);
@@ -360,9 +332,7 @@ class TDqOutputChannel : public IDqOutputChannel {
360332
ui64 rows = GetValuesCount();
361333
Data.clear();
362334
Packer.Clear();
363-
PackedDataSize = 0;
364-
SpilledRowCount = PackedRowCount = PackerCurrentRowCount = 0;
365-
PackedChunkCount = PackerCurrentChunkCount = 0;
335+
SpilledRowCount = PackedDataSize = PackedRowCount = ChunkRowCount = 0;
366336
FirstStoredId = NextStoredId;
367337
return rows;
368338
}
@@ -389,7 +359,6 @@ class TDqOutputChannel : public IDqOutputChannel {
389359
struct TSerializedBatch {
390360
TChunkedBuffer Buffer;
391361
ui64 RowCount = 0;
392-
ui64 ChunkCount = 0;
393362
};
394363
std::deque<TSerializedBatch> Data;
395364

@@ -399,10 +368,8 @@ class TDqOutputChannel : public IDqOutputChannel {
399368

400369
size_t PackedDataSize = 0;
401370
size_t PackedRowCount = 0;
402-
size_t PackedChunkCount = 0;
403371

404-
size_t PackerCurrentRowCount = 0;
405-
size_t PackerCurrentChunkCount = 0;
372+
size_t ChunkRowCount = 0;
406373

407374
bool Finished = false;
408375

0 commit comments

Comments
 (0)