Skip to content

Commit a50edfe

Browse files
authored
Statistics: OperationId is generated by KQP (#7694)
1 parent 702825d commit a50edfe

16 files changed

+174
-80
lines changed

ydb/core/kqp/gateway/actors/analyze_actor.cpp

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
#include <ydb/core/base/path.h>
44
#include <ydb/core/base/tablet_pipecache.h>
5+
#include <ydb/core/util/ulid.h>
56
#include <ydb/library/actors/core/log.h>
67
#include <ydb/library/services/services.pb.h>
78

@@ -15,6 +16,18 @@ enum {
1516

1617
using TNavigate = NSchemeCache::TSchemeCacheNavigate;
1718

19+
TString MakeOperationId() {
20+
TULIDGenerator ulidGen;
21+
return ulidGen.Next(TActivationContext::Now()).ToBinary();
22+
}
23+
24+
TAnalyzeActor::TAnalyzeActor(TString tablePath, TVector<TString> columns, NThreading::TPromise<NYql::IKikimrGateway::TGenericResult> promise)
25+
: TablePath(tablePath)
26+
, Columns(columns)
27+
, Promise(promise)
28+
, OperationId(MakeOperationId())
29+
{}
30+
1831
void TAnalyzeActor::Bootstrap() {
1932
using TNavigate = NSchemeCache::TSchemeCacheNavigate;
2033
auto navigate = std::make_unique<TNavigate>();
@@ -34,7 +47,7 @@ void TAnalyzeActor::SendAnalyzeStatus() {
3447

3548
auto getStatus = std::make_unique<NStat::TEvStatistics::TEvAnalyzeStatus>();
3649
auto& record = getStatus->Record;
37-
PathIdFromPathId(PathId, record.MutablePathId());
50+
record.SetOperationId(OperationId);
3851

3952
Send(
4053
MakePipePerNodeCacheID(false),
@@ -43,9 +56,19 @@ void TAnalyzeActor::SendAnalyzeStatus() {
4356
}
4457

4558
void TAnalyzeActor::Handle(NStat::TEvStatistics::TEvAnalyzeResponse::TPtr& ev, const TActorContext& ctx) {
46-
Y_UNUSED(ev);
4759
Y_UNUSED(ctx);
4860

61+
const auto& record = ev->Get()->Record;
62+
const TString operationId = record.GetOperationId();
63+
64+
if (operationId != OperationId) {
65+
ALOG_CRIT(NKikimrServices::KQP_GATEWAY,
66+
"TAnalyzeActor, TEvAnalyzeResponse has operationId=" << operationId
67+
<< " , but expected " << OperationId);
68+
}
69+
70+
71+
// TODO Don't send EvAnalyzeStatus, EvAnalyzeResponse is already here
4972
SendAnalyzeStatus();
5073
}
5174

@@ -172,6 +195,7 @@ void TAnalyzeActor::SendStatisticsAggregatorAnalyze(const NSchemeCache::TSchemeC
172195

173196
auto analyzeRequest = std::make_unique<NStat::TEvStatistics::TEvAnalyze>();
174197
auto& record = analyzeRequest->Record;
198+
record.SetOperationId(OperationId);
175199
auto table = record.AddTables();
176200

177201
PathIdFromPathId(PathId, table->MutablePathId());
@@ -199,6 +223,7 @@ void TAnalyzeActor::SendStatisticsAggregatorAnalyze(const NSchemeCache::TSchemeC
199223
*table->MutableColumnTags()->Add() = tagByColumnName[columnName];
200224
}
201225

226+
// TODO This request should be retried if StatisticsAggregator fails
202227
Send(
203228
MakePipePerNodeCacheID(false),
204229
new TEvPipeCache::TEvForward(analyzeRequest.release(), entry.DomainInfo->Params.GetStatisticsAggregator(), true),

ydb/core/kqp/gateway/actors/analyze_actor.h

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,7 @@ struct TEvAnalyzePrivate {
2020

2121
class TAnalyzeActor : public NActors::TActorBootstrapped<TAnalyzeActor> {
2222
public:
23-
TAnalyzeActor(TString tablePath, TVector<TString> columns, NThreading::TPromise<NYql::IKikimrGateway::TGenericResult> promise)
24-
: TablePath(tablePath)
25-
, Columns(columns)
26-
, Promise(promise)
27-
{}
23+
TAnalyzeActor(TString tablePath, TVector<TString> columns, NThreading::TPromise<NYql::IKikimrGateway::TGenericResult> promise);
2824

2925
void Bootstrap();
3026

@@ -61,6 +57,7 @@ class TAnalyzeActor : public NActors::TActorBootstrapped<TAnalyzeActor> {
6157
// For Statistics Aggregator
6258
std::optional<ui64> StatisticsAggregatorId;
6359
TPathId PathId;
60+
TString OperationId;
6461
};
6562

6663
} // end of NKikimr::NKqp

ydb/core/protos/out/out.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
#include <ydb/core/protos/flat_scheme_op.pb.h>
2525
#include <ydb/core/protos/subdomains.pb.h>
2626
#include <ydb/core/protos/data_events.pb.h>
27+
#include <ydb/core/protos/statistics.pb.h>
2728

2829
#include <util/stream/output.h>
2930

@@ -238,3 +239,7 @@ Y_DECLARE_OUT_SPEC(, NKikimrDataEvents::TEvWrite::TOperation::EOperationType, st
238239
Y_DECLARE_OUT_SPEC(, NKikimrDataEvents::TEvWrite::ETxMode, stream, value) {
239240
stream << NKikimrDataEvents::TEvWrite::ETxMode_Name(value);
240241
}
242+
243+
Y_DECLARE_OUT_SPEC(, NKikimrStat::TEvAnalyzeStatusResponse_EStatus, stream, value) {
244+
stream << NKikimrStat::TEvAnalyzeStatusResponse_EStatus_Name(value);
245+
}

ydb/core/protos/statistics.proto

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -81,24 +81,24 @@ message TTable {
8181

8282
// KQP -> SA
8383
message TEvAnalyze {
84-
optional uint64 Cookie = 1; // request cookie to match response item
84+
optional bytes OperationId = 1; // unique identifier to match response item
8585
repeated TTable Tables = 2; // list of analyzed tables and columns
8686
repeated EColumnStatisticType Types = 3; // list of statistics types requested. Empty means asking for all available.
8787
}
8888

8989
// SA -> KQP
9090
message TEvAnalyzeResponse {
91-
optional uint64 Cookie = 1;
91+
optional bytes OperationId = 1;
9292
}
9393

9494
// KQP -> SA
9595
message TEvAnalyzeStatus {
96-
optional NKikimrProto.TPathID PathId = 1;
96+
optional bytes OperationId = 1; // unique identifier to match response item
9797
}
9898

9999
// SA -> KQP
100100
message TEvAnalyzeStatusResponse {
101-
optional NKikimrProto.TPathID PathId = 1;
101+
optional bytes OperationId = 1;
102102

103103
enum EStatus {
104104
STATUS_UNSPECIFIED = 0;

ydb/core/statistics/aggregator/aggregator_impl.cpp

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -433,17 +433,18 @@ void TStatisticsAggregator::Handle(TEvStatistics::TEvStatTableCreationResponse::
433433
}
434434

435435
void TStatisticsAggregator::Handle(TEvStatistics::TEvAnalyzeStatus::TPtr& ev) {
436-
auto& inRecord = ev->Get()->Record;
437-
auto pathId = PathIdFromPathId(inRecord.GetPathId());
436+
const auto& inRecord = ev->Get()->Record;
437+
const TString operationId = inRecord.GetOperationId();
438438

439439
auto response = std::make_unique<TEvStatistics::TEvAnalyzeStatusResponse>();
440440
auto& outRecord = response->Record;
441+
outRecord.SetOperationId(operationId);
441442

442-
if (TraversalTableId.PathId == pathId) {
443+
if (ForceTraversalOperationId == operationId) {
443444
outRecord.SetStatus(NKikimrStat::TEvAnalyzeStatusResponse::STATUS_IN_PROGRESS);
444445
} else {
445446
if (std::any_of(ForceTraversals.begin(), ForceTraversals.end(),
446-
[&pathId](const TForceTraversal& elem) { return elem.PathId == pathId;})) {
447+
[&operationId](const TForceTraversal& elem) { return elem.OperationId == operationId;})) {
447448
outRecord.SetStatus(NKikimrStat::TEvAnalyzeStatusResponse::STATUS_ENQUEUED);
448449
} else {
449450
outRecord.SetStatus(NKikimrStat::TEvAnalyzeStatusResponse::STATUS_NO_OPERATION);
@@ -586,7 +587,6 @@ void TStatisticsAggregator::ScheduleNextTraversal(NIceDb::TNiceDb& db) {
586587
pathId = operation.PathId;
587588

588589
ForceTraversalOperationId = operation.OperationId;
589-
ForceTraversalCookie = operation.Cookie;
590590
ForceTraversalColumnTags = operation.ColumnTags;
591591
ForceTraversalTypes = operation.Types;
592592
ForceTraversalReplyToActorId = operation.ReplyToActorId;
@@ -678,22 +678,17 @@ void TStatisticsAggregator::PersistStartKey(NIceDb::TNiceDb& db) {
678678

679679
void TStatisticsAggregator::PersistForceTraversal(NIceDb::TNiceDb& db) {
680680
PersistSysParam(db, Schema::SysParam_ForceTraversalOperationId, ToString(ForceTraversalOperationId));
681-
PersistSysParam(db, Schema::SysParam_ForceTraversalCookie, ToString(ForceTraversalCookie));
681+
PersistSysParam(db, Schema::SysParam_ForceTraversalCookie, ForceTraversalOperationId);
682682
PersistSysParam(db, Schema::SysParam_ForceTraversalColumnTags, ToString(ForceTraversalColumnTags));
683683
PersistSysParam(db, Schema::SysParam_ForceTraversalTypes, ToString(ForceTraversalTypes));
684684
}
685685

686-
void TStatisticsAggregator::PersistNextForceTraversalOperationId(NIceDb::TNiceDb& db) {
687-
PersistSysParam(db, Schema::SysParam_NextForceTraversalOperationId, ToString(NextForceTraversalOperationId));
688-
}
689-
690686
void TStatisticsAggregator::PersistGlobalTraversalRound(NIceDb::TNiceDb& db) {
691687
PersistSysParam(db, Schema::SysParam_GlobalTraversalRound, ToString(GlobalTraversalRound));
692688
}
693689

694690
void TStatisticsAggregator::ResetTraversalState(NIceDb::TNiceDb& db) {
695-
ForceTraversalOperationId = 0;
696-
ForceTraversalCookie = 0;
691+
ForceTraversalOperationId.clear();
697692
TraversalTableId.PathId = TPathId();
698693
ForceTraversalColumnTags.clear();
699694
ForceTraversalTypes.clear();

ydb/core/statistics/aggregator/aggregator_impl.h

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,6 @@ class TStatisticsAggregator : public TActor<TStatisticsAggregator>, public NTabl
147147
void PersistTraversal(NIceDb::TNiceDb& db);
148148
void PersistForceTraversal(NIceDb::TNiceDb& db);
149149
void PersistStartKey(NIceDb::TNiceDb& db);
150-
void PersistNextForceTraversalOperationId(NIceDb::TNiceDb& db);
151150
void PersistGlobalTraversalRound(NIceDb::TNiceDb& db);
152151

153152
void ResetTraversalState(NIceDb::TNiceDb& db);
@@ -306,15 +305,13 @@ class TStatisticsAggregator : public TActor<TStatisticsAggregator>, public NTabl
306305

307306
private: // stored in local db
308307

309-
ui64 ForceTraversalOperationId = 0;
310-
ui64 ForceTraversalCookie = 0;
308+
TString ForceTraversalOperationId;
311309
TString ForceTraversalColumnTags;
312310
TString ForceTraversalTypes;
313311
TTableId TraversalTableId;
314312
bool TraversalIsColumnTable = false;
315313
TSerializedCellVec TraversalStartKey;
316314
TInstant TraversalStartTime;
317-
ui64 NextForceTraversalOperationId = 0;
318315

319316
size_t GlobalTraversalRound = 1;
320317

@@ -327,8 +324,7 @@ class TStatisticsAggregator : public TActor<TStatisticsAggregator>, public NTabl
327324
TTraversalsByTime ScheduleTraversalsByTime;
328325

329326
struct TForceTraversal {
330-
ui64 OperationId = 0;
331-
ui64 Cookie = 0;
327+
TString OperationId;
332328
TPathId PathId;
333329
TString ColumnTags;
334330
TString Types;

ydb/core/statistics/aggregator/schema.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ struct TAggregatorSchema : NIceDb::Schema {
5050
struct OperationId : Column<1, NScheme::NTypeIds::Uint64> {};
5151
struct OwnerId : Column<2, NScheme::NTypeIds::Uint64> {};
5252
struct LocalPathId : Column<3, NScheme::NTypeIds::Uint64> {};
53-
struct Cookie : Column<4, NScheme::NTypeIds::Uint64> {};
53+
struct Cookie : Column<4, NScheme::NTypeIds::String> {};
5454
struct ColumnTags : Column<5, NScheme::NTypeIds::String> {};
5555
struct Types : Column<6, NScheme::NTypeIds::String> {};
5656
@@ -87,7 +87,7 @@ struct TAggregatorSchema : NIceDb::Schema {
8787
static constexpr ui64 SysParam_ForceTraversalColumnTags = 7;
8888
static constexpr ui64 SysParam_ForceTraversalTypes = 8;
8989
static constexpr ui64 SysParam_TraversalStartTime = 9;
90-
static constexpr ui64 SysParam_NextForceTraversalOperationId = 10;
90+
// deprecated 10
9191
static constexpr ui64 SysParam_TraversalIsColumnTable = 11;
9292
static constexpr ui64 SysParam_GlobalTraversalRound = 12;
9393
};

ydb/core/statistics/aggregator/tx_analyze_table.cpp

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -19,35 +19,39 @@ struct TStatisticsAggregator::TTxAnalyzeTable : public TTxBase {
1919
TTxType GetTxType() const override { return TXTYPE_ANALYZE_TABLE; }
2020

2121
bool Execute(TTransactionContext& txc, const TActorContext&) override {
22-
SA_LOG_D("[" << Self->TabletID() << "] TTxAnalyzeTable::Execute");
22+
SA_LOG_D("[" << Self->TabletID() << "] TTxAnalyzeTable::Execute. ReplyToActorId " << ReplyToActorId << " , Record " << Record);
2323

2424
if (!Self->EnableColumnStatistics) {
2525
return true;
2626
}
2727

2828
NIceDb::TNiceDb db(txc.DB);
2929

30-
const ui64 cookie = Record.GetCookie();
30+
const TString operationId = Record.GetOperationId();
3131
const TString types = JoinVectorIntoString(TVector<ui32>(Record.GetTypes().begin(), Record.GetTypes().end()), ",");
3232

3333
for (const auto& table : Record.GetTables()) {
3434
const TPathId pathId = PathIdFromPathId(table.GetPathId());
3535
const TString columnTags = JoinVectorIntoString(TVector<ui32>{table.GetColumnTags().begin(),table.GetColumnTags().end()},",");
3636

37-
// drop request with the same cookie and path from this sender
38-
if (std::any_of(Self->ForceTraversals.begin(), Self->ForceTraversals.end(),
39-
[this, &pathId, &cookie](const TForceTraversal& elem) {
37+
// check existing force traversal with the same cookie and path
38+
auto forceTraversal = std::find_if(Self->ForceTraversals.begin(), Self->ForceTraversals.end(),
39+
[&pathId, &operationId](const TForceTraversal& elem) {
4040
return elem.PathId == pathId
41-
&& elem.Cookie == cookie
42-
&& elem.ReplyToActorId == ReplyToActorId
43-
;})) {
41+
&& elem.OperationId == operationId;});
42+
43+
// update existing force traversal
44+
if (forceTraversal != Self->ForceTraversals.end()) {
45+
SA_LOG_D("[" << Self->TabletID() << "] TTxAnalyzeTable::Execute. Update existing force traversal. PathId " << pathId << " , ReplyToActorId " << ReplyToActorId);
46+
forceTraversal->ReplyToActorId = ReplyToActorId;
4447
return true;
4548
}
4649

50+
SA_LOG_D("[" << Self->TabletID() << "] TTxAnalyzeTable::Execute. Create new force traversal operation for pathId " << pathId);
51+
4752
// create new force trasersal
4853
TForceTraversal operation {
49-
.OperationId = Self->NextForceTraversalOperationId,
50-
.Cookie = cookie,
54+
.OperationId = operationId,
5155
.PathId = pathId,
5256
.ColumnTags = columnTags,
5357
.Types = types,
@@ -66,8 +70,6 @@ struct TStatisticsAggregator::TTxAnalyzeTable : public TTxBase {
6670
*/
6771
}
6872

69-
Self->PersistNextForceTraversalOperationId(db);
70-
7173
return true;
7274
}
7375

@@ -77,8 +79,6 @@ struct TStatisticsAggregator::TTxAnalyzeTable : public TTxBase {
7779
};
7880

7981
void TStatisticsAggregator::Handle(TEvStatistics::TEvAnalyze::TPtr& ev) {
80-
++NextForceTraversalOperationId;
81-
8282
Execute(new TTxAnalyzeTable(this, ev->Get()->Record, ev->Sender), TActivationContext::AsActorContext());
8383
}
8484

ydb/core/statistics/aggregator/tx_finish_trasersal.cpp

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,13 @@
55
namespace NKikimr::NStat {
66

77
struct TStatisticsAggregator::TTxFinishTraversal : public TTxBase {
8-
ui64 OperationId;
9-
ui64 Cookie;
8+
TString OperationId;
109
TPathId PathId;
1110
TActorId ReplyToActorId;
1211

1312
TTxFinishTraversal(TSelf* self)
1413
: TTxBase(self)
1514
, OperationId(self->ForceTraversalOperationId)
16-
, Cookie(self->ForceTraversalCookie)
1715
, PathId(self->TraversalTableId.PathId)
1816
, ReplyToActorId(self->ForceTraversalReplyToActorId)
1917
{}
@@ -43,12 +41,12 @@ struct TStatisticsAggregator::TTxFinishTraversal : public TTxBase {
4341

4442
if (operationsRemain) {
4543
SA_LOG_D("[" << Self->TabletID() << "] TTxFinishTraversal::Complete. Don't send TEvAnalyzeResponse. " <<
46-
"There are pending operations, Cookie " << Cookie << " , ActorId=" << ReplyToActorId);
44+
"There are pending operations, OperationId " << OperationId << " , ActorId=" << ReplyToActorId);
4745
} else {
4846
SA_LOG_D("[" << Self->TabletID() << "] TTxFinishTraversal::Complete. " <<
49-
"Send TEvAnalyzeResponse, Cookie=" << Cookie << ", ActorId=" << ReplyToActorId);
47+
"Send TEvAnalyzeResponse, OperationId=" << OperationId << ", ActorId=" << ReplyToActorId);
5048
auto response = std::make_unique<TEvStatistics::TEvAnalyzeResponse>();
51-
response->Record.SetCookie(Cookie);
49+
response->Record.SetOperationId(OperationId);
5250
ctx.Send(ReplyToActorId, response.release());
5351
}
5452
}

ydb/core/statistics/aggregator/tx_init.cpp

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -55,15 +55,10 @@ struct TStatisticsAggregator::TTxInit : public TTxBase {
5555
SA_LOG_D("[" << Self->TabletID() << "] Loaded traversal start key");
5656
break;
5757
case Schema::SysParam_ForceTraversalOperationId: {
58-
Self->ForceTraversalOperationId = FromString<ui64>(value);
58+
Self->ForceTraversalOperationId = value;
5959
SA_LOG_D("[" << Self->TabletID() << "] Loaded traversal operation id: " << value);
6060
break;
6161
}
62-
case Schema::SysParam_ForceTraversalCookie: {
63-
Self->ForceTraversalCookie = FromString<ui64>(value);
64-
SA_LOG_D("[" << Self->TabletID() << "] Loaded traversal cookie: " << value);
65-
break;
66-
}
6762
case Schema::SysParam_TraversalTableOwnerId:
6863
Self->TraversalTableId.PathId.OwnerId = FromString<ui64>(value);
6964
SA_LOG_D("[" << Self->TabletID() << "] Loaded traversal table owner id: "
@@ -90,11 +85,6 @@ struct TStatisticsAggregator::TTxInit : public TTxBase {
9085
SA_LOG_D("[" << Self->TabletID() << "] Loaded traversal start time: " << us);
9186
break;
9287
}
93-
case Schema::SysParam_NextForceTraversalOperationId: {
94-
Self->NextForceTraversalOperationId = FromString<ui64>(value);
95-
SA_LOG_D("[" << Self->TabletID() << "] Loaded next traversal operation id: " << value);
96-
break;
97-
}
9888
case Schema::SysParam_TraversalIsColumnTable: {
9989
Self->TraversalIsColumnTable = FromString<bool>(value);
10090
SA_LOG_D("[" << Self->TabletID() << "] Loaded traversal IsColumnTable: " << value);
@@ -217,7 +207,7 @@ struct TStatisticsAggregator::TTxInit : public TTxBase {
217207
ui64 operationId = rowset.GetValue<Schema::ForceTraversals::OperationId>();
218208
ui64 ownerId = rowset.GetValue<Schema::ForceTraversals::OwnerId>();
219209
ui64 localPathId = rowset.GetValue<Schema::ForceTraversals::LocalPathId>();
220-
ui64 cookie = rowset.GetValue<Schema::ForceTraversals::Cookie>();
210+
TString cookie = rowset.GetValue<Schema::ForceTraversals::Cookie>();
221211
TString columnTags = rowset.GetValue<Schema::ForceTraversals::ColumnTags>();
222212
TString types = rowset.GetValue<Schema::ForceTraversals::Types>();
223213

0 commit comments

Comments
 (0)