Skip to content

Commit 8f01128

Browse files
authored
Merge stable 25-1-analytics p2 (#20041)
2 parents 86be04f + f6693b7 commit 8f01128

File tree

222 files changed

+4832
-805
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

222 files changed

+4832
-805
lines changed

ydb/core/formats/arrow/program/functions.cpp

Lines changed: 169 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,176 @@
11
#include "functions.h"
22

33
#include <contrib/libs/apache/arrow/cpp/src/arrow/compute/api.h>
4+
#include <contrib/libs/apache/arrow/cpp/src/arrow/compute/kernels/aggregate_basic_internal.h>
5+
#include <contrib/libs/apache/arrow/cpp/src/arrow/compute/kernels/codegen_internal.h>
6+
#include <contrib/libs/apache/arrow/cpp/src/arrow/compute/registry_internal.h>
47
#include <contrib/libs/apache/arrow/cpp/src/arrow/table.h>
58

69
namespace NKikimr::NArrow::NSSA {
10+
11+
namespace internal {
12+
13+
// Find the largest compatible primitive type for a primitive type.
14+
template <typename I, typename Enable = void>
15+
struct FindAccumulatorType {};
16+
17+
template <typename I>
18+
struct FindAccumulatorType<I, arrow::enable_if_boolean<I>> {
19+
using Type = arrow::UInt64Type;
20+
};
21+
22+
template <typename I>
23+
struct FindAccumulatorType<I, arrow::enable_if_signed_integer<I>> {
24+
using Type = arrow::Int64Type;
25+
};
26+
27+
template <typename I>
28+
struct FindAccumulatorType<I, arrow::enable_if_unsigned_integer<I>> {
29+
using Type = arrow::UInt64Type;
30+
};
31+
32+
template <typename I>
33+
struct FindAccumulatorType<I, arrow::enable_if_floating_point<I>> {
34+
using Type = arrow::DoubleType;
35+
};
36+
37+
template <>
38+
struct FindAccumulatorType<arrow::FloatType, void> {
39+
using Type = arrow::FloatType;
40+
};
41+
42+
template <typename ArrowType, arrow::compute::SimdLevel::type SimdLevel>
43+
struct SumImpl : public arrow::compute::ScalarAggregator {
44+
using ThisType = SumImpl<ArrowType, SimdLevel>;
45+
using CType = typename ArrowType::c_type;
46+
using SumType = typename FindAccumulatorType<ArrowType>::Type;
47+
using OutputType = typename arrow::TypeTraits<SumType>::ScalarType;
48+
49+
arrow::Status Consume(arrow::compute::KernelContext*, const arrow::compute::ExecBatch& batch) override {
50+
if (batch[0].is_array()) {
51+
const auto& data = batch[0].array();
52+
this->Count += data->length - data->GetNullCount();
53+
if (arrow::is_boolean_type<ArrowType>::value) {
54+
this->Sum +=
55+
static_cast<typename SumType::c_type>(arrow::BooleanArray(data).true_count());
56+
} else {
57+
this->Sum +=
58+
arrow::compute::detail::SumArray<CType, typename SumType::c_type, SimdLevel>(
59+
*data);
60+
}
61+
} else {
62+
const auto& data = *batch[0].scalar();
63+
this->Count += data.is_valid * batch.length;
64+
if (data.is_valid) {
65+
this->Sum += arrow::compute::internal::UnboxScalar<ArrowType>::Unbox(data) * batch.length;
66+
}
67+
}
68+
return arrow::Status::OK();
69+
}
70+
71+
arrow::Status MergeFrom(arrow::compute::KernelContext*, arrow::compute::KernelState&& src) override {
72+
const auto& other = arrow::checked_cast<const ThisType&>(src);
73+
this->Count += other.Count;
74+
this->Sum += other.Sum;
75+
return arrow::Status::OK();
76+
}
77+
78+
arrow::Status Finalize(arrow::compute::KernelContext*, arrow::Datum* out) override {
79+
if (this->Count < Options.min_count) {
80+
out->value = std::make_shared<OutputType>();
81+
} else {
82+
out->value = arrow::MakeScalar(this->Sum);
83+
}
84+
return arrow::Status::OK();
85+
}
86+
87+
size_t Count = 0;
88+
typename SumType::c_type Sum = 0;
89+
arrow::compute::ScalarAggregateOptions Options;
90+
};
91+
92+
template <typename ArrowType>
93+
struct SumImplDefault : public SumImpl<ArrowType, arrow::compute::SimdLevel::NONE> {
94+
explicit SumImplDefault(const arrow::compute::ScalarAggregateOptions& options) {
95+
this->Options = options;
96+
}
97+
};
98+
99+
// Registers on `func`, for every type in `types`, a kernel that takes one
// scalar argument of that type and yields a scalar of `out_ty`. Each kernel
// is created with `init` and SimdLevel::NONE.
void AddScalarAggKernels(arrow::compute::KernelInit init,
    const std::vector<std::shared_ptr<arrow::DataType>>& types,
    std::shared_ptr<arrow::DataType> out_ty,
    arrow::compute::ScalarAggregateFunction* func) {
    // The output descriptor is the same for every registered kernel.
    const arrow::ValueDescr outputDescr = arrow::ValueDescr::Scalar(out_ty);
    for (const auto& inputType : types) {
        // scalar[InT] -> scalar[OutT]
        std::vector<arrow::compute::InputType> inputs{arrow::compute::InputType::Scalar(inputType)};
        auto signature = arrow::compute::KernelSignature::Make(std::move(inputs), outputDescr);
        AddAggKernel(std::move(signature), init, func, arrow::compute::SimdLevel::NONE);
    }
}
109+
110+
// Registers two sets of kernels on `func` for the given input `types` and
// output type `out_ty`: the ones produced by Arrow's AddBasicAggKernels helper
// (at `simd_level`), followed by the scalar-input ones from
// AddScalarAggKernels.
void AddArrayScalarAggKernels(arrow::compute::KernelInit init,
    const std::vector<std::shared_ptr<arrow::DataType>>& types,
    std::shared_ptr<arrow::DataType> out_ty,
    arrow::compute::ScalarAggregateFunction* func,
    arrow::compute::SimdLevel::type simd_level = arrow::compute::SimdLevel::NONE) {
    namespace NAggregate = arrow::compute::aggregate;

    NAggregate::AddBasicAggKernels(init, types, out_ty, func, simd_level);
    AddScalarAggKernels(init, types, out_ty, func);
}
118+
119+
// Kernel init callback: creates the SumImplDefault state matching the type of
// the first input, configured with the ScalarAggregateOptions carried in
// `args.options`.
arrow::Result<std::unique_ptr<arrow::compute::KernelState>> SumInit(arrow::compute::KernelContext* ctx,
    const arrow::compute::KernelInitArgs& args) {
    const arrow::DataType& inputType = *args.inputs[0].type;
    const auto& aggregateOptions = static_cast<const arrow::compute::ScalarAggregateOptions&>(*args.options);

    arrow::compute::aggregate::SumLikeInit<SumImplDefault> visitor(ctx, inputType, aggregateOptions);
    return visitor.Create();
}
126+
127+
static std::unique_ptr<arrow::compute::FunctionRegistry> CreateCustomRegistry() {
128+
arrow::compute::FunctionRegistry* defaultRegistry = arrow::compute::GetFunctionRegistry();
129+
auto registry = arrow::compute::FunctionRegistry::Make();
130+
for (const auto& func : defaultRegistry->GetFunctionNames()) {
131+
if (func == "sum") {
132+
auto aggregateFunc = dynamic_cast<arrow::compute::ScalarAggregateFunction*>(defaultRegistry->GetFunction(func)->get());
133+
if (!aggregateFunc) {
134+
DCHECK_OK(registry->AddFunction(*defaultRegistry->GetFunction(func)));
135+
continue;
136+
}
137+
arrow::compute::ScalarAggregateFunction newFunc(func, aggregateFunc->arity(), &aggregateFunc->doc(), aggregateFunc->default_options());
138+
for (const arrow::compute::ScalarAggregateKernel* kernel : aggregateFunc->kernels()) {
139+
auto shouldReplaceKernel = [](const arrow::compute::ScalarAggregateKernel& kernel) {
140+
const auto& params = kernel.signature->in_types();
141+
if (params.empty()) {
142+
return false;
143+
}
144+
145+
if (params[0].kind() == arrow::compute::InputType::Kind::EXACT_TYPE) {
146+
auto type = params[0].type();
147+
return type->id() == arrow::Type::FLOAT;
148+
}
149+
150+
return false;
151+
};
152+
153+
if (shouldReplaceKernel(*kernel)) {
154+
AddArrayScalarAggKernels(SumInit, {arrow::float32()}, arrow::float32(), &newFunc);
155+
} else {
156+
DCHECK_OK(newFunc.AddKernel(*kernel));
157+
}
158+
}
159+
DCHECK_OK(registry->AddFunction(std::make_shared<arrow::compute::ScalarAggregateFunction>(std::move(newFunc))));
160+
} else {
161+
DCHECK_OK(registry->AddFunction(*defaultRegistry->GetFunction(func)));
162+
}
163+
}
164+
165+
return registry;
166+
}
167+
// Returns the custom function registry. It is created by
// CreateCustomRegistry() on the first call and the same instance is returned
// by every later call.
arrow::compute::FunctionRegistry* GetCustomFunctionRegistry() {
    static const std::unique_ptr<arrow::compute::FunctionRegistry> instance = internal::CreateCustomRegistry();
    return instance.get();
}
171+
172+
} // namespace internal
173+
7174
TConclusion<arrow::Datum> TInternalFunction::Call(
8175
const TExecFunctionContext& context, const std::shared_ptr<TAccessorsCollection>& resources) const {
9176
auto funcNames = GetRegistryFunctionNames();
@@ -16,7 +183,8 @@ TConclusion<arrow::Datum> TInternalFunction::Call(
16183
if (GetContext() && GetContext()->func_registry()->GetFunction(funcName).ok()) {
17184
result = arrow::compute::CallFunction(funcName, *arguments, FunctionOptions.get(), GetContext());
18185
} else {
19-
result = arrow::compute::CallFunction(funcName, *arguments, FunctionOptions.get());
186+
arrow::compute::ExecContext defaultContext(arrow::default_memory_pool(), nullptr, internal::GetCustomFunctionRegistry());
187+
result = arrow::compute::CallFunction(funcName, *arguments, FunctionOptions.get(), &defaultContext);
20188
}
21189

22190
if (result.ok() && funcName == "count"sv) {

ydb/core/formats/arrow/program/ya.make

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,4 +57,8 @@ GENERATE_ENUM_SERIALIZATION(execution.h)
5757

5858
YQL_LAST_ABI_VERSION()
5959

60+
CFLAGS(
61+
-Wno-unused-parameter
62+
)
63+
6064
END()

ydb/core/grpc_services/query/rpc_fetch_script_results.cpp

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -125,18 +125,14 @@ class TFetchScriptResultsRPC : public TRpcRequestActor<TFetchScriptResultsRPC, T
125125
}
126126

127127
bool GetExecutionIdFromRequest() {
128-
try {
129-
TMaybe<TString> executionId = NKqp::ScriptExecutionIdFromOperation(GetProtoRequest()->operation_id());
130-
if (!executionId) {
131-
Reply(Ydb::StatusIds::BAD_REQUEST, "Invalid operation id");
132-
return false;
133-
}
134-
ExecutionId = *executionId;
135-
return true;
136-
} catch (const std::exception& ex) {
137-
Reply(Ydb::StatusIds::BAD_REQUEST, TStringBuilder() << "Invalid operation id: " << ex.what());
128+
TString error;
129+
TMaybe<TString> executionId = NKqp::ScriptExecutionIdFromOperation(GetProtoRequest()->operation_id(), error);
130+
if (!executionId) {
131+
Reply(Ydb::StatusIds::BAD_REQUEST, error);
138132
return false;
139133
}
134+
ExecutionId = *executionId;
135+
return true;
140136
}
141137

142138
private:

ydb/core/kqp/common/kqp_script_executions.cpp

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
#include "kqp_script_executions.h"
22

3+
#include <util/string/builder.h>
4+
35
#include <ydb/public/sdk/cpp/src/library/operation_id/protos/operation_id.pb.h>
46

57
namespace NKikimr::NKqp {
@@ -11,21 +13,29 @@ TString ScriptExecutionOperationFromExecutionId(const TString& executionId) {
1113
return NOperationId::ProtoToString(operationId);
1214
}
1315

14-
TMaybe<TString> ScriptExecutionIdFromOperation(const TString& operationId) {
16+
TMaybe<TString> ScriptExecutionIdFromOperation(const TString& operationId, TString& error) try {
1517
NOperationId::TOperationId operation(operationId);
16-
return ScriptExecutionIdFromOperation(operation);
18+
return ScriptExecutionIdFromOperation(operation, error);
19+
} catch (const std::exception& ex) {
20+
error = TStringBuilder() << "Invalid operation id: " << ex.what();
21+
return Nothing();
1722
}
1823

19-
TMaybe<TString> ScriptExecutionIdFromOperation(const NOperationId::TOperationId& operationId) {
24+
TMaybe<TString> ScriptExecutionIdFromOperation(const NOperationId::TOperationId& operationId, TString& error) try {
2025
if (operationId.GetKind() != NOperationId::TOperationId::SCRIPT_EXECUTION) {
26+
error = TStringBuilder() << "Invalid operation id, expected SCRIPT_EXECUTION = " << static_cast<int>(NOperationId::TOperationId::SCRIPT_EXECUTION) << " kind, got " << static_cast<int>(operationId.GetKind());
2127
return Nothing();
2228
}
2329

2430
const auto& values = operationId.GetValue("id");
2531
if (values.empty() || !values[0]) {
32+
error = TStringBuilder() << "Invalid operation id, please specify key 'id'";
2633
return Nothing();
2734
}
2835
return TString{*values[0]};
36+
} catch (const std::exception& ex) {
37+
error = TStringBuilder() << "Invalid operation id: " << ex.what();
38+
return Nothing();
2939
}
3040

3141
} // namespace NKikimr::NKqp

ydb/core/kqp/common/kqp_script_executions.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
namespace NKikimr::NKqp {
99

1010
TString ScriptExecutionOperationFromExecutionId(const TString& executionId);
11-
TMaybe<TString> ScriptExecutionIdFromOperation(const TString& operationId);
12-
TMaybe<TString> ScriptExecutionIdFromOperation(const NOperationId::TOperationId& operationId);
11+
TMaybe<TString> ScriptExecutionIdFromOperation(const TString& operationId, TString& error);
12+
TMaybe<TString> ScriptExecutionIdFromOperation(const NOperationId::TOperationId& operationId, TString& error);
1313

1414
} // namespace NKikimr::NKqp

ydb/core/kqp/compute_actor/kqp_compute_actor_factory.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ class TKqpCaFactory : public IKqpNodeComputeActorFactory {
8888
std::atomic<ui64> MinChannelBufferSize = 0;
8989
std::atomic<ui64> MinMemAllocSize = 1_MB;
9090
std::atomic<ui64> MinMemFreeSize = 32_MB;
91+
std::atomic<ui64> ChannelChunkSizeLimit = 48_MB;
9192

9293
public:
9394
TKqpCaFactory(const NKikimrConfig::TTableServiceConfig::TResourceManager& config,
@@ -106,6 +107,7 @@ class TKqpCaFactory : public IKqpNodeComputeActorFactory {
106107
MkqlLightProgramMemoryLimit.store(config.GetMkqlLightProgramMemoryLimit());
107108
MkqlHeavyProgramMemoryLimit.store(config.GetMkqlHeavyProgramMemoryLimit());
108109
MinChannelBufferSize.store(config.GetMinChannelBufferSize());
110+
ChannelChunkSizeLimit.store(config.GetChannelChunkSizeLimit());
109111
MinMemAllocSize.store(config.GetMinMemAllocSize());
110112
MinMemFreeSize.store(config.GetMinMemFreeSize());
111113
}
@@ -142,6 +144,7 @@ class TKqpCaFactory : public IKqpNodeComputeActorFactory {
142144

143145
memoryLimits.ChannelBufferSize = std::max<ui32>(estimation.ChannelBufferMemoryLimit / std::max<ui32>(1, inputChannelsCount), MinChannelBufferSize.load());
144146
memoryLimits.OutputChunkMaxSize = args.OutputChunkMaxSize;
147+
memoryLimits.ChunkSizeLimit = ChannelChunkSizeLimit.load();
145148
AFL_DEBUG(NKikimrServices::KQP_COMPUTE)("event", "channel_info")
146149
("ch_size", estimation.ChannelBufferMemoryLimit)
147150
("ch_count", estimation.ChannelBuffersCount)

ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@ static constexpr ui64 MAX_SHARD_RETRIES = 5; // retry after: 0, 250, 500, 1000
2121
static constexpr ui64 MAX_TOTAL_SHARD_RETRIES = 20;
2222
static constexpr ui64 MAX_SHARD_RESOLVES = 3;
2323

24+
constexpr TDuration REGISTRATION_TIMEOUT = TDuration::Seconds(60);
25+
constexpr TDuration PING_PERIOD = TDuration::Seconds(30);
26+
2427
} // anonymous namespace
2528

2629
TKqpScanFetcherActor::TKqpScanFetcherActor(const NKikimrKqp::TKqpSnapshot& snapshot, const TComputeRuntimeSettings& settings,
@@ -79,16 +82,18 @@ void TKqpScanFetcherActor::Bootstrap() {
7982
auto& state = PendingShards.emplace_back(TShardState(read.GetShardId()));
8083
state.Ranges = BuildSerializedTableRanges(read);
8184
}
85+
RegistrationStartTime = Now();
8286
for (auto&& c : ComputeActorIds) {
8387
Sender<TEvScanExchange::TEvRegisterFetcher>().SendTo(c);
8488
}
8589
AFL_DEBUG(NKikimrServices::KQP_COMPUTE)("event", "bootstrap")("compute", ComputeActorIds.size())("shards", PendingShards.size());
8690
StartTableScan();
8791
Become(&TKqpScanFetcherActor::StateFunc);
88-
Schedule(TDuration::Seconds(30), new NActors::TEvents::TEvWakeup());
92+
Schedule(PING_PERIOD, new NActors::TEvents::TEvWakeup());
8993
}
9094

9195
void TKqpScanFetcherActor::HandleExecute(TEvScanExchange::TEvAckData::TPtr& ev) {
96+
RegistrationFinished = true;
9297
AFL_ENSURE(ev->Get()->GetFreeSpace());
9398
AFL_DEBUG(NKikimrServices::KQP_COMPUTE)("event", "AckDataFromCompute")("self_id", SelfId())("scan_id", ScanId)(
9499
"packs_to_send", InFlightComputes.GetPacksToSendCount())("from", ev->Sender)("shards remain", PendingShards.size())(
@@ -697,8 +702,18 @@ void TKqpScanFetcherActor::CheckFinish() {
697702
}
698703

699704
void TKqpScanFetcherActor::HandleExecute(NActors::TEvents::TEvWakeup::TPtr&) {
700-
InFlightShards.PingAllScanners();
701-
Schedule(TDuration::Seconds(30), new NActors::TEvents::TEvWakeup());
705+
if (RegistrationFinished) {
706+
InFlightShards.PingAllScanners();
707+
} else if (Now() - RegistrationStartTime > REGISTRATION_TIMEOUT) {
708+
AFL_DEBUG(NKikimrServices::KQP_COMPUTE)("event", "TEvWakeup")("info", "Abort fetcher due to Registration timeout");
709+
InFlightShards.AbortAllScanners("Abort fetcher due to Registration timeout");
710+
TIssues issues;
711+
issues.AddIssue(TIssue("Abort fetcher due to Registration timeout"));
712+
SendGlobalFail(NDqProto::COMPUTE_STATE_FAILURE, NYql::NDqProto::StatusIds::INTERNAL_ERROR, issues);
713+
PassAway();
714+
return;
715+
}
716+
Schedule(PING_PERIOD, new NActors::TEvents::TEvWakeup());
702717
}
703718

704719
} // namespace NKikimr::NKqp::NScanPrivate

ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,8 @@ class TKqpScanFetcherActor: public NActors::TActorBootstrapped<TKqpScanFetcherAc
189189
std::set<ui32> TrackingNodes;
190190
ui32 MaxInFlight = 1024;
191191
bool IsAggregationRequest = false;
192+
bool RegistrationFinished = false;
193+
TInstant RegistrationStartTime;
192194
};
193195

194196
}

ydb/core/kqp/executer_actor/kqp_executer_impl.h

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1100,6 +1100,14 @@ class TKqpExecuterBase : public TActor<TDerived> {
11001100
structuredToken = NYql::CreateStructuredTokenParser(externalSource.GetAuthInfo()).ToBuilder().ReplaceReferences(SecureParams).ToJson();
11011101
}
11021102

1103+
ui64 selfNodeIdx = 0;
1104+
for (size_t i = 0; i < resourceSnapshot.size(); ++i) {
1105+
if (resourceSnapshot[i].GetNodeId() == SelfId().NodeId()) {
1106+
selfNodeIdx = i;
1107+
break;
1108+
}
1109+
}
1110+
11031111
TVector<ui64> tasksIds;
11041112

11051113
// generate all tasks
@@ -1119,7 +1127,7 @@ class TKqpExecuterBase : public TActor<TDerived> {
11191127
if (resourceSnapshot.empty()) {
11201128
task.Meta.Type = TTaskMeta::TTaskType::Compute;
11211129
} else {
1122-
task.Meta.NodeId = resourceSnapshot[i % resourceSnapshot.size()].GetNodeId();
1130+
task.Meta.NodeId = resourceSnapshot[(selfNodeIdx + i) % resourceSnapshot.size()].GetNodeId();
11231131
task.Meta.Type = TTaskMeta::TTaskType::Scan;
11241132
}
11251133

0 commit comments

Comments
 (0)