3
3
#include " show_create.h"
4
4
5
5
#include < ydb/core/base/tablet_pipe.h>
6
- #include < ydb/core/base/tablet_pipe .h>
6
+ #include < ydb/core/scheme/scheme_pathid .h>
7
7
#include < ydb/core/sys_view/common/scan_actor_base_impl.h>
8
8
#include < ydb/core/sys_view/common/schema.h>
9
9
#include < ydb/core/tx/scheme_cache/scheme_cache.h>
10
10
#include < ydb/core/tx/schemeshard/schemeshard.h>
11
+ #include < ydb/core/tx/sequenceproxy/public/events.h>
11
12
#include < ydb/core/tx/tx_proxy/proxy.h>
12
13
13
14
#include < ydb/library/actors/core/hfunc.h>
@@ -79,6 +80,7 @@ class TShowCreate : public TScanActorBase<TShowCreate> {
79
80
STFUNC (StateCollectTableSettings) {
80
81
switch (ev->GetTypeRewrite ()) {
81
82
hFunc (NSchemeShard::TEvSchemeShard::TEvDescribeSchemeResult, HandleCollectTableSettings);
83
+ hFunc (NSequenceProxy::TEvSequenceProxy::TEvGetSequenceResult, Handle);
82
84
default :
83
85
LOG_CRIT (*TlsActivationContext, NKikimrServices::SYSTEM_VIEWS,
84
86
" NSysView::TScanActorBase: unexpected event 0x%08" PRIx32, ev->GetTypeRewrite ());
@@ -169,6 +171,10 @@ class TShowCreate : public TScanActorBase<TShowCreate> {
169
171
}
170
172
}
171
173
174
+ bool NeedToCollectTableSettings (const NKikimrSchemeOp::TTableDescription& tableDesc) {
175
+ return !tableDesc.GetCdcStreams ().empty () || !tableDesc.GetSequences ().empty ();
176
+ }
177
+
172
178
void StartCollectTableSettings (const TString& tablePath, const NKikimrSchemeOp::TTableDescription& tableDesc, bool temporary) {
173
179
CollectTableSettingsState = MakeHolder<TCollectTableSettingsState>();
174
180
CollectTableSettingsState->TablePath = tablePath;
@@ -196,6 +202,15 @@ class TShowCreate : public TScanActorBase<TShowCreate> {
196
202
197
203
Send (MakeTxProxyID (), navigateRequest.release ());
198
204
}
205
+
206
+ for (const auto & sequence: tableDesc.GetSequences ()) {
207
+ auto sequencePathId = TPathId::FromProto (sequence.GetPathId ());
208
+ CollectTableSettingsState->Sequences [sequencePathId] = nullptr ;
209
+
210
+ Send (NSequenceProxy::MakeSequenceProxyServiceID (),
211
+ new NSequenceProxy::TEvSequenceProxy::TEvGetSequence (Database, sequencePathId)
212
+ );
213
+ }
199
214
}
200
215
201
216
void FillBatch (NKqp::TEvKqpCompute::TEvScanData& batch, const TString& path, const TString& statement) {
@@ -263,14 +278,14 @@ class TShowCreate : public TScanActorBase<TShowCreate> {
263
278
temporary = true ;
264
279
}
265
280
266
- if (!tableDesc. GetCdcStreams (). empty ( )) {
281
+ if (NeedToCollectTableSettings (tableDesc )) {
267
282
StartCollectTableSettings (tablePath, tableDesc, temporary);
268
283
Become (&TShowCreate::StateCollectTableSettings);
269
284
return ;
270
285
}
271
286
272
287
TCreateTableFormatter formatter;
273
- auto formatterResult = formatter.Format (tablePath, tableDesc, temporary, {});
288
+ auto formatterResult = formatter.Format (tablePath, Path, tableDesc, temporary, {} , {});
274
289
if (formatterResult.IsSuccess ()) {
275
290
path = tablePath;
276
291
statement = formatterResult.ExtractOut ();
@@ -385,16 +400,18 @@ class TShowCreate : public TScanActorBase<TShowCreate> {
385
400
it->second = MakeHolder<NKikimrSchemeOp::TPersQueueGroupDescription>(description);
386
401
CollectTableSettingsState->CurrentPersQueuesNumber ++;
387
402
388
- if (CollectTableSettingsState-> CurrentPersQueuesNumber != CollectTableSettingsState->PersQueues . size ()) {
403
+ if (! CollectTableSettingsState->IsReady ()) {
389
404
return ;
390
405
}
391
406
392
407
TCreateTableFormatter formatter;
393
408
auto formatterResult = formatter.Format (
394
409
CollectTableSettingsState->TablePath ,
410
+ Path,
395
411
CollectTableSettingsState->TableDescription ,
396
412
CollectTableSettingsState->Temporary ,
397
- CollectTableSettingsState->PersQueues
413
+ CollectTableSettingsState->PersQueues ,
414
+ CollectTableSettingsState->Sequences
398
415
);
399
416
if (formatterResult.IsSuccess ()) {
400
417
path = CollectTableSettingsState->TablePath ;
@@ -441,6 +458,57 @@ class TShowCreate : public TScanActorBase<TShowCreate> {
441
458
SendBatch (std::move (batch));
442
459
}
443
460
461
+ void Handle (NSequenceProxy::TEvSequenceProxy::TEvGetSequenceResult::TPtr& ev) {
462
+ if (ev->Get ()->Status != Ydb::StatusIds::SUCCESS) {
463
+ ReplyErrorAndDie (ev->Get ()->Status , ev->Get ()->Issues .ToString ());
464
+ return ;
465
+ }
466
+
467
+ auto * msg = ev->Get ();
468
+
469
+ auto it = CollectTableSettingsState->Sequences .find (msg->PathId );
470
+ if (it == CollectTableSettingsState->Sequences .end ()) {
471
+ return ReplyErrorAndDie (Ydb::StatusIds::INTERNAL_ERROR, TStringBuilder () << " Unknown sequence path id: " << msg->PathId );
472
+ }
473
+ if (it->second ) {
474
+ return ReplyErrorAndDie (Ydb::StatusIds::INTERNAL_ERROR, TStringBuilder () << " Found duplicate sequence path id: " << msg->PathId );
475
+ }
476
+ it->second = MakeHolder<NSequenceProxy::TEvSequenceProxy::TEvGetSequenceResult>(*msg);
477
+ CollectTableSettingsState->CurrentSequencesNumber ++;
478
+
479
+ if (!CollectTableSettingsState->IsReady ()) {
480
+ return ;
481
+ }
482
+
483
+ TCreateTableFormatter formatter;
484
+ auto formatterResult = formatter.Format (
485
+ CollectTableSettingsState->TablePath ,
486
+ Path,
487
+ CollectTableSettingsState->TableDescription ,
488
+ CollectTableSettingsState->Temporary ,
489
+ CollectTableSettingsState->PersQueues ,
490
+ CollectTableSettingsState->Sequences
491
+ );
492
+ std::optional<TString> path;
493
+ std::optional<TString> statement;
494
+ if (formatterResult.IsSuccess ()) {
495
+ path = CollectTableSettingsState->TablePath ;
496
+ statement = formatterResult.ExtractOut ();
497
+ } else {
498
+ ReplyErrorAndDie (formatterResult.GetStatus (), formatterResult.GetError ());
499
+ return ;
500
+ }
501
+
502
+ Y_ENSURE (path.has_value ());
503
+ Y_ENSURE (statement.has_value ());
504
+
505
+ auto batch = MakeHolder<NKqp::TEvKqpCompute::TEvScanData>(ScanId);
506
+
507
+ FillBatch (*batch, path.value (), statement.value ());
508
+
509
+ SendBatch (std::move (batch));
510
+ }
511
+
444
512
private:
445
513
TString Database;
446
514
TIntrusiveConstPtr<NACLib::TUserToken> UserToken;
@@ -453,6 +521,12 @@ class TShowCreate : public TScanActorBase<TShowCreate> {
453
521
bool Temporary;
454
522
THashMap<TString, THolder<NKikimrSchemeOp::TPersQueueGroupDescription>> PersQueues;
455
523
ui32 CurrentPersQueuesNumber = 0 ;
524
+ THashMap<TPathId, THolder<NSequenceProxy::TEvSequenceProxy::TEvGetSequenceResult>> Sequences;
525
+ ui32 CurrentSequencesNumber = 0 ;
526
+
527
+ bool IsReady () const {
528
+ return CurrentPersQueuesNumber == PersQueues.size () && CurrentSequencesNumber == Sequences.size ();
529
+ }
456
530
};
457
531
THolder<TCollectTableSettingsState> CollectTableSettingsState;
458
532
};
0 commit comments