Skip to content

Commit 27828e0

Browse files
committed
Added serialization and deserialization functions to block state
Added serde for state in block aggregation for spilling. commit_hash:44233e34f49c9722849a69ad7c49a2b87e65c75d
1 parent 0ce733c commit 27828e0

File tree

7 files changed

+127
-9
lines changed

7 files changed

+127
-9
lines changed

yql/essentials/minikql/comp_nodes/mkql_block_agg_count.cpp

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,16 @@ class TCountAllAggregator<TFinalizeKeysTag> : public TFinalizeKeysTag::TBase {
148148
}
149149
}
150150

151+
void SerializeState(void* state, NUdf::TOutputBuffer& buffer) final {
152+
auto typedState = static_cast<TState*>(state);
153+
buffer.PushNumber(typedState->Count_);
154+
}
155+
156+
void DeserializeState(void* state, NUdf::TInputBuffer& buffer) final {
157+
auto typedState = static_cast<TState*>(state);
158+
buffer.PopNumber(typedState->Count_);
159+
}
160+
151161
// Creates a TColumnBuilder for `size` entries, bound to this aggregator's Ctx_.
std::unique_ptr<IAggColumnBuilder> MakeResultBuilder(ui64 size) final {
    std::unique_ptr<IAggColumnBuilder> builder = std::make_unique<TColumnBuilder>(size, Ctx_);
    return builder;
}
@@ -417,6 +427,6 @@ std::unique_ptr<IBlockAggregatorFactory> MakeBlockCountAllFactory() {
417427
std::unique_ptr<IBlockAggregatorFactory> MakeBlockCountFactory() {
418428
return std::make_unique<TBlockCountFactory>();
419429
}
420-
430+
421431
}
422432
}

yql/essentials/minikql/comp_nodes/mkql_block_agg_factory.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
#include <yql/essentials/minikql/computation/mkql_computation_node.h>
44
#include <yql/essentials/minikql/computation/mkql_computation_node_holders.h>
5+
#include <yql/essentials/minikql/computation/mkql_block_builder.h>
56

67
namespace NKikimr {
78
namespace NMiniKQL {
@@ -63,6 +64,10 @@ class IBlockAggregatorFinalizeKeys : public IBlockAggregatorBase {
6364

6465
// Creates the column builder used to emit results; `size` is forwarded to the
// concrete builder's constructor by the implementations.
virtual std::unique_ptr<IAggColumnBuilder> MakeResultBuilder(ui64 size) = 0;
6566

67+
// Writes the aggregation state stored at `state` into `buffer`.
// Implementations must write fields in the exact order DeserializeState reads them.
// NOTE(review): per the commit description this is intended for spilling block aggregation state.
virtual void SerializeState(void* state, NUdf::TOutputBuffer& buffer) = 0;
68+
69+
// Restores the aggregation state at `state` from `buffer`.
// Implementations must read fields in the exact order SerializeState wrote them.
// NOTE(review): whether `state` is already initialized when this is called is not visible here — confirm with callers.
virtual void DeserializeState(void* state, NUdf::TInputBuffer& buffer) = 0;
70+
6671
// Forwards the per-key state size to the IBlockAggregatorBase constructor.
explicit IBlockAggregatorFinalizeKeys(ui32 stateSize) : IBlockAggregatorBase(stateSize) {
}

yql/essentials/minikql/comp_nodes/mkql_block_agg_minmax.cpp

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
#include <yql/essentials/minikql/computation/mkql_block_builder.h>
99
#include <yql/essentials/minikql/computation/mkql_block_reader.h>
1010
#include <yql/essentials/minikql/computation/mkql_computation_node_holders.h>
11+
#include <yql/essentials/minikql/computation/mkql_computation_node_pack.h>
1112

1213
#include <yql/essentials/minikql/arrow/arrow_defs.h>
1314
#include <yql/essentials/minikql/arrow/arrow_util.h>
@@ -352,6 +353,7 @@ class TMinMaxBlockGenericAggregator<TFinalizeKeysTag, IsMin> : public TFinalizeK
352353
, Reader_(MakeBlockReader(TTypeInfoHelper(), type))
353354
, Converter_(MakeBlockItemConverter(TTypeInfoHelper(), type, ctx.Builder->GetPgBuilder()))
354355
, Compare_(TBlockTypeHelper().MakeComparator(type))
356+
, Packer_(false, type)
355357
{
356358
}
357359

@@ -373,6 +375,16 @@ class TMinMaxBlockGenericAggregator<TFinalizeKeysTag, IsMin> : public TFinalizeK
373375
PushValueToState<IsMin>(typedState, datum, row, *Reader_, *Converter_, *Compare_, Ctx_);
374376
}
375377

378+
void SerializeState(void* state, NUdf::TOutputBuffer& buffer) final {
379+
auto typedState = static_cast<TGenericState*>(state);
380+
buffer.PushString(Packer_.Pack(*typedState));
381+
}
382+
383+
void DeserializeState(void* state, NUdf::TInputBuffer& buffer) final {
384+
auto typedState = static_cast<TGenericState*>(state);
385+
*typedState = Packer_.Unpack(buffer.PopString(), Ctx_.HolderFactory).Release();
386+
}
387+
376388
// Creates a TGenericColumnBuilder for `size` entries using this aggregator's Type_ and Ctx_.
std::unique_ptr<IAggColumnBuilder> MakeResultBuilder(ui64 size) final {
    std::unique_ptr<IAggColumnBuilder> builder = std::make_unique<TGenericColumnBuilder>(size, Type_, Ctx_);
    return builder;
}
@@ -383,6 +395,7 @@ class TMinMaxBlockGenericAggregator<TFinalizeKeysTag, IsMin> : public TFinalizeK
383395
const std::unique_ptr<IBlockReader> Reader_;
384396
const std::unique_ptr<IBlockItemConverter> Converter_;
385397
const NYql::NUdf::IBlockItemComparator::TPtr Compare_;
398+
const TValuePacker Packer_;
386399
};
387400

388401
template <typename TStringType, bool IsMin>
@@ -605,6 +618,17 @@ class TMinMaxBlockStringAggregator<TFinalizeKeysTag, TStringType, IsMin> : publi
605618
PushValueToState<TStringType, IsMin>(typedState, datum, row);
606619
}
607620

621+
void SerializeState(void* state, NUdf::TOutputBuffer& buffer) final {
622+
auto typedState = static_cast<TGenericState*>(state);
623+
buffer.PushString(typedState->AsStringRef());
624+
}
625+
626+
void DeserializeState(void* state, NUdf::TInputBuffer& buffer) final {
627+
auto typedState = static_cast<TGenericState*>(state);
628+
629+
*typedState = std::move(MakeString(buffer.PopString()));
630+
}
631+
608632
// Creates a TGenericColumnBuilder for `size` entries using this aggregator's Type_ and Ctx_.
std::unique_ptr<IAggColumnBuilder> MakeResultBuilder(ui64 size) final {
    std::unique_ptr<IAggColumnBuilder> builder = std::make_unique<TGenericColumnBuilder>(size, Type_, Ctx_);
    return builder;
}
@@ -834,6 +858,24 @@ class TMinMaxBlockFixedAggregator<TFinalizeKeysTag, IsNullable, IsScalar, TIn, I
834858
PushValueToState<IsNullable, IsScalar, TIn, IsMin>(typedState.Get(), datum, row);
835859
}
836860

861+
void SerializeState(void* state, NUdf::TOutputBuffer& buffer) final {
862+
auto typedState = MakeStateWrapper<TStateType>(state);
863+
if constexpr (IsNullable) {
864+
buffer.PushNumber(typedState->IsValid);
865+
}
866+
buffer.PushNumber(typedState->Value);
867+
}
868+
869+
void DeserializeState(void* state, NUdf::TInputBuffer& buffer) final {
870+
auto typedState = MakeStateWrapper<TStateType>(state);
871+
872+
buffer.PopNumber(typedState->Value);
873+
874+
if constexpr (IsNullable) {
875+
buffer.PopNumber(typedState->IsValid);
876+
}
877+
}
878+
837879
// Creates a TColumnBuilder<IsNullable, TIn, IsMin> for `size` entries using Type_ and Ctx_.
std::unique_ptr<IAggColumnBuilder> MakeResultBuilder(ui64 size) final {
    std::unique_ptr<IAggColumnBuilder> builder =
        std::make_unique<TColumnBuilder<IsNullable, TIn, IsMin>>(size, Type_, Ctx_);
    return builder;
}

yql/essentials/minikql/comp_nodes/mkql_block_agg_some.cpp

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
#include <yql/essentials/minikql/computation/mkql_block_reader.h>
77
#include <yql/essentials/minikql/computation/mkql_block_builder.h>
88

9+
#include <yql/essentials/minikql/computation/mkql_computation_node_pack.h>
10+
911
namespace NKikimr {
1012
namespace NMiniKQL {
1113

@@ -182,6 +184,7 @@ class TSomeBlockGenericAggregator<TFinalizeKeysTag> : public TFinalizeKeysTag::T
182184
, Type_(type)
183185
, Reader_(MakeBlockReader(TTypeInfoHelper(), type))
184186
, Converter_(MakeBlockItemConverter(TTypeInfoHelper(), type, ctx.Builder->GetPgBuilder()))
187+
, Packer_(false, type)
185188
{
186189
}
187190

@@ -207,6 +210,16 @@ class TSomeBlockGenericAggregator<TFinalizeKeysTag> : public TFinalizeKeysTag::T
207210
PushValueToState(typedState, datum, row, *Reader_, *Converter_, Ctx_);
208211
}
209212

213+
void SerializeState(void* state, NUdf::TOutputBuffer& buffer) final {
214+
auto typedState = static_cast<TGenericState*>(state);
215+
buffer.PushString(Packer_.Pack(*typedState));
216+
}
217+
218+
void DeserializeState(void* state, NUdf::TInputBuffer& buffer) final {
219+
auto typedState = static_cast<TGenericState*>(state);
220+
*typedState = Packer_.Unpack(buffer.PopString(), Ctx_.HolderFactory).Release();
221+
}
222+
210223
// Creates a TGenericColumnBuilder for `size` entries using this aggregator's Type_ and Ctx_.
std::unique_ptr<IAggColumnBuilder> MakeResultBuilder(ui64 size) final {
    std::unique_ptr<IAggColumnBuilder> builder = std::make_unique<TGenericColumnBuilder>(size, Type_, Ctx_);
    return builder;
}
@@ -216,6 +229,7 @@ class TSomeBlockGenericAggregator<TFinalizeKeysTag> : public TFinalizeKeysTag::T
216229
TType* const Type_;
217230
const std::unique_ptr<IBlockReader> Reader_;
218231
const std::unique_ptr<IBlockItemConverter> Converter_;
232+
TValuePacker Packer_;
219233
};
220234

221235
template <typename TTag>

yql/essentials/minikql/comp_nodes/mkql_block_agg_sum.cpp

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -349,6 +349,24 @@ class TSumBlockAggregator<TFinalizeKeysTag, IsNullable, IsScalar, TIn, TSum> : p
349349
PushValueToState<IsNullable, IsScalar, TIn, TSum>(typedState.Get(), datum, row);
350350
}
351351

352+
void SerializeState(void* state, NUdf::TOutputBuffer& buffer) final {
353+
auto typedState = MakeStateWrapper<TStateType>(state);
354+
if constexpr (IsNullable) {
355+
buffer.PushNumber(typedState->IsValid_);
356+
}
357+
buffer.PushNumber(typedState->Sum_);
358+
}
359+
360+
void DeserializeState(void* state, NUdf::TInputBuffer& buffer) final {
361+
auto typedState = MakeStateWrapper<TStateType>(state);
362+
363+
buffer.PopNumber(typedState->Sum_);
364+
365+
if constexpr (IsNullable) {
366+
buffer.PopNumber(typedState->IsValid_);
367+
}
368+
}
369+
352370
// Creates a TSumColumnBuilder<IsNullable, TSum> for `size` entries using DataType_ and Ctx_.
std::unique_ptr<IAggColumnBuilder> MakeResultBuilder(ui64 size) final {
    std::unique_ptr<IAggColumnBuilder> builder =
        std::make_unique<TSumColumnBuilder<IsNullable, TSum>>(size, DataType_, Ctx_);
    return builder;
}
@@ -574,6 +592,18 @@ class TAvgBlockAggregatorOverState : public TFinalizeKeysTag::TBase {
574592
}
575593
}
576594

595+
void SerializeState(void* state, NUdf::TOutputBuffer& buffer) final {
596+
auto typedState = MakeStateWrapper<TAvgState<TOut>>(state);
597+
buffer.PushNumber(typedState->Sum_);
598+
buffer.PushNumber(typedState->Count_);
599+
}
600+
601+
void DeserializeState(void* state, NUdf::TInputBuffer& buffer) final {
602+
auto typedState = MakeStateWrapper<TAvgState<TOut>>(state);
603+
buffer.PopNumber(typedState->Count_);
604+
buffer.PopNumber(typedState->Sum_);
605+
}
606+
577607
// Creates a TAvgResultColumnBuilder<TOut> for `size` entries bound to Ctx_.
std::unique_ptr<IAggColumnBuilder> MakeResultBuilder(ui64 size) final {
    std::unique_ptr<IAggColumnBuilder> builder = std::make_unique<TAvgResultColumnBuilder<TOut>>(size, Ctx_);
    return builder;
}

yql/essentials/parser/pg_wrapper/arrow.h

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -339,11 +339,11 @@ struct TGenericExec {
339339
if constexpr (!TArgsPolicy::VarArgs) {
340340
if (TArgsPolicy::IsFixedArg.size() == 2) {
341341
if (batch.values[0].is_scalar()) {
342-
return Dispatch3<HasScalars, HasNulls, EScalarArgBinary::First>(batch, length, state, builder);
342+
return Dispatch3<HasScalars, HasNulls, EScalarArgBinary::First>(batch, length, state, builder);
343343
}
344344

345345
if (batch.values[1].is_scalar()) {
346-
return Dispatch3<HasScalars, HasNulls, EScalarArgBinary::Second>(batch, length, state, builder);
346+
return Dispatch3<HasScalars, HasNulls, EScalarArgBinary::Second>(batch, length, state, builder);
347347
}
348348
}
349349
}
@@ -378,8 +378,8 @@ struct TGenericExec {
378378
if (!constexpr_for_tuple([&](auto const& j, auto const& v) {
379379
NullableDatum d;
380380
if (HasScalars && (
381-
(ScalarArgBinary == EScalarArgBinary::First && j == 0) ||
382-
(ScalarArgBinary == EScalarArgBinary::Second && j == 1) ||
381+
(ScalarArgBinary == EScalarArgBinary::First && j == 0) ||
382+
(ScalarArgBinary == EScalarArgBinary::Second && j == 1) ||
383383
inputArgsAccessor.IsScalar[j])) {
384384
d = inputArgsAccessor.Scalars[j];
385385
} else {
@@ -401,7 +401,7 @@ struct TGenericExec {
401401
}
402402

403403
fcinfo->args[j] = d;
404-
return true;
404+
return true;
405405
}, TArgsPolicy::IsFixedArg)) {
406406
if constexpr (IsFixedResult) {
407407
fixedResultValidMask[i] = 0;
@@ -627,7 +627,7 @@ class TGenericAgg {
627627
if (!HasInitValue && IsTransStrict) {
628628
Y_ENSURE(AggDesc_.ArgTypes.size() == 1);
629629
}
630-
630+
631631
const auto& transDesc = NPg::LookupProc(AggDesc_.TransFuncId);
632632
for (ui32 i = 1; i < transDesc.ArgTypes.size(); ++i) {
633633
IsFixedArg_.push_back(NPg::LookupType(transDesc.ArgTypes[i]).PassByValue);
@@ -925,7 +925,7 @@ SkipCall:;
925925
InputArgsAccessor_.Bind(Values_, 1);
926926
BatchNum_ = batchNum;
927927
}
928-
928+
929929
void InitKey(void* state, ui64 batchNum, const NKikimr::NUdf::TUnboxedValue* columns, ui64 row) final {
930930
new(state) NullableDatum();
931931
auto typedState = (NullableDatum*)state;
@@ -1160,7 +1160,7 @@ SkipCall:;
11601160
combineCallInfo->args[0] = *typedState;
11611161
combineCallInfo->args[1] = deser;
11621162
auto ret = this->CombineFunc_(combineCallInfo);
1163-
if constexpr (!HasDeserialize) {
1163+
if constexpr (!HasDeserialize) {
11641164
if (!combineCallInfo->isnull && ret == d.value) {
11651165
typedState->isnull = false;
11661166
typedState->value = CloneDatumToAggContext<IsTransTypeFixed>(d.value, this->TransTypeLen_);
@@ -1172,6 +1172,18 @@ SkipCall:;
11721172
SaveToAggContext<IsTransTypeFixed>(*typedState, this->TransTypeLen_);
11731173
}
11741174

1175+
// State serialization is not implemented for this aggregator: the call always
// fails the Y_ENSURE check with the message "Unimplemented".
void SerializeState(void* state, NUdf::TOutputBuffer& buffer) final {
    // Parameters are intentionally unused.
    Y_UNUSED(state);
    Y_UNUSED(buffer);
    Y_ENSURE(false, "Unimplemented");
}
1180+
1181+
// State deserialization is not implemented for this aggregator: the call always
// fails the Y_ENSURE check with the message "Unimplemented".
void DeserializeState(void* state, NUdf::TInputBuffer& buffer) final {
    // Parameters are intentionally unused.
    Y_UNUSED(state);
    Y_UNUSED(buffer);
    Y_ENSURE(false, "Unimplemented");
}
1186+
11751187
std::unique_ptr<NKikimr::NMiniKQL::IAggColumnBuilder> MakeResultBuilder(ui64 size) final {
11761188
auto typeLen = NPg::LookupType(FinalType_).TypeLen;
11771189
if constexpr (IsFinalTypeFixed) {

yql/essentials/public/udf/arrow/block_io_buffer.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,11 @@ class TInputBuffer {
2020
return c;
2121
}
2222

23+
template <typename T>
24+
void PopNumber(T& result) {
25+
result = PopNumber<T>();
26+
}
27+
2328
template <typename T>
2429
T PopNumber() {
2530
Ensure(sizeof(T));

0 commit comments

Comments
 (0)