Skip to content

Commit 40749e9

Browse files
[+] cache for SchemeNavigate
1 parent a87320a commit 40749e9

File tree

3 files changed

+54
-3
lines changed

3 files changed

+54
-3
lines changed

ydb/core/kqp/session_actor/kqp_session_actor.cpp

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,9 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
240240
<< "ActorId: " << SelfId() << ", "
241241
<< "ActorState: " << CurrentStateFuncName() << ", ";
242242
if (Y_LIKELY(QueryState)) {
243+
if (QueryState->HasTxControl()) {
244+
result << " TxId: " << QueryState->GetTxControl().tx_id() << ", ";
245+
}
243246
result << "TraceId: " << QueryState->UserRequestContext->TraceId << ", ";
244247
}
245248
return result;
@@ -547,6 +550,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
547550
}
548551

549552
void AddOffsetsToTransaction() {
553+
LOG_I("begin request for TopicOperations");
550554
YQL_ENSURE(QueryState);
551555
if (!PrepareQueryTransaction()) {
552556
return;
@@ -557,6 +561,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
557561
if (!AreAllTheTopicsAndPartitionsKnown()) {
558562
auto navigate = QueryState->BuildSchemeCacheNavigate();
559563
Become(&TKqpSessionActor::ExecuteState);
564+
LOG_I("begin request for SchemeNavigate");
560565
Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(navigate.release()));
561566
return;
562567
}
@@ -572,7 +577,10 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
572577
return;
573578
}
574579

580+
LOG_I("end request for TopicOperations");
575581
ReplySuccess();
582+
583+
LOG_I("after ReplySuccess");
576584
}
577585

578586
void CompileQuery() {
@@ -2909,8 +2917,10 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
29092917
}
29102918

29112919
void ProcessTopicOps(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
2920+
LOG_I("end request for SchemeNavigate");
29122921
YQL_ENSURE(ev->Get()->Request);
29132922
if (ev->Get()->Request->Cookie < QueryId) {
2923+
LOG_I("unexpected return #2");
29142924
return;
29152925
}
29162926

@@ -2936,21 +2946,33 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
29362946
}
29372947

29382948
if (HasTopicWriteOperations() && !HasTopicWriteId()) {
2949+
LOG_I("begin request for WriteId");
29392950
Send(MakeTxProxyID(), new TEvTxUserProxy::TEvAllocateTxId, 0, QueryState->QueryId);
29402951
} else {
2952+
LOG_I("end request for TopicOperations");
29412953
ReplySuccess();
2954+
2955+
LOG_I("after ReplySuccess");
29422956
}
29432957
}
29442958

29452959
void Handle(TEvTxUserProxy::TEvAllocateTxIdResult::TPtr& ev) {
2960+
LOG_I("end request for WriteId");
29462961
if (CurrentStateFunc() != &TThis::ExecuteState || ev->Cookie < QueryId) {
2962+
LOG_I("unexpected return #1");
29472963
return;
29482964
}
29492965

29502966
YQL_ENSURE(QueryState);
29512967
YQL_ENSURE(QueryState->GetAction() == NKikimrKqp::QUERY_ACTION_TOPIC);
2968+
29522969
SetTopicWriteId(NLongTxService::TLockHandle(ev->Get()->TxId, TActivationContext::ActorSystem()));
2970+
2971+
LOG_I("end request for TopicOperations");
29532972
ReplySuccess();
2973+
2974+
LOG_I("current state: " << CurrentStateFuncName());
2975+
LOG_I("after ReplySuccess");
29542976
}
29552977

29562978
bool HasTopicWriteOperations() const {

ydb/core/kqp/topics/kqp_topics.cpp

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -412,6 +412,8 @@ bool TTopicOperations::ProcessSchemeCacheNavigate(const NSchemeCache::TSchemeCac
412412
p->second.SetTabletId(partition.GetTabletId());
413413
}
414414
}
415+
416+
CachedNavigateResult_[path] = result;
415417
} else {
416418
builder << "Topic '" << JoinPath(result.Path) << "' is missing";
417419

@@ -428,9 +430,34 @@ bool TTopicOperations::ProcessSchemeCacheNavigate(const NSchemeCache::TSchemeCac
428430
return true;
429431
}
430432

431-
bool TTopicOperations::HasThisPartitionAlreadyBeenAdded(const TString& topic, ui32 partitionId) const
433+
bool TTopicOperations::HasThisPartitionAlreadyBeenAdded(const TString& topicPath, ui32 partitionId)
432434
{
433-
return Operations_.contains({topic, partitionId});
435+
if (Operations_.contains({topicPath, partitionId})) {
436+
return true;
437+
}
438+
if (!CachedNavigateResult_.contains(topicPath)) {
439+
return false;
440+
}
441+
442+
const NSchemeCache::TSchemeCacheNavigate::TEntry& entry =
443+
CachedNavigateResult_.at(topicPath);
444+
const NKikimrSchemeOp::TPersQueueGroupDescription& description =
445+
entry.PQGroupInfo->Description;
446+
447+
TString path = CanonizePath(entry.Path);
448+
Y_ABORT_UNLESS(path == topicPath,
449+
"path=%s, topicPath=%s",
450+
path.data(), topicPath.data());
451+
452+
for (const auto& partition : description.GetPartitions()) {
453+
if (partition.GetPartitionId() == partitionId) {
454+
TTopicPartition key{topicPath, partitionId};
455+
Operations_[key].SetTabletId(partition.GetTabletId());
456+
return true;
457+
}
458+
}
459+
460+
return false;
434461
}
435462

436463
void TTopicOperations::BuildTopicTxs(TTopicOperationTransactions& txs)

ydb/core/kqp/topics/kqp_topics.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ class TTopicOperations {
159159

160160
size_t GetSize() const;
161161

162-
bool HasThisPartitionAlreadyBeenAdded(const TString& topic, ui32 partitionId) const;
162+
bool HasThisPartitionAlreadyBeenAdded(const TString& topic, ui32 partitionId);
163163

164164
private:
165165
THashMap<TTopicPartition, TTopicPartitionOperations, TTopicPartition::THash> Operations_;
@@ -168,6 +168,8 @@ class TTopicOperations {
168168

169169
TMaybe<TString> Consumer_;
170170
NLongTxService::TLockHandle WriteId_;
171+
172+
THashMap<TString, NSchemeCache::TSchemeCacheNavigate::TEntry> CachedNavigateResult_;
171173
};
172174

173175
}

0 commit comments

Comments
 (0)