Skip to content

Commit 217efd0

Browse files
[*] cache for SchemeNavigate
1 parent 40749e9 commit 217efd0

File tree

3 files changed

+22
-21
lines changed

3 files changed

+22
-21
lines changed

ydb/core/kqp/session_actor/kqp_session_actor.cpp

Lines changed: 5 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -550,7 +550,6 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
550550
}
551551

552552
void AddOffsetsToTransaction() {
553-
LOG_I("begin request for TopicOperations");
554553
YQL_ENSURE(QueryState);
555554
if (!PrepareQueryTransaction()) {
556555
return;
@@ -561,7 +560,6 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
561560
if (!AreAllTheTopicsAndPartitionsKnown()) {
562561
auto navigate = QueryState->BuildSchemeCacheNavigate();
563562
Become(&TKqpSessionActor::ExecuteState);
564-
LOG_I("begin request for SchemeNavigate");
565563
Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(navigate.release()));
566564
return;
567565
}
@@ -577,10 +575,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
577575
return;
578576
}
579577

580-
LOG_I("end request for TopicOperations");
581578
ReplySuccess();
582-
583-
LOG_I("after ReplySuccess");
584579
}
585580

586581
void CompileQuery() {
@@ -2917,10 +2912,8 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
29172912
}
29182913

29192914
void ProcessTopicOps(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
2920-
LOG_I("end request for SchemeNavigate");
29212915
YQL_ENSURE(ev->Get()->Request);
29222916
if (ev->Get()->Request->Cookie < QueryId) {
2923-
LOG_I("unexpected return #2");
29242917
return;
29252918
}
29262919

@@ -2945,21 +2938,18 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
29452938
ythrow TRequestFail(Ydb::StatusIds::BAD_REQUEST) << message;
29462939
}
29472940

2941+
QueryState->TxCtx->TopicOperations.CacheSchemeCacheNavigate(response->ResultSet);
2942+
29482943
if (HasTopicWriteOperations() && !HasTopicWriteId()) {
2949-
LOG_I("begin request for WriteId");
29502944
Send(MakeTxProxyID(), new TEvTxUserProxy::TEvAllocateTxId, 0, QueryState->QueryId);
2951-
} else {
2952-
LOG_I("end request for TopicOperations");
2953-
ReplySuccess();
2954-
2955-
LOG_I("after ReplySuccess");
2945+
return;
29562946
}
2947+
2948+
ReplySuccess();
29572949
}
29582950

29592951
void Handle(TEvTxUserProxy::TEvAllocateTxIdResult::TPtr& ev) {
2960-
LOG_I("end request for WriteId");
29612952
if (CurrentStateFunc() != &TThis::ExecuteState || ev->Cookie < QueryId) {
2962-
LOG_I("unexpected return #1");
29632953
return;
29642954
}
29652955

@@ -2968,11 +2958,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
29682958

29692959
SetTopicWriteId(NLongTxService::TLockHandle(ev->Get()->TxId, TActivationContext::ActorSystem()));
29702960

2971-
LOG_I("end request for TopicOperations");
29722961
ReplySuccess();
2973-
2974-
LOG_I("current state: " << CurrentStateFuncName());
2975-
LOG_I("after ReplySuccess");
29762962
}
29772963

29782964
bool HasTopicWriteOperations() const {

ydb/core/kqp/topics/kqp_topics.cpp

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,8 @@ void TTopicPartitionOperations::Merge(const TTopicPartitionOperations& rhs)
198198
if (Topic_.Empty()) {
199199
Topic_ = rhs.Topic_;
200200
Partition_ = rhs.Partition_;
201+
}
202+
if (TabletId_.Empty()) {
201203
TabletId_ = rhs.TabletId_;
202204
}
203205

@@ -412,8 +414,6 @@ bool TTopicOperations::ProcessSchemeCacheNavigate(const NSchemeCache::TSchemeCac
412414
p->second.SetTabletId(partition.GetTabletId());
413415
}
414416
}
415-
416-
CachedNavigateResult_[path] = result;
417417
} else {
418418
builder << "Topic '" << JoinPath(result.Path) << "' is missing";
419419

@@ -460,6 +460,20 @@ bool TTopicOperations::HasThisPartitionAlreadyBeenAdded(const TString& topicPath
460460
return false;
461461
}
462462

463+
void TTopicOperations::CacheSchemeCacheNavigate(const NSchemeCache::TSchemeCacheNavigate::TResultSet& results)
464+
{
465+
for (const auto& result : results) {
466+
if (result.Kind != NSchemeCache::TSchemeCacheNavigate::KindTopic) {
467+
continue;
468+
}
469+
if (!result.PQGroupInfo) {
470+
continue;
471+
}
472+
TString path = CanonizePath(result.Path);
473+
CachedNavigateResult_[path] = result;
474+
}
475+
}
476+
463477
void TTopicOperations::BuildTopicTxs(TTopicOperationTransactions& txs)
464478
{
465479
for (auto& [_, operations] : Operations_) {

ydb/core/kqp/topics/kqp_topics.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,7 @@ class TTopicOperations {
147147
bool ProcessSchemeCacheNavigate(const NSchemeCache::TSchemeCacheNavigate::TResultSet& results,
148148
Ydb::StatusIds_StatusCode& status,
149149
TString& message);
150+
void CacheSchemeCacheNavigate(const NSchemeCache::TSchemeCacheNavigate::TResultSet& results);
150151

151152
void BuildTopicTxs(TTopicOperationTransactions &txs);
152153

0 commit comments

Comments
 (0)