Skip to content

Commit cf53155

Browse files
authored
Support block IO in purebench (#7066)
1 parent f773909 commit cf53155

File tree

4 files changed

+190
-55
lines changed

4 files changed

+190
-55
lines changed

ydb/library/yql/public/udf/arrow/util.cpp

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#include "defs.h"
44

55
#include <arrow/array/array_base.h>
6+
#include <arrow/array/util.h>
67
#include <arrow/chunked_array.h>
78
#include <arrow/record_batch.h>
89

@@ -30,6 +31,21 @@ ui64 GetSizeOfArrayDataInBytes(const arrow::ArrayData& data) {
3031
return size;
3132
}
3233

34+
// Estimates the number of bytes occupied by a single Datum.
//
// sizeof(datum) is always counted. On top of that:
//   - for a scalar, a one-element array is built from it with
//     arrow::MakeArrayFromScalar and measured via GetSizeOfArrayDataInBytes;
//   - for an array-like datum, every ArrayData visited by ForEachArrayData
//     is measured via GetSizeOfArrayDataInBytes and the results are summed.
// Any other kind of datum is not supported and aborts the process.
ui64 GetSizeOfDatumInBytes(const arrow::Datum& datum) {
    const ui64 headerSize = sizeof(datum);

    if (datum.is_scalar()) {
        const auto& materialized = ARROW_RESULT(arrow::MakeArrayFromScalar(*datum.scalar(), 1));
        return headerSize + GetSizeOfArrayDataInBytes(*materialized->data());
    }

    if (!datum.is_arraylike()) {
        Y_ABORT("Not yet implemented");
    }

    ui64 payloadSize = 0;
    ForEachArrayData(datum, [&payloadSize](const auto& arrayData) {
        payloadSize += GetSizeOfArrayDataInBytes(*arrayData);
    });
    return headerSize + payloadSize;
}
48+
3349
} // namespace
3450

3551
std::shared_ptr<arrow::Buffer> AllocateBitmapWithReserve(size_t bitCount, arrow::MemoryPool* pool) {
@@ -132,5 +148,14 @@ ui64 GetSizeOfArrowBatchInBytes(const arrow::RecordBatch& batch) {
132148
return size;
133149
}
134150

151+
// Estimates the number of bytes occupied by an ExecBatch: the batch object
// itself, one pointer-sized slot per value, and the size of every Datum
// stored in batch.values (as reported by GetSizeOfDatumInBytes).
ui64 GetSizeOfArrowExecBatchInBytes(const arrow::compute::ExecBatch& batch) {
    const ui64 pointerSlots = batch.num_values() * sizeof(void*);

    ui64 total = sizeof(batch);
    total += pointerSlots;
    for (const arrow::Datum& value : batch.values) {
        total += GetSizeOfDatumInBytes(value);
    }
    return total;
}
135160
}
136161
}

ydb/library/yql/public/udf/arrow/util.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
#include <util/generic/vector.h>
66

7+
#include <arrow/compute/api.h>
78
#include <arrow/datum.h>
89
#include <arrow/memory_pool.h>
910
#include <arrow/util/bit_util.h>
@@ -41,6 +42,7 @@ inline bool IsNull(const arrow::ArrayData& data, size_t index) {
4142
}
4243

4344
ui64 GetSizeOfArrowBatchInBytes(const arrow::RecordBatch& batch);
45+
ui64 GetSizeOfArrowExecBatchInBytes(const arrow::compute::ExecBatch& batch);
4446

4547
class TResizeableBuffer : public arrow::ResizableBuffer {
4648
public:

ydb/library/yql/tools/purebench/purebench.cpp

Lines changed: 162 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,12 @@
33

44
#include <ydb/library/yql/public/purecalc/purecalc.h>
55
#include <ydb/library/yql/public/purecalc/io_specs/mkql/spec.h>
6+
#include <ydb/library/yql/public/purecalc/io_specs/arrow/spec.h>
7+
#include <ydb/library/yql/public/purecalc/helpers/stream/stream_from_vector.h>
68

79
#include <ydb/library/yql/utils/log/log.h>
810
#include <ydb/library/yql/utils/backtrace/backtrace.h>
11+
#include <ydb/library/yql/public/udf/arrow/util.h>
912
#include <ydb/library/yql/public/udf/udf_registrator.h>
1013
#include <ydb/library/yql/public/udf/udf_version.h>
1114

@@ -17,11 +20,99 @@
1720
#include <util/stream/format.h>
1821
#include <util/stream/null.h>
1922

23+
#include <algorithm>
2024
#include <cmath>
2125

2226
using namespace NYql;
2327
using namespace NYql::NPureCalc;
2428

29+
// Builds the Skiff input for the generator query: `count` records, each
// written as variant tag 0 followed by the record's index as a 64-bit integer.
TStringStream MakeGenInput(ui64 count) {
    TStringStream encoded;
    NSkiff::TUncheckedSkiffWriter skiffWriter{&encoded};

    ui64 rowIndex = 0;
    while (rowIndex < count) {
        skiffWriter.WriteVariant16Tag(0);
        skiffWriter.WriteInt64(rowIndex);
        ++rowIndex;
    }

    // Finish the writer before handing the stream back to the caller.
    skiffWriter.Finish();
    return encoded;
}
39+
40+
template <typename TInputSpec, typename TOutputSpec>
41+
using TRunCallable = std::function<void (const THolder<TPullListProgram<TInputSpec, TOutputSpec>>&)>;
42+
43+
template <typename TOutputSpec>
44+
NYT::TNode RunGenSql(
45+
const IProgramFactoryPtr factory,
46+
const TVector<NYT::TNode>& inputSchema,
47+
const TString& sql,
48+
ETranslationMode isPg,
49+
TRunCallable<TSkiffInputSpec, TOutputSpec> runCallable
50+
) {
51+
auto inputSpec = TSkiffInputSpec(inputSchema);
52+
auto outputSpec = TOutputSpec({NYT::TNode::CreateEntity()});
53+
auto program = factory->MakePullListProgram(inputSpec, outputSpec, sql, isPg);
54+
55+
runCallable(program);
56+
57+
return program->MakeOutputSchema();
58+
}
59+
60+
template <typename TInputSpec, typename TStream>
61+
void ShowResults(
62+
const IProgramFactoryPtr factory,
63+
const TVector<NYT::TNode>& inputSchema,
64+
const TString& sql,
65+
ETranslationMode isPg,
66+
TStream* input
67+
) {
68+
auto inputSpec = TInputSpec(inputSchema);
69+
auto outputSpec = TYsonOutputSpec({NYT::TNode::CreateEntity()});
70+
auto program = factory->MakePullListProgram(inputSpec, outputSpec, sql, isPg);
71+
auto handle = program->Apply(input);
72+
TStringStream output;
73+
handle->Run(&output);
74+
TStringInput in(output.Str());
75+
NYson::ReformatYsonStream(&in, &Cerr, NYson::EYsonFormat::Pretty, NYson::EYsonType::ListFragment);
76+
}
77+
78+
template <typename TInputSpec, typename TOutputSpec>
79+
double RunBenchmarks(
80+
const IProgramFactoryPtr factory,
81+
const TVector<NYT::TNode>& inputSchema,
82+
const TString& sql,
83+
ETranslationMode isPg,
84+
ui32 repeats,
85+
TRunCallable<TInputSpec, TOutputSpec> runCallable
86+
) {
87+
auto inputSpec = TInputSpec(inputSchema);
88+
auto outputSpec = TOutputSpec({NYT::TNode::CreateEntity()});
89+
auto program = factory->MakePullListProgram(inputSpec, outputSpec, sql, isPg);
90+
91+
Cerr << "Dry run of test sql...\n";
92+
93+
runCallable(program);
94+
95+
Cerr << "Run benchmark...\n";
96+
97+
TVector<TDuration> times;
98+
TSimpleTimer allTimer;
99+
for (ui32 i = 0; i < repeats; ++i) {
100+
TSimpleTimer timer;
101+
runCallable(program);
102+
times.push_back(timer.Get());
103+
}
104+
105+
Cout << "Elapsed: " << allTimer.Get() << "\n";
106+
107+
Sort(times);
108+
times.erase(times.end() - times.size() / 3, times.end());
109+
110+
double sum = std::transform_reduce(times.cbegin(), times.cend(),
111+
.0, std::plus{}, [](auto t) { return std::log(t.MicroSeconds()); });
112+
113+
return std::exp(sum / times.size());
114+
}
115+
25116
int Main(int argc, const char *argv[])
26117
{
27118
Y_UNUSED(NUdf::GetStaticSymbols());
@@ -81,67 +172,83 @@ int Main(int argc, const char *argv[])
81172
.Add("StructType")
82173
.Add(members);
83174

84-
auto inputSpec1 = TSkiffInputSpec(TVector<NYT::TNode>{schema});
85-
auto outputSpec1 = TSkiffOutputSpec({NYT::TNode::CreateEntity()});
86-
auto genProgram = factory->MakePullListProgram(
87-
inputSpec1,
88-
outputSpec1,
89-
genSql,
90-
res.Has("pg") ? ETranslationMode::PG : ETranslationMode::SQL);
175+
auto inputGenSchema = TVector<NYT::TNode>{schema};
176+
auto inputGenStream = MakeGenInput(count);
177+
Cerr << "Input data size: " << inputGenStream.Size() << "\n";
178+
ETranslationMode isPgGen = res.Has("pg") ? ETranslationMode::PG : ETranslationMode::SQL;
179+
ETranslationMode isPgTest = res.Has("pt") ? ETranslationMode::PG : ETranslationMode::SQL;
180+
double normalizedTime;
181+
size_t inputBenchSize;
91182

92-
TStringStream stream;
93-
NSkiff::TUncheckedSkiffWriter writer{&stream};
94-
for (ui64 i = 0; i < count; ++i) {
95-
writer.WriteVariant16Tag(0);
96-
writer.WriteInt64(i);
97-
}
98-
writer.Finish();
99-
auto input1 = TStringStream(stream);
100-
Cerr << "Input data size: " << input1.Size() << "\n";
101-
auto handle1 = genProgram->Apply(&input1);
102-
TStringStream output1;
103-
handle1->Run(&output1);
104-
Cerr << "Generated data size: " << output1.Size() << "\n";
183+
if (blockEngineSettings == "disable") {
184+
TStringStream outputGenStream;
185+
auto outputGenSchema = RunGenSql<TSkiffOutputSpec>(
186+
factory, inputGenSchema, genSql, isPgGen,
187+
[&](const auto& program) {
188+
auto handle = program->Apply(&inputGenStream);
189+
handle->Run(&outputGenStream);
190+
Cerr << "Generated data size: " << outputGenStream.Size() << "\n";
191+
});
105192

106-
Cerr << "Dry run of test sql...\n";
107-
auto inputSpec2 = TSkiffInputSpec(genProgram->MakeOutputSchema());
108-
auto outputSpec2 = TYsonOutputSpec({NYT::TNode::CreateEntity()});
109-
auto testProgram = factory->MakePullListProgram(
110-
inputSpec2,
111-
outputSpec2,
112-
testSql,
113-
res.Has("pt") ? ETranslationMode::PG : ETranslationMode::SQL);
114-
auto input2 = TStringStream(output1);
115-
auto handle2 = testProgram->Apply(&input2);
116-
TStringStream output2;
117-
handle2->Run(&output2);
118-
if (showResults) {
119-
TStringInput in(output2.Str());
120-
NYson::ReformatYsonStream(&in, &Cerr, NYson::EYsonFormat::Pretty, NYson::EYsonType::ListFragment);
121-
}
193+
if (showResults) {
194+
auto inputResStream = TStringStream(outputGenStream);
195+
ShowResults<TSkiffInputSpec>(
196+
factory, {outputGenSchema}, testSql, isPgTest, &inputResStream);
197+
}
122198

123-
Cerr << "Run benchmark...\n";
124-
TVector<TDuration> times;
125-
TSimpleTimer allTimer;
126-
for (ui32 i = 0; i < repeats; ++i) {
127-
TSimpleTimer timer;
128-
auto input2 = TStringStream(output1);
129-
auto handle2 = testProgram->Apply(&input2);
130-
TNullOutput output2;
131-
handle2->Run(&output2);
132-
times.push_back(timer.Get());
133-
}
199+
inputBenchSize = outputGenStream.Size();
200+
normalizedTime = RunBenchmarks<TSkiffInputSpec, TSkiffOutputSpec>(
201+
factory, {outputGenSchema}, testSql, isPgTest, repeats,
202+
[&](const auto& program) {
203+
auto inputBorrowed = TStringStream(outputGenStream);
204+
auto handle = program->Apply(&inputBorrowed);
205+
TNullOutput output;
206+
handle->Run(&output);
207+
});
208+
} else {
209+
auto inputGenSpec = TSkiffInputSpec(inputGenSchema);
210+
auto outputGenSpec = TArrowOutputSpec({NYT::TNode::CreateEntity()});
211+
// XXX: <RunGenSql> cannot be used for this case, since all buffers
212+
// from the Datums in the obtained batches are owned by the worker's
213+
// allocator. Hence, the program (i.e. worker) object should be created
214+
// at the very beginning of the block, or at least prior to all the
215+
// temporary batch storages (mind outputGenStream below).
216+
auto program = factory->MakePullListProgram(
217+
inputGenSpec, outputGenSpec, genSql, isPgGen);
134218

135-
Cout << "Elapsed: " << allTimer.Get() << "\n";
136-
Sort(times);
137-
times.erase(times.end() - times.size() / 3, times.end());
138-
double s = 0;
139-
for (auto t : times) {
140-
s += std::log(t.MicroSeconds());
219+
auto handle = program->Apply(&inputGenStream);
220+
auto outputGenSchema = program->MakeOutputSchema();
221+
222+
TVector<arrow::compute::ExecBatch> outputGenStream;
223+
while (arrow::compute::ExecBatch* batch = handle->Fetch()) {
224+
outputGenStream.push_back(*batch);
225+
}
226+
227+
ui64 outputGenSize = std::transform_reduce(
228+
outputGenStream.cbegin(), outputGenStream.cend(),
229+
0l, std::plus{}, [](const auto& b) {
230+
return NYql::NUdf::GetSizeOfArrowExecBatchInBytes(b);
231+
});
232+
233+
Cerr << "Generated data size: " << outputGenSize << "\n";
234+
235+
if (showResults) {
236+
auto inputResStreamHolder = StreamFromVector(outputGenStream);
237+
auto inputResStream = inputResStreamHolder.Get();
238+
ShowResults<TArrowInputSpec>(
239+
factory, {outputGenSchema}, testSql, isPgTest, inputResStream);
240+
}
241+
242+
inputBenchSize = outputGenSize;
243+
normalizedTime = RunBenchmarks<TArrowInputSpec, TArrowOutputSpec>(
244+
factory, {outputGenSchema}, testSql, isPgTest, repeats,
245+
[&](const auto& program) {
246+
auto handle = program->Apply(StreamFromVector(outputGenStream));
247+
while (arrow::compute::ExecBatch* batch = handle->Fetch()) {}
248+
});
141249
}
142250

143-
double score = output1.Size() / std::exp(s / times.size());
144-
Cout << "Bench score: " << Prec(score, 4) << "\n";
251+
Cout << "Bench score: " << Prec(inputBenchSize / normalizedTime, 4) << "\n";
145252

146253
NLog::CleanupLogger();
147254
return 0;

ydb/library/yql/tools/purebench/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ PEERDIR(
2222
library/cpp/skiff
2323
library/cpp/yson
2424
ydb/library/yql/public/purecalc/io_specs/mkql
25+
ydb/library/yql/public/purecalc/io_specs/arrow
2526
ydb/library/yql/public/purecalc
2627
)
2728

0 commit comments

Comments
 (0)