Skip to content

Commit 49e62b4

Browse files
Handle spilling errors correctly (#7435)
1 parent ee6b772 commit 49e62b4

25 files changed

+145
-83
lines changed

ydb/core/kqp/compute_actor/kqp_compute_actor_impl.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@ using namespace NYql::NDq;
1414

1515
class TKqpTaskRunnerExecutionContext : public TDqTaskRunnerExecutionContext {
1616
public:
17-
TKqpTaskRunnerExecutionContext(ui64 txId, bool withSpilling, IDqChannelStorage::TWakeUpCallback&& wakeUp)
18-
: TDqTaskRunnerExecutionContext(txId, std::move(wakeUp))
17+
TKqpTaskRunnerExecutionContext(ui64 txId, bool withSpilling, TWakeUpCallback&& wakeUpCallback, TErrorCallback&& errorCallback)
18+
: TDqTaskRunnerExecutionContext(txId, std::move(wakeUpCallback), std::move(errorCallback))
1919
, WithSpilling_(withSpilling)
2020
{
2121
}

ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,9 +72,10 @@ void TKqpComputeActor::DoBootstrap() {
7272
auto taskRunner = MakeDqTaskRunner(TBase::GetAllocatorPtr(), execCtx, settings, logger);
7373
SetTaskRunner(taskRunner);
7474

75-
auto wakeup = [this]{ ContinueExecute(); };
75+
auto wakeupCallback = [this]{ ContinueExecute(); };
76+
auto errorCallback = [this](const TString& error){ SendError(error); };
7677
try {
77-
PrepareTaskRunner(TKqpTaskRunnerExecutionContext(std::get<ui64>(TxId), RuntimeSettings.UseSpilling, std::move(wakeup)));
78+
PrepareTaskRunner(TKqpTaskRunnerExecutionContext(std::get<ui64>(TxId), RuntimeSettings.UseSpilling, std::move(wakeupCallback), std::move(errorCallback)));
7879
} catch (const NMiniKQL::TKqpEnsureFail& e) {
7980
InternalError((TIssuesIds::EIssueCode) e.GetCode(), e.GetMessage());
8081
return;

ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,8 @@ void TKqpScanComputeActor::DoBootstrap() {
220220
TBase::SetTaskRunner(taskRunner);
221221

222222
auto wakeup = [this] { ContinueExecute(); };
223-
TBase::PrepareTaskRunner(TKqpTaskRunnerExecutionContext(std::get<ui64>(TxId), RuntimeSettings.UseSpilling, std::move(wakeup)));
223+
auto errorCallback = [this](const TString& error){ SendError(error); };
224+
TBase::PrepareTaskRunner(TKqpTaskRunnerExecutionContext(std::get<ui64>(TxId), RuntimeSettings.UseSpilling, std::move(wakeup), std::move(errorCallback)));
224225

225226
ComputeCtx.AddTableScan(0, Meta, GetStatsMode());
226227
ScanData = &ComputeCtx.GetTableScan(0);

ydb/core/kqp/ut/spilling/kqp_scan_spilling_ut.cpp

Lines changed: 43 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ NKikimrConfig::TAppConfig AppCfg() {
3232
return appCfg;
3333
}
3434

35-
NKikimrConfig::TAppConfig AppCfgLowComputeLimits(double reasonableTreshold) {
35+
NKikimrConfig::TAppConfig AppCfgLowComputeLimits(double reasonableTreshold, bool enableSpilling=true) {
3636
NKikimrConfig::TAppConfig appCfg;
3737

3838
auto* rm = appCfg.MutableTableServiceConfig()->MutableResourceManager();
@@ -43,12 +43,32 @@ NKikimrConfig::TAppConfig AppCfgLowComputeLimits(double reasonableTreshold) {
4343

4444
auto* spilling = appCfg.MutableTableServiceConfig()->MutableSpillingServiceConfig()->MutableLocalFileConfig();
4545

46-
spilling->SetEnable(true);
46+
spilling->SetEnable(enableSpilling);
4747
spilling->SetRoot("./spilling/");
4848

4949
return appCfg;
5050
}
5151

52+
void FillTableWithData(NQuery::TQueryClient& db, ui64 numRows=300) {
53+
for (ui32 i = 0; i < numRows; ++i) {
54+
auto result = db.ExecuteQuery(Sprintf(R"(
55+
--!syntax_v1
56+
REPLACE INTO `/Root/KeyValue` (Key, Value) VALUES (%d, "%s")
57+
)", i, TString(200000 + i, 'a' + (i % 26)).c_str()), NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
58+
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
59+
}
60+
}
61+
62+
constexpr auto SimpleGraceJoinWithSpillingQuery = R"(
63+
--!syntax_v1
64+
PRAGMA ydb.EnableSpillingNodes="GraceJoin";
65+
PRAGMA ydb.CostBasedOptimizationLevel='0';
66+
PRAGMA ydb.HashJoinMode='graceandself';
67+
select t1.Key, t1.Value, t2.Key, t2.Value
68+
from `/Root/KeyValue` as t1 full join `/Root/KeyValue` as t2 on t1.Value = t2.Value
69+
order by t1.Value
70+
)";
71+
5272

5373
} // anonymous namespace
5474

@@ -79,31 +99,15 @@ Y_UNIT_TEST_TWIN(SpillingInRuntimeNodes, EnabledSpilling) {
7999

80100
auto db = kikimr.GetQueryClient();
81101

82-
for (ui32 i = 0; i < 300; ++i) {
83-
auto result = db.ExecuteQuery(Sprintf(R"(
84-
--!syntax_v1
85-
REPLACE INTO `/Root/KeyValue` (Key, Value) VALUES (%d, "%s")
86-
)", i, TString(200000 + i, 'a' + (i % 26)).c_str()), NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
87-
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
88-
}
89-
90-
auto query = R"(
91-
--!syntax_v1
92-
PRAGMA ydb.EnableSpillingNodes="GraceJoin";
93-
PRAGMA ydb.CostBasedOptimizationLevel='0';
94-
PRAGMA ydb.HashJoinMode='graceandself';
95-
select t1.Key, t1.Value, t2.Key, t2.Value
96-
from `/Root/KeyValue` as t1 full join `/Root/KeyValue` as t2 on t1.Value = t2.Value
97-
order by t1.Value
98-
)";
102+
FillTableWithData(db);
99103

100104
auto explainMode = NYdb::NQuery::TExecuteQuerySettings().ExecMode(NYdb::NQuery::EExecMode::Explain);
101-
auto planres = db.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx(), explainMode).ExtractValueSync();
105+
auto planres = db.ExecuteQuery(SimpleGraceJoinWithSpillingQuery, NYdb::NQuery::TTxControl::NoTx(), explainMode).ExtractValueSync();
102106
UNIT_ASSERT_VALUES_EQUAL_C(planres.GetStatus(), EStatus::SUCCESS, planres.GetIssues().ToString());
103107

104108
Cerr << planres.GetStats()->GetAst() << Endl;
105109

106-
auto result = db.ExecuteQuery(query, NYdb::NQuery::TTxControl::BeginTx().CommitTx(), NYdb::NQuery::TExecuteQuerySettings()).ExtractValueSync();
110+
auto result = db.ExecuteQuery(SimpleGraceJoinWithSpillingQuery, NYdb::NQuery::TTxControl::BeginTx().CommitTx(), NYdb::NQuery::TExecuteQuerySettings()).ExtractValueSync();
107111
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
108112

109113
TKqpCounters counters(kikimr.GetTestServer().GetRuntime()->GetAppData().Counters);
@@ -116,6 +120,24 @@ Y_UNIT_TEST_TWIN(SpillingInRuntimeNodes, EnabledSpilling) {
116120
}
117121
}
118122

123+
Y_UNIT_TEST(HandleErrorsCorrectly) {
124+
Cerr << "cwd: " << NFs::CurrentWorkingDirectory() << Endl;
125+
TKikimrRunner kikimr(AppCfgLowComputeLimits(0.01, false));
126+
127+
auto db = kikimr.GetQueryClient();
128+
129+
FillTableWithData(db);
130+
131+
auto explainMode = NYdb::NQuery::TExecuteQuerySettings().ExecMode(NYdb::NQuery::EExecMode::Explain);
132+
auto planres = db.ExecuteQuery(SimpleGraceJoinWithSpillingQuery, NYdb::NQuery::TTxControl::NoTx(), explainMode).ExtractValueSync();
133+
UNIT_ASSERT_VALUES_EQUAL_C(planres.GetStatus(), EStatus::SUCCESS, planres.GetIssues().ToString());
134+
135+
Cerr << planres.GetStats()->GetAst() << Endl;
136+
137+
auto result = db.ExecuteQuery(SimpleGraceJoinWithSpillingQuery, NYdb::NQuery::TTxControl::BeginTx().CommitTx(), NYdb::NQuery::TExecuteQuerySettings()).ExtractValueSync();
138+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::INTERNAL_ERROR, result.GetIssues().ToString());
139+
}
140+
119141
Y_UNIT_TEST(SelfJoinQueryService) {
120142
Cerr << "cwd: " << NFs::CurrentWorkingDirectory() << Endl;
121143

ydb/core/tx/datashard/datashard_kqp.cpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1012,7 +1012,11 @@ class TKqpTaskRunnerExecutionContext: public NDq::IDqTaskRunnerExecutionContext
10121012
return {};
10131013
}
10141014

1015-
std::function<void()> GetWakeupCallback() const override {
1015+
NDq::TWakeUpCallback GetWakeupCallback() const override {
1016+
return {};
1017+
}
1018+
1019+
NDq::TErrorCallback GetErrorCallback() const override {
10161020
return {};
10171021
}
10181022

ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -127,8 +127,9 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
127127

128128
Become(&TDqAsyncComputeActor::StateFuncWrapper<&TDqAsyncComputeActor::StateFuncBody>);
129129

130-
auto wakeup = [this]{ ContinueExecute(EResumeSource::CABootstrapWakeup); };
131-
std::shared_ptr<IDqTaskRunnerExecutionContext> execCtx = std::make_shared<TDqTaskRunnerExecutionContext>(TxId, std::move(wakeup));
130+
auto wakeupCallback = [this]{ ContinueExecute(EResumeSource::CABootstrapWakeup); };
131+
auto errorCallback = [this](const TString& error){ SendError(error); };
132+
std::shared_ptr<IDqTaskRunnerExecutionContext> execCtx = std::make_shared<TDqTaskRunnerExecutionContext>(TxId, std::move(wakeupCallback), std::move(errorCallback));
132133

133134
Send(TaskRunnerActorId,
134135
new NTaskRunnerActor::TEvTaskRunnerCreate(

ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,9 @@ class TDqComputeActor : public TDqSyncComputeActorBase<TDqComputeActor> {
5858

5959
auto taskRunner = TaskRunnerFactory(GetAllocatorPtr(), Task, RuntimeSettings.StatsMode, logger);
6060
SetTaskRunner(taskRunner);
61-
auto wakeup = [this]{ ContinueExecute(EResumeSource::CABootstrapWakeup); };
62-
TDqTaskRunnerExecutionContext execCtx(TxId, std::move(wakeup));
61+
auto wakeupCallback = [this]{ ContinueExecute(EResumeSource::CABootstrapWakeup); };
62+
auto errorCallback = [this](const TString& error){ SendError(error); };
63+
TDqTaskRunnerExecutionContext execCtx(TxId, std::move(wakeupCallback), std::move(errorCallback));
6364
PrepareTaskRunner(execCtx);
6465

6566
ContinueExecute(EResumeSource::CABootstrap);

ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -629,6 +629,10 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
629629
}
630630
}
631631

632+
void SendError(const TString& error) {
633+
this->Send(this->SelfId(), TEvDq::TEvAbortExecution::InternalError(error));
634+
}
635+
632636
protected: //TDqComputeActorChannels::ICallbacks
633637
//i64 GetInputChannelFreeSpace(ui64 channelId) is pure and must be overridded in derived class
634638

ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,7 @@ class TDqSyncComputeActorBase: public TDqComputeActorBase<TDerived, TComputeActo
213213
TaskRunner->Prepare(this->Task, limits, execCtx);
214214

215215
if (this->Task.GetEnableSpilling()) {
216-
TaskRunner->SetSpillerFactory(std::make_shared<TDqSpillerFactory>(execCtx.GetTxId(), NActors::TActivationContext::ActorSystem(), execCtx.GetWakeupCallback()));
216+
TaskRunner->SetSpillerFactory(std::make_shared<TDqSpillerFactory>(execCtx.GetTxId(), NActors::TActivationContext::ActorSystem(), execCtx.GetWakeupCallback(), execCtx.GetErrorCallback()));
217217
}
218218

219219
for (auto& [channelId, channel] : this->InputChannelsMap) {

ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.cpp

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,10 @@
66
namespace NYql {
77
namespace NDq {
88

9-
TDqTaskRunnerExecutionContext::TDqTaskRunnerExecutionContext(TTxId txId, IDqChannelStorage::TWakeUpCallback&& wakeUp)
9+
TDqTaskRunnerExecutionContext::TDqTaskRunnerExecutionContext(TTxId txId, TWakeUpCallback&& wakeUpCallback, TErrorCallback&& errorCallback)
1010
: TxId_(txId)
11-
, WakeUp_(std::move(wakeUp))
11+
, WakeUpCallback_(std::move(wakeUpCallback))
12+
, ErrorCallback_(std::move(errorCallback))
1213
{
1314
}
1415

@@ -18,14 +19,18 @@ IDqChannelStorage::TPtr TDqTaskRunnerExecutionContext::CreateChannelStorage(ui64
1819

1920
IDqChannelStorage::TPtr TDqTaskRunnerExecutionContext::CreateChannelStorage(ui64 channelId, bool withSpilling, NActors::TActorSystem* actorSystem) const {
2021
if (withSpilling) {
21-
return CreateDqChannelStorage(TxId_, channelId, WakeUp_, actorSystem);
22+
return CreateDqChannelStorage(TxId_, channelId, WakeUpCallback_, ErrorCallback_, actorSystem);
2223
} else {
2324
return nullptr;
2425
}
2526
}
2627

27-
std::function<void()> TDqTaskRunnerExecutionContext::GetWakeupCallback() const {
28-
return WakeUp_;
28+
TWakeUpCallback TDqTaskRunnerExecutionContext::GetWakeupCallback() const {
29+
return WakeUpCallback_;
30+
}
31+
32+
TErrorCallback TDqTaskRunnerExecutionContext::GetErrorCallback() const {
33+
return ErrorCallback_;
2934
}
3035

3136
TTxId TDqTaskRunnerExecutionContext::GetTxId() const {

0 commit comments

Comments
 (0)