Skip to content

Commit 736f850

Browse files
[*] unnecessary SchemeCache calls
1 parent 272d2eb commit 736f850

File tree

3 files changed

+40
-4
lines changed

3 files changed

+40
-4
lines changed

ydb/core/kqp/session_actor/kqp_session_actor.cpp

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -531,6 +531,21 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
531531
CompileQuery();
532532
}
533533

534+
bool AreAllTheTopicsAndPartitionsKnown() const {
535+
const NKikimrKqp::TTopicOperationsRequest& operations = QueryState->GetTopicOperations();
536+
for (const auto& topic : operations.GetTopics()) {
537+
auto path = CanonizePath(NPersQueue::GetFullTopicPath(TlsActivationContext->AsActorContext(),
538+
QueryState->GetDatabase(), topic.path()));
539+
540+
for (const auto& partition : topic.partitions()) {
541+
if (!QueryState->TxCtx->TopicOperations.HasThisPartitionAlreadyBeenAdded(path, partition.partition_id())) {
542+
return false;
543+
}
544+
}
545+
}
546+
return true;
547+
}
548+
534549
void AddOffsetsToTransaction() {
535550
YQL_ENSURE(QueryState);
536551
if (!PrepareQueryTransaction()) {
@@ -539,10 +554,25 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
539554

540555
QueryState->AddOffsetsToTransaction();
541556

542-
auto navigate = QueryState->BuildSchemeCacheNavigate();
557+
if (!AreAllTheTopicsAndPartitionsKnown()) {
558+
auto navigate = QueryState->BuildSchemeCacheNavigate();
559+
Become(&TKqpSessionActor::ExecuteState);
560+
Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(navigate.release()));
561+
return;
562+
}
543563

544-
Become(&TKqpSessionActor::ExecuteState);
545-
Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(navigate.release()));
564+
TString message;
565+
if (!QueryState->TryMergeTopicOffsets(QueryState->TopicOperations, message)) {
566+
ythrow TRequestFail(Ydb::StatusIds::BAD_REQUEST) << message;
567+
}
568+
569+
if (HasTopicWriteOperations() && !HasTopicWriteId()) {
570+
Become(&TKqpSessionActor::ExecuteState);
571+
Send(MakeTxProxyID(), new TEvTxUserProxy::TEvAllocateTxId, 0, QueryState->QueryId);
572+
return;
573+
}
574+
575+
ReplySuccess();
546576
}
547577

548578
void CompileQuery() {

ydb/core/kqp/topics/kqp_topics.cpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,6 @@ void TTopicPartitionOperations::Merge(const TTopicPartitionOperations& rhs)
194194
{
195195
Y_ABORT_UNLESS(Topic_.Empty() || Topic_ == rhs.Topic_);
196196
Y_ABORT_UNLESS(Partition_.Empty() || Partition_ == rhs.Partition_);
197-
Y_ABORT_UNLESS(TabletId_.Empty() || TabletId_ == rhs.TabletId_);
198197

199198
if (Topic_.Empty()) {
200199
Topic_ = rhs.Topic_;
@@ -429,6 +428,11 @@ bool TTopicOperations::ProcessSchemeCacheNavigate(const NSchemeCache::TSchemeCac
429428
return true;
430429
}
431430

431+
bool TTopicOperations::HasThisPartitionAlreadyBeenAdded(const TString& topic, ui32 partitionId) const
432+
{
433+
return Operations_.contains({topic, partitionId});
434+
}
435+
432436
void TTopicOperations::BuildTopicTxs(TTopicOperationTransactions& txs)
433437
{
434438
for (auto& [_, operations] : Operations_) {

ydb/core/kqp/topics/kqp_topics.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,8 @@ class TTopicOperations {
159159

160160
size_t GetSize() const;
161161

162+
bool HasThisPartitionAlreadyBeenAdded(const TString& topic, ui32 partitionId) const;
163+
162164
private:
163165
THashMap<TTopicPartition, TTopicPartitionOperations, TTopicPartition::THash> Operations_;
164166
bool HasReadOperations_ = false;

0 commit comments

Comments
 (0)