Skip to content

Commit cc60f67

Browse files
committed
YQ-4161 support block writing in s3 (#16375)
1 parent 780b669 commit cc60f67

15 files changed

+1038
-293
lines changed

ydb/library/yql/dq/runtime/dq_async_output.cpp

Lines changed: 91 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ namespace {
1111

1212
class TDqAsyncOutputBuffer : public IDqAsyncOutputBuffer {
1313
struct TValueDesc {
14-
std::variant<NUdf::TUnboxedValue, NDqProto::TWatermark, NDqProto::TCheckpoint> Value;
14+
std::variant<NUdf::TUnboxedValue, NKikimr::NMiniKQL::TUnboxedValueVector, NDqProto::TWatermark, NDqProto::TCheckpoint> Value;
1515
ui64 EstimatedSize;
1616

1717
TValueDesc(NUdf::TUnboxedValue&& value, ui64 size)
@@ -20,6 +20,12 @@ class TDqAsyncOutputBuffer : public IDqAsyncOutputBuffer {
2020
{
2121
}
2222

23+
TValueDesc(NUdf::TUnboxedValue* values, ui32 count, ui64 size)
24+
: Value(NKikimr::NMiniKQL::TUnboxedValueVector(values, values + count))
25+
, EstimatedSize(size)
26+
{
27+
}
28+
2329
TValueDesc(NDqProto::TWatermark&& watermark, ui64 size)
2430
: Value(std::move(watermark))
2531
, EstimatedSize(size)
@@ -32,17 +38,24 @@ class TDqAsyncOutputBuffer : public IDqAsyncOutputBuffer {
3238
{
3339
}
3440

41+
bool HasValue() const {
42+
return std::holds_alternative<NUdf::TUnboxedValue>(Value) || std::holds_alternative<NKikimr::NMiniKQL::TUnboxedValueVector>(Value);
43+
}
44+
3545
TValueDesc(const TValueDesc&) = default;
3646
TValueDesc(TValueDesc&&) = default;
3747
};
3848

49+
static constexpr ui64 REESTIMATE_ROW_SIZE_PERIOD = 1024;
50+
3951
public:
4052
TDqOutputStats PushStats;
4153
TDqAsyncOutputBufferStats PopStats;
4254

4355
TDqAsyncOutputBuffer(ui64 outputIndex, const TString& type, NKikimr::NMiniKQL::TType* outputType, ui64 maxStoredBytes, TCollectStatsLevel level)
4456
: MaxStoredBytes(maxStoredBytes)
4557
, OutputType(outputType)
58+
, IsBlock(IsBlockType(OutputType))
4659
{
4760
PushStats.Level = level;
4861
PopStats.Level = level;
@@ -67,20 +80,11 @@ class TDqAsyncOutputBuffer : public IDqAsyncOutputBuffer {
6780
}
6881

6982
void Push(NUdf::TUnboxedValue&& value) override {
70-
if (ValuesPushed++ % 1000 == 0) {
71-
ReestimateRowBytes(value);
72-
}
73-
Y_ABORT_UNLESS(EstimatedRowBytes > 0);
74-
Values.emplace_back(std::move(value), EstimatedRowBytes);
75-
EstimatedStoredBytes += EstimatedRowBytes;
76-
77-
ReportChunkIn(1, EstimatedRowBytes);
83+
DoPush(std::move(value));
7884
}
7985

8086
void WidePush(NUdf::TUnboxedValue* values, ui32 count) override {
81-
Y_UNUSED(values);
82-
Y_UNUSED(count);
83-
YQL_ENSURE(false, "Wide stream is not supported");
87+
DoPush(values, count);
8488
}
8589

8690
void Push(NDqProto::TWatermark&& watermark) override {
@@ -115,7 +119,7 @@ class TDqAsyncOutputBuffer : public IDqAsyncOutputBuffer {
115119

116120
// Calc values count.
117121
for (auto iter = Values.cbegin(), end = Values.cend();
118-
usedBytes < bytes && iter != end && std::holds_alternative<NUdf::TUnboxedValue>(iter->Value);
122+
usedBytes < bytes && iter != end && iter->HasValue();
119123
++iter)
120124
{
121125
++valuesCount;
@@ -124,7 +128,15 @@ class TDqAsyncOutputBuffer : public IDqAsyncOutputBuffer {
124128

125129
// Reserve size and return data.
126130
while (valuesCount--) {
127-
batch.emplace_back(std::move(std::get<NUdf::TUnboxedValue>(Values.front().Value)));
131+
auto& value = Values.front().Value;
132+
if (std::holds_alternative<NUdf::TUnboxedValue>(value)) {
133+
batch.emplace_back(std::move(std::get<NUdf::TUnboxedValue>(value)));
134+
} else if (std::holds_alternative<NKikimr::NMiniKQL::TUnboxedValueVector>(value)) {
135+
auto& multiValue = std::get<NKikimr::NMiniKQL::TUnboxedValueVector>(value);
136+
batch.PushRow(multiValue.data(), multiValue.size());
137+
} else {
138+
YQL_ENSURE(false, "Unsupported output value");
139+
}
128140
Values.pop_front();
129141
}
130142
Y_ABORT_UNLESS(EstimatedStoredBytes >= usedBytes);
@@ -181,7 +193,7 @@ class TDqAsyncOutputBuffer : public IDqAsyncOutputBuffer {
181193
return false;
182194
}
183195
for (const TValueDesc& v : Values) {
184-
if (std::holds_alternative<NUdf::TUnboxedValue>(v.Value)) {
196+
if (v.HasValue()) {
185197
return false;
186198
}
187199
}
@@ -194,8 +206,47 @@ class TDqAsyncOutputBuffer : public IDqAsyncOutputBuffer {
194206
}
195207

196208
private:
209+
template <typename... TArgs>
210+
void DoPush(TArgs&&... args) {
211+
if (ValuesPushed++ % REESTIMATE_ROW_SIZE_PERIOD == 0) {
212+
ReestimateRowBytes(args...);
213+
}
214+
215+
Y_ABORT_UNLESS(EstimatedRowBytes > 0);
216+
EstimatedStoredBytes += EstimatedRowBytes;
217+
ReportChunkIn(GetRowsCount(args...), EstimatedRowBytes);
218+
219+
Values.emplace_back(std::forward<TArgs>(args)..., EstimatedRowBytes);
220+
}
221+
222+
ui64 GetRowsCount(const NUdf::TUnboxedValue& value) const {
223+
Y_UNUSED(value);
224+
return 1;
225+
}
226+
227+
ui64 GetRowsCount(const NUdf::TUnboxedValue* values, ui32 count) const {
228+
if (!IsBlock) {
229+
return 1;
230+
}
231+
return NKikimr::NMiniKQL::TArrowBlock::From(values[count - 1]).GetDatum().scalar_as<arrow::UInt64Scalar>().value;
232+
}
233+
197234
void ReestimateRowBytes(const NUdf::TUnboxedValue& value) {
198-
const ui64 valueSize = TDqDataSerializer::EstimateSize(value, OutputType);
235+
DoReestimateRowBytes(TDqDataSerializer::EstimateSize(value, OutputType));
236+
}
237+
238+
void ReestimateRowBytes(const NUdf::TUnboxedValue* values, ui32 count) {
239+
const auto* multiType = static_cast<NKikimr::NMiniKQL::TMultiType* const>(OutputType);
240+
YQL_ENSURE(multiType, "Expected multi type for wide output");
241+
242+
ui64 valueSize = 0;
243+
for (ui32 i = 0; i < count; ++i) {
244+
valueSize += TDqDataSerializer::EstimateSize(values[i], multiType->GetElementType(i));
245+
}
246+
DoReestimateRowBytes(valueSize);
247+
}
248+
249+
void DoReestimateRowBytes(ui64 valueSize) {
199250
if (EstimatedRowBytes) {
200251
EstimatedRowBytes = static_cast<ui64>(0.6 * valueSize + 0.4 * EstimatedRowBytes);
201252
} else {
@@ -235,9 +286,33 @@ class TDqAsyncOutputBuffer : public IDqAsyncOutputBuffer {
235286
}
236287
}
237288

289+
static bool IsBlockType(const NKikimr::NMiniKQL::TType* type) {
290+
if (!type->IsMulti()) {
291+
return false;
292+
}
293+
294+
const NKikimr::NMiniKQL::TMultiType* multiType = static_cast<const NKikimr::NMiniKQL::TMultiType*>(type);
295+
const ui32 width = multiType->GetElementsCount();
296+
if (!width) {
297+
return false;
298+
}
299+
300+
for (ui32 i = 0; i < width; i++) {
301+
if (!multiType->GetElementType(i)->IsBlock()) {
302+
return false;
303+
}
304+
}
305+
306+
const auto lengthType = static_cast<const NKikimr::NMiniKQL::TBlockType*>(multiType->GetElementType(width - 1));
307+
return lengthType->GetShape() == NKikimr::NMiniKQL::TBlockType::EShape::Scalar
308+
&& lengthType->GetItemType()->IsData()
309+
&& static_cast<const NKikimr::NMiniKQL::TDataType*>(lengthType->GetItemType())->GetDataSlot() == NUdf::EDataSlot::Uint64;
310+
}
311+
238312
private:
239313
const ui64 MaxStoredBytes;
240314
NKikimr::NMiniKQL::TType* const OutputType;
315+
const bool IsBlock = false;
241316
ui64 EstimatedStoredBytes = 0;
242317
ui64 ValuesPushed = 0;
243318
bool Finished = false;

ydb/library/yql/providers/s3/actors/yql_arrow_column_converters.cpp

Lines changed: 144 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -669,6 +669,28 @@ TColumnConverter BuildCustomConverter(const std::shared_ptr<arrow::DataType>& or
669669
}
670670
}
671671

672+
TColumnConverter ArrowComputeConvertor(const std::string& columnName, const std::shared_ptr<arrow::DataType>& sourceType, const std::shared_ptr<arrow::DataType>& targetType) {
673+
YQL_ENSURE(arrow::compute::CanCast(*sourceType, *targetType), "Can not cast column " << columnName << ", from source type " << sourceType->ToString() << " to target type " << targetType->ToString());
674+
return [targetType](const std::shared_ptr<arrow::Array>& value) {
675+
auto res = arrow::compute::Cast(*value, targetType);
676+
THROW_ARROW_NOT_OK(res.status());
677+
return std::move(res).ValueOrDie();
678+
};
679+
}
680+
681+
TColumnConverter YqlBlockTzDateToArrow(const std::string& columnName, const std::shared_ptr<arrow::DataType>& sourceType) {
682+
YQL_ENSURE(sourceType->id() == arrow::Type::STRUCT, "Yql Tz block shoud have struct type");
683+
YQL_ENSURE(sourceType->num_fields() == 2, "Yql Tz block shoud have two fields");
684+
return [columnName, sourceType](const std::shared_ptr<arrow::Array>& value) {
685+
YQL_ENSURE(value->type()->Equals(sourceType), "Unexpected block type: " << value->type()->ToString() << ", expected type: " << sourceType->ToString() << " in column: " << columnName);
686+
const auto structValue = std::static_pointer_cast<arrow::StructArray>(value);
687+
const auto dateField = structValue->field(0)->data()->Copy();
688+
dateField->null_count = structValue->null_count();
689+
dateField->buffers[0] = structValue->null_bitmap();
690+
return arrow::MakeArray(dateField);
691+
};
692+
}
693+
672694
}
673695

674696
namespace NYql::NDq {
@@ -698,11 +720,50 @@ TColumnConverter BuildColumnConverter(const std::string& columnName, const std::
698720
<< targetType->ToString() << ", got: " << originalType->ToString());
699721
}
700722

701-
return [targetType](const std::shared_ptr<arrow::Array>& value) {
702-
auto res = arrow::compute::Cast(*value, targetType);
703-
THROW_ARROW_NOT_OK(res.status());
704-
return std::move(res).ValueOrDie();
705-
};
723+
return ArrowComputeConvertor(columnName, originalType, targetType);
724+
}
725+
726+
TColumnConverter BuildOutputColumnConverter(const std::string& columnName, NKikimr::NMiniKQL::TType* columnType) {
727+
std::shared_ptr<arrow::DataType> yqlArrowType, s3OutputType;
728+
YQL_ENSURE(ConvertArrowType(columnType, yqlArrowType), "Got unsupported yql block type: " << *columnType << " in column " << columnName);
729+
YQL_ENSURE(S3ConvertArrowOutputType(columnType, s3OutputType), "Got unsupported s3 output block type: " << *columnType << " in column " << columnName);
730+
731+
if (columnType->IsOptional()) {
732+
columnType = AS_TYPE(TOptionalType, columnType)->GetItemType();
733+
}
734+
YQL_ENSURE(columnType->IsData(), "Allowed only data types for S3 output, but got: " << *columnType << " in column " << columnName);
735+
const auto slot = AS_TYPE(TDataType, columnType)->GetDataSlot();
736+
YQL_ENSURE(slot, "Got invalid data type " << *columnType << " in column " << columnName);
737+
738+
switch (*slot) {
739+
case NUdf::EDataSlot::Bool:
740+
case NUdf::EDataSlot::Int8:
741+
case NUdf::EDataSlot::Uint8:
742+
case NUdf::EDataSlot::Int16:
743+
case NUdf::EDataSlot::Uint16:
744+
case NUdf::EDataSlot::Int32:
745+
case NUdf::EDataSlot::Uint32:
746+
case NUdf::EDataSlot::Int64:
747+
case NUdf::EDataSlot::Uint64:
748+
case NUdf::EDataSlot::Float:
749+
case NUdf::EDataSlot::Double:
750+
case NUdf::EDataSlot::String:
751+
case NUdf::EDataSlot::Date:
752+
case NUdf::EDataSlot::Datetime:
753+
case NUdf::EDataSlot::Timestamp:
754+
return {};
755+
case NUdf::EDataSlot::Utf8:
756+
case NUdf::EDataSlot::Json:
757+
return ArrowComputeConvertor(columnName, yqlArrowType, s3OutputType);
758+
case NUdf::EDataSlot::TzDate:
759+
case NUdf::EDataSlot::TzDatetime:
760+
case NUdf::EDataSlot::TzTimestamp:
761+
return YqlBlockTzDateToArrow(columnName, yqlArrowType);
762+
default:
763+
YQL_ENSURE(false, "Got unsupported s3 output block type: " << *columnType << " in column " << columnName);
764+
}
765+
766+
return {};
706767
}
707768

708769
void BuildColumnConverters(std::shared_ptr<arrow::Schema> outputSchema, std::shared_ptr<arrow::Schema> dataSchema,
@@ -753,4 +814,82 @@ std::shared_ptr<arrow::RecordBatch> ConvertArrowColumns(std::shared_ptr<arrow::R
753814
return arrow::RecordBatch::Make(batch->schema(), batch->num_rows(), columns);
754815
}
755816

817+
// Type conversion same as in ClickHouseClient.SerializeFormat udf
818+
bool S3ConvertArrowOutputType(NUdf::EDataSlot slot, std::shared_ptr<arrow::DataType>& type) {
819+
switch (slot) {
820+
case NUdf::EDataSlot::Int8:
821+
type = arrow::int8();
822+
return true;
823+
case NUdf::EDataSlot::Bool:
824+
case NUdf::EDataSlot::Uint8:
825+
type = arrow::uint8();
826+
return true;
827+
case NUdf::EDataSlot::Int16:
828+
type = arrow::int16();
829+
return true;
830+
case NUdf::EDataSlot::Date:
831+
case NUdf::EDataSlot::TzDate:
832+
case NUdf::EDataSlot::Uint16:
833+
type = arrow::uint16();
834+
return true;
835+
case NUdf::EDataSlot::Int32:
836+
type = arrow::int32();
837+
return true;
838+
case NUdf::EDataSlot::Datetime:
839+
case NUdf::EDataSlot::TzDatetime:
840+
case NUdf::EDataSlot::Uint32:
841+
type = arrow::uint32();
842+
return true;
843+
case NUdf::EDataSlot::Int64:
844+
type = arrow::int64();
845+
return true;
846+
case NUdf::EDataSlot::Uint64:
847+
type = arrow::uint64();
848+
return true;
849+
case NUdf::EDataSlot::Float:
850+
type = arrow::float32();
851+
return true;
852+
case NUdf::EDataSlot::Double:
853+
type = arrow::float64();
854+
return true;
855+
case NUdf::EDataSlot::String:
856+
case NUdf::EDataSlot::Utf8:
857+
case NUdf::EDataSlot::Json:
858+
type = arrow::binary();
859+
return true;
860+
case NUdf::EDataSlot::Timestamp:
861+
case NUdf::EDataSlot::TzTimestamp:
862+
type = arrow::timestamp(arrow::TimeUnit::MICRO, "UTC");
863+
return true;
864+
default:
865+
break;
866+
}
867+
return false;
868+
}
869+
870+
bool S3ConvertArrowOutputType(TType* itemType, std::shared_ptr<arrow::DataType>& type) {
871+
if (itemType->IsOptional()) {
872+
itemType = AS_TYPE(TOptionalType, itemType)->GetItemType();
873+
}
874+
if (!itemType->IsData()) {
875+
return false;
876+
}
877+
878+
const auto slot = AS_TYPE(TDataType, itemType)->GetDataSlot();
879+
if (!slot) {
880+
return false;
881+
}
882+
883+
return S3ConvertArrowOutputType(*slot, type);
884+
}
885+
886+
void BuildOutputColumnConverters(const NKikimr::NMiniKQL::TStructType* outputStructType, std::vector<TColumnConverter>& columnConverters) {
887+
columnConverters.reserve(outputStructType->GetMembersCount());
888+
for (ui32 i = 0; i < outputStructType->GetMembersCount(); ++i) {
889+
auto* const type = outputStructType->GetMemberType(i);
890+
const std::string name(outputStructType->GetMemberName(i));
891+
columnConverters.emplace_back(BuildOutputColumnConverter(name, type));
892+
}
893+
}
894+
756895
} // namespace NYql::NDq

ydb/library/yql/providers/s3/actors/yql_arrow_column_converters.h

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,13 @@
11
#pragma once
22

33
#include <yql/essentials/parser/pg_wrapper/interface/arrow.h>
4+
#include <yql/essentials/providers/common/schema/mkql/yql_mkql_schema.h>
45

5-
#include <ydb/library/yql/udfs/common/clickhouse/client/src/Formats/FormatSettings.h>
6+
namespace NDB {
7+
8+
struct FormatSettings;
9+
10+
} // namespace NDB
611

712
namespace NYql::NDq {
813

@@ -13,6 +18,10 @@ TColumnConverter BuildColumnConverter(
1318
NKikimr::NMiniKQL::TType* yqlType,
1419
const NDB::FormatSettings& formatSettings);
1520

21+
TColumnConverter BuildOutputColumnConverter(
22+
const std::string& columnName,
23+
NKikimr::NMiniKQL::TType* columnType);
24+
1625
void BuildColumnConverters(
1726
std::shared_ptr<arrow::Schema> outputSchema,
1827
std::shared_ptr<arrow::Schema> dataSchema,
@@ -25,4 +34,11 @@ std::shared_ptr<arrow::RecordBatch> ConvertArrowColumns(
2534
std::shared_ptr<arrow::RecordBatch> batch,
2635
std::vector<TColumnConverter>& columnConverters);
2736

37+
bool S3ConvertArrowOutputType(NUdf::EDataSlot slot, std::shared_ptr<arrow::DataType>& type);
38+
bool S3ConvertArrowOutputType(NKikimr::NMiniKQL::TType* itemType, std::shared_ptr<arrow::DataType>& type);
39+
40+
void BuildOutputColumnConverters(
41+
const NKikimr::NMiniKQL::TStructType* outputStructType,
42+
std::vector<TColumnConverter>& columnConverters);
43+
2844
} // namespace NYql::NDq

0 commit comments

Comments
 (0)