Skip to content

Commit 8e21656

Browse files
authored
YQ-4101 KqpRun fixed health check for multi tenant mode (#14427)
1 parent 769580f commit 8e21656

File tree

9 files changed

+106
-33
lines changed

9 files changed

+106
-33
lines changed

ydb/core/kqp/rm_service/kqp_rm_service.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,7 @@ class TKqpResourceManager : public IKqpResourceManager {
163163
, ExecutionUnitsLimit(config.GetComputeActorsCount())
164164
, SpillingPercent(config.GetSpillingPercent())
165165
, TotalMemoryResource(MakeIntrusive<TMemoryResource>(config.GetQueryMemoryLimit(), (double)100, config.GetSpillingPercent()))
166+
, ResourceSnapshotState(std::make_shared<TResourceSnapshotState>())
166167
{
167168
SetConfigValues(config);
168169
}
@@ -195,7 +196,6 @@ class TKqpResourceManager : public IKqpResourceManager {
195196

196197
void CreateResourceInfoExchanger(
197198
const NKikimrConfig::TTableServiceConfig::TResourceManager::TInfoExchangerSettings& settings) {
198-
ResourceSnapshotState = std::make_shared<TResourceSnapshotState>();
199199
auto exchanger = CreateKqpResourceInfoExchangerActor(
200200
Counters, ResourceSnapshotState, settings);
201201
ResourceInfoExchanger = ActorSystem->Register(exchanger);

ydb/core/kqp/workload_service/common/helpers.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,11 +83,13 @@ class TSchemeActorBase : public NActors::TActorBootstrapped<TDerived> {
8383

8484
private:
8585
static TRetryPolicy::IRetryState::TPtr CreateRetryState() {
86-
return TRetryPolicy::GetFixedIntervalPolicy(
86+
return TRetryPolicy::GetExponentialBackoffPolicy(
8787
[](bool longDelay){return longDelay ? ERetryErrorClass::LongRetry : ERetryErrorClass::ShortRetry;}
8888
, TDuration::MilliSeconds(100)
8989
, TDuration::MilliSeconds(500)
90-
, 100
90+
, TDuration::Seconds(1)
91+
, std::numeric_limits<size_t>::max()
92+
, TDuration::Seconds(10)
9193
)->CreateRetryState();
9294
}
9395

ydb/library/table_creator/table_creator.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -380,11 +380,13 @@ using TTableCreatorRetryPolicy = IRetryPolicy<bool>;
380380
}
381381

382382
static TTableCreatorRetryPolicy::IRetryState::TPtr CreateRetryState() {
383-
return TTableCreatorRetryPolicy::GetFixedIntervalPolicy(
383+
return TTableCreatorRetryPolicy::GetExponentialBackoffPolicy(
384384
[](bool longDelay){return longDelay ? ERetryErrorClass::LongRetry : ERetryErrorClass::ShortRetry;}
385385
, TDuration::MilliSeconds(100)
386386
, TDuration::MilliSeconds(300)
387-
, 100
387+
, TDuration::Seconds(1)
388+
, std::numeric_limits<size_t>::max()
389+
, TDuration::Seconds(10)
388390
)->CreateRetryState();
389391
}
390392

ydb/tests/tools/kqprun/configuration/app_config.conf

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,4 +191,8 @@ TableServiceConfig {
191191
ResourceManager {
192192
QueryMemoryLimit: 64424509440
193193
}
194+
195+
WriteActorSettings {
196+
MaxWriteAttempts: 1000
197+
}
194198
}

ydb/tests/tools/kqprun/kqprun.cpp

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ struct TExecutionOptions {
4444
bool UseTemplates = false;
4545

4646
ui32 LoopCount = 1;
47+
TDuration QueryDelay;
4748
TDuration LoopDelay;
4849
bool ContinueAfterFail = false;
4950

@@ -100,11 +101,12 @@ struct TExecutionOptions {
100101
};
101102
}
102103

103-
TRequestOptions GetScriptQueryOptions(size_t index, size_t queryId, TInstant startTime) const {
104+
TRequestOptions GetScriptQueryOptions(size_t index, size_t loopId, size_t queryId, TInstant startTime) const {
104105
Y_ABORT_UNLESS(index < ScriptQueries.size());
105106

106107
TString sql = ScriptQueries[index];
107108
if (UseTemplates) {
109+
SubstGlobal(sql, "${LOOP_ID}", ToString(loopId));
108110
SubstGlobal(sql, "${QUERY_ID}", ToString(queryId));
109111
}
110112

@@ -270,12 +272,12 @@ struct TExecutionOptions {
270272
};
271273

272274

273-
void RunArgumentQuery(size_t index, size_t queryId, TInstant startTime, const TExecutionOptions& executionOptions, TKqpRunner& runner) {
275+
void RunArgumentQuery(size_t index, size_t loopId, size_t queryId, TInstant startTime, const TExecutionOptions& executionOptions, TKqpRunner& runner) {
274276
NColorizer::TColors colors = NColorizer::AutoColors(Cout);
275277

276278
switch (executionOptions.GetExecutionCase(index)) {
277279
case TExecutionOptions::EExecutionCase::GenericScript: {
278-
if (!runner.ExecuteScript(executionOptions.GetScriptQueryOptions(index, queryId, startTime))) {
280+
if (!runner.ExecuteScript(executionOptions.GetScriptQueryOptions(index, loopId, queryId, startTime))) {
279281
ythrow yexception() << TInstant::Now().ToIsoStringLocal() << " Script execution failed";
280282
}
281283
Cout << colors.Yellow() << TInstant::Now().ToIsoStringLocal() << " Fetching script results..." << colors.Default() << Endl;
@@ -292,21 +294,21 @@ void RunArgumentQuery(size_t index, size_t queryId, TInstant startTime, const TE
292294
}
293295

294296
case TExecutionOptions::EExecutionCase::GenericQuery: {
295-
if (!runner.ExecuteQuery(executionOptions.GetScriptQueryOptions(index, queryId, startTime))) {
297+
if (!runner.ExecuteQuery(executionOptions.GetScriptQueryOptions(index, loopId, queryId, startTime))) {
296298
ythrow yexception() << TInstant::Now().ToIsoStringLocal() << " Query execution failed";
297299
}
298300
break;
299301
}
300302

301303
case TExecutionOptions::EExecutionCase::YqlScript: {
302-
if (!runner.ExecuteYqlScript(executionOptions.GetScriptQueryOptions(index, queryId, startTime))) {
304+
if (!runner.ExecuteYqlScript(executionOptions.GetScriptQueryOptions(index, loopId, queryId, startTime))) {
303305
ythrow yexception() << TInstant::Now().ToIsoStringLocal() << " Yql script execution failed";
304306
}
305307
break;
306308
}
307309

308310
case TExecutionOptions::EExecutionCase::AsyncQuery: {
309-
runner.ExecuteQueryAsync(executionOptions.GetScriptQueryOptions(index, queryId, startTime));
311+
runner.ExecuteQueryAsync(executionOptions.GetScriptQueryOptions(index, loopId, queryId, startTime));
310312
break;
311313
}
312314
}
@@ -327,24 +329,25 @@ void RunArgumentQueries(const TExecutionOptions& executionOptions, TKqpRunner& r
327329
const size_t numberLoops = executionOptions.LoopCount;
328330
for (size_t queryId = 0; queryId < numberQueries * numberLoops || numberLoops == 0; ++queryId) {
329331
size_t id = queryId % numberQueries;
330-
if (id == 0 && queryId > 0) {
331-
Sleep(executionOptions.LoopDelay);
332+
if (queryId > 0) {
333+
Sleep(id == 0 ? executionOptions.LoopDelay : executionOptions.QueryDelay);
332334
}
333335

334336
const TInstant startTime = TInstant::Now();
337+
const size_t loopId = queryId / numberQueries;
335338
if (executionOptions.GetExecutionCase(id) != TExecutionOptions::EExecutionCase::AsyncQuery) {
336339
Cout << colors.Yellow() << startTime.ToIsoStringLocal() << " Executing script";
337340
if (numberQueries > 1) {
338341
Cout << " " << id;
339342
}
340343
if (numberLoops != 1) {
341-
Cout << ", loop " << queryId / numberQueries;
344+
Cout << ", loop " << loopId;
342345
}
343346
Cout << "..." << colors.Default() << Endl;
344347
}
345348

346349
try {
347-
RunArgumentQuery(id, queryId, startTime, executionOptions, runner);
350+
RunArgumentQuery(id, loopId, queryId, startTime, executionOptions, runner);
348351
} catch (const yexception& exception) {
349352
if (executionOptions.ContinueAfterFail) {
350353
Cerr << colors.Red() << CurrentExceptionMessage() << colors.Default() << Endl;
@@ -713,6 +716,11 @@ class TMain : public TMainBase {
713716
.DefaultValue(0)
714717
.StoreMappedResultT<ui64>(&ExecutionOptions.LoopDelay, &TDuration::MilliSeconds<ui64>);
715718

719+
options.AddLongOption("query-delay", "Delay in milliseconds between queries starts")
720+
.RequiredArgument("uint")
721+
.DefaultValue(0)
722+
.StoreMappedResultT<ui64>(&ExecutionOptions.QueryDelay, &TDuration::MilliSeconds<ui64>);
723+
716724
options.AddLongOption("continue-after-fail", "Don't not stop requests execution after fails")
717725
.NoArgument()
718726
.SetFlag(&ExecutionOptions.ContinueAfterFail);
@@ -751,7 +759,7 @@ class TMain : public TMainBase {
751759

752760
options.AddLongOption('H', "health-check", TStringBuilder() << "Level of health check before start (max level " << static_cast<ui32>(TYdbSetupSettings::EHealthCheck::Max) - 1 << ")")
753761
.RequiredArgument("uint")
754-
.DefaultValue(static_cast<ui8>(TYdbSetupSettings::EHealthCheck::NodesCount))
762+
.DefaultValue(static_cast<ui8>(TYdbSetupSettings::EHealthCheck::FetchDatabase))
755763
.StoreMappedResultT<ui8>(&RunnerOptions.YdbSettings.HealthCheckLevel, [](ui8 value) {
756764
return static_cast<TYdbSetupSettings::EHealthCheck>(std::min(value, static_cast<ui8>(TYdbSetupSettings::EHealthCheck::Max)));
757765
});

ydb/tests/tools/kqprun/src/actors.cpp

Lines changed: 41 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
#include <ydb/core/kqp/common/simple/services.h>
66
#include <ydb/core/kqp/rm_service/kqp_rm_service.h>
7-
7+
#include <ydb/core/kqp/workload_service/actors/actors.h>
88

99
namespace NKqpRun {
1010

@@ -254,6 +254,8 @@ class TResourcesWaiterActor : public NActors::TActorBootstrapped<TResourcesWaite
254254
void Bootstrap() {
255255
Become(&TResourcesWaiterActor::StateFunc);
256256

257+
Schedule(Settings_.HealthCheckTimeout, new NActors::TEvents::TEvWakeup());
258+
257259
HealthCheckStage_ = EHealthCheck::NodesCount;
258260
DoHealthCheck();
259261
}
@@ -264,17 +266,26 @@ class TResourcesWaiterActor : public NActors::TActorBootstrapped<TResourcesWaite
264266
return;
265267
}
266268

269+
if (TInstant::Now() - StartTime_ >= Settings_.HealthCheckTimeout) {
270+
FailTimeout();
271+
return;
272+
}
273+
267274
switch (HealthCheckStage_) {
268-
case TYdbSetupSettings::EHealthCheck::NodesCount:
275+
case EHealthCheck::NodesCount:
269276
CheckResourcesPublish();
270277
break;
271278

272-
case TYdbSetupSettings::EHealthCheck::ScriptRequest:
279+
case EHealthCheck::FetchDatabase:
280+
FetchDatabase();
281+
break;
282+
283+
case EHealthCheck::ScriptRequest:
273284
StartScriptQuery();
274285
break;
275286

276-
case TYdbSetupSettings::EHealthCheck::None:
277-
case TYdbSetupSettings::EHealthCheck::Max:
287+
case EHealthCheck::None:
288+
case EHealthCheck::Max:
278289
Finish();
279290
break;
280291
}
@@ -283,14 +294,25 @@ class TResourcesWaiterActor : public NActors::TActorBootstrapped<TResourcesWaite
283294
void Handle(TEvPrivate::TEvResourcesInfo::TPtr& ev) {
284295
const auto nodeCount = ev->Get()->NodeCount;
285296
if (nodeCount == Settings_.ExpectedNodeCount) {
286-
HealthCheckStage_ = EHealthCheck::ScriptRequest;
297+
HealthCheckStage_ = EHealthCheck::FetchDatabase;
287298
DoHealthCheck();
288299
return;
289300
}
290301

291302
Retry(TStringBuilder() << "invalid node count, got " << nodeCount << ", expected " << Settings_.ExpectedNodeCount, true);
292303
}
293304

305+
void Handle(NKikimr::NKqp::NWorkload::TEvFetchDatabaseResponse::TPtr& ev) {
306+
const auto status = ev->Get()->Status;
307+
if (status == Ydb::StatusIds::SUCCESS) {
308+
HealthCheckStage_ = EHealthCheck::ScriptRequest;
309+
DoHealthCheck();
310+
return;
311+
}
312+
313+
Retry(TStringBuilder() << "failed to fetch database with status " << status << ", reason:\n" << CoutColors_.Default() << ev->Get()->Issues.ToString(), true);
314+
}
315+
294316
void Handle(NKikimr::NKqp::TEvKqp::TEvScriptResponse::TPtr& ev) {
295317
const auto status = ev->Get()->Status;
296318
if (status == Ydb::StatusIds::SUCCESS) {
@@ -304,6 +326,7 @@ class TResourcesWaiterActor : public NActors::TActorBootstrapped<TResourcesWaite
304326
STRICT_STFUNC(StateFunc,
305327
sFunc(NActors::TEvents::TEvWakeup, DoHealthCheck);
306328
hFunc(TEvPrivate::TEvResourcesInfo, Handle);
329+
hFunc(NKikimr::NKqp::NWorkload::TEvFetchDatabaseResponse, Handle);
307330
hFunc(NKikimr::NKqp::TEvKqp::TEvScriptResponse, Handle);
308331
)
309332

@@ -324,6 +347,10 @@ class TResourcesWaiterActor : public NActors::TActorBootstrapped<TResourcesWaite
324347
});
325348
}
326349

350+
void FetchDatabase() {
351+
Register(NKikimr::NKqp::NWorkload::CreateDatabaseFetcherActor(SelfId(), Settings_.Database));
352+
}
353+
327354
void StartScriptQuery() {
328355
auto event = MakeHolder<NKikimr::NKqp::TEvKqp::TEvScriptRequest>();
329356
event->Record.SetUserToken(NACLib::TUserToken("", BUILTIN_ACL_ROOT, {}).SerializeAsString());
@@ -344,11 +371,12 @@ class TResourcesWaiterActor : public NActors::TActorBootstrapped<TResourcesWaite
344371

345372
if (auto delay = RetryState_->GetNextRetryDelay(shortRetry)) {
346373
if (Settings_.VerboseLevel >= EVerbose::InitLogs) {
347-
Cout << CoutColors_.Cyan() << "Retry in " << *delay << " " << message << CoutColors_.Default() << Endl;
374+
const TString str = TStringBuilder() << CoutColors_.Cyan() << "Retry for database '" << Settings_.Database << "' in " << *delay << " " << message << CoutColors_.Default();
375+
Cout << str << Endl;
348376
}
349377
Schedule(*delay, new NActors::TEvents::TEvWakeup());
350378
} else {
351-
Fail(TStringBuilder() << "Health check timeout " << Settings_.HealthCheckTimeout << " exceeded, use --health-check-timeout for increasing it or check out health check logs by using --verbose " << static_cast<ui32>(EVerbose::InitLogs));
379+
FailTimeout();
352380
}
353381
}
354382

@@ -357,6 +385,10 @@ class TResourcesWaiterActor : public NActors::TActorBootstrapped<TResourcesWaite
357385
PassAway();
358386
}
359387

388+
void FailTimeout() {
389+
Fail(TStringBuilder() << "Health check timeout " << Settings_.HealthCheckTimeout << " exceeded for database '" << Settings_.Database << "', use --health-check-timeout for increasing it or check out health check logs by using --verbose " << static_cast<ui32>(EVerbose::InitLogs));
390+
}
391+
360392
void Fail(const TString& error) {
361393
Promise_.SetException(error);
362394
PassAway();
@@ -368,6 +400,7 @@ class TResourcesWaiterActor : public NActors::TActorBootstrapped<TResourcesWaite
368400

369401
private:
370402
const TWaitResourcesSettings Settings_;
403+
const TInstant StartTime_ = TInstant::Now();
371404
const NColorizer::TColors CoutColors_ = NColorizer::AutoColors(Cout);
372405
const IRetryPolicy::TPtr RetryPolicy_;
373406
IRetryPolicy::IRetryState::TPtr RetryState_ = nullptr;

ydb/tests/tools/kqprun/src/common.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ struct TYdbSetupSettings : public NKikimrRun::TServerSettings {
4343
enum class EHealthCheck {
4444
None,
4545
NodesCount,
46+
FetchDatabase,
4647
ScriptRequest,
4748
Max
4849
};

ydb/tests/tools/kqprun/src/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ SRCS(
77
)
88

99
PEERDIR(
10+
ydb/core/kqp/workload_service/actors
1011
ydb/core/testlib
1112

1213
ydb/tests/tools/kqprun/runlib

ydb/tests/tools/kqprun/src/ydb_setup.cpp

Lines changed: 31 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,7 @@ void FillQueryMeta(TQueryMeta& meta, const NKikimrKqp::TQueryResponse& response)
136136

137137
class TYdbSetup::TImpl {
138138
using EVerbose = TYdbSetupSettings::EVerbose;
139+
using EHealthCheck = TYdbSetupSettings::EHealthCheck;
139140

140141
private:
141142
TAutoPtr<TLogBackend> CreateLogBackend() const {
@@ -399,19 +400,36 @@ class TYdbSetup::TImpl {
399400
NYql::NLog::InitLogger(NActors::CreateNullBackend());
400401
}
401402

402-
void WaitResourcesPublishing() const {
403-
auto promise = NThreading::NewPromise();
403+
NThreading::TFuture<void> RunHealthCheck(const TString& database) const {
404+
EHealthCheck level = Settings_.HealthCheckLevel;
405+
i32 nodesCount = Settings_.NodeCount;
406+
if (database != Settings_.DomainName) {
407+
nodesCount = Tenants_->Size(database);
408+
} else if (StorageMeta_.TenantsSize() > 0) {
409+
level = std::min(level, EHealthCheck::NodesCount);
410+
}
411+
404412
const TWaitResourcesSettings settings = {
405-
.ExpectedNodeCount = static_cast<i32>(Settings_.NodeCount),
406-
.HealthCheckLevel = Settings_.HealthCheckLevel,
413+
.ExpectedNodeCount = nodesCount,
414+
.HealthCheckLevel = level,
407415
.HealthCheckTimeout = Settings_.HealthCheckTimeout,
408416
.VerboseLevel = Settings_.VerboseLevel,
409-
.Database = NKikimr::CanonizePath(Settings_.DomainName)
417+
.Database = NKikimr::CanonizePath(database)
410418
};
411-
GetRuntime()->Register(CreateResourcesWaiterActor(promise, settings), 0, GetRuntime()->GetAppData().SystemPoolId);
419+
const auto promise = NThreading::NewPromise();
420+
GetRuntime()->Register(CreateResourcesWaiterActor(promise, settings), GetNodeIndexForDatabase(database), GetRuntime()->GetAppData().SystemPoolId);
421+
422+
return promise.GetFuture();
423+
}
424+
425+
void WaitResourcesPublishing() const {
426+
std::vector<NThreading::TFuture<void>> futures(1, RunHealthCheck(Settings_.DomainName));
427+
for (const auto& [tenantName, _] : StorageMeta_.GetTenants()) {
428+
futures.emplace_back(RunHealthCheck(GetTenantPath(tenantName)));
429+
}
412430

413431
try {
414-
promise.GetFuture().GetValue(2 * Settings_.HealthCheckTimeout);
432+
NThreading::WaitAll(futures).GetValue(2 * Settings_.HealthCheckTimeout);
415433
} catch (...) {
416434
ythrow yexception() << "Failed to initialize all resources: " << CurrentExceptionMessage();
417435
}
@@ -628,7 +646,11 @@ class TYdbSetup::TImpl {
628646
}
629647

630648
TString GetDatabasePath(const TString& database) const {
631-
return NKikimr::CanonizePath(database ? database : GetDefaultDatabase());
649+
const TString& result = NKikimr::CanonizePath(database ? database : GetDefaultDatabase());
650+
if (StorageMeta_.TenantsSize() > 0 && result == NKikimr::CanonizePath(Settings_.DomainName)) {
651+
ythrow yexception() << "Cannot use root domain '" << result << "' as request database then created additional tenants";
652+
}
653+
return result;
632654
}
633655

634656
ui32 GetNodeIndexForDatabase(const TString& path) const {
@@ -653,7 +675,7 @@ class TYdbSetup::TImpl {
653675
ythrow yexception() << "Can not choose default database, there is more than one tenants, please use `-D <database name>`";
654676
}
655677
if (StorageMeta_.TenantsSize() == 1) {
656-
return StorageMeta_.GetTenants().begin()->first;
678+
return GetTenantPath(StorageMeta_.GetTenants().begin()->first);
657679
}
658680
return Settings_.DomainName;
659681
}

0 commit comments

Comments
 (0)