Skip to content

Commit 6619a8f

Browse files
authored
provide database in schemecache request (#15940)
Provide database name in schemecache request. Without database name schemecache could answer with PathUnknown instead of Unavailable in some cases. This leads to bug, described in KIKIMR-22718.
1 parent fa9c0a2 commit 6619a8f

File tree

7 files changed

+42
-4
lines changed

7 files changed

+42
-4
lines changed

ydb/core/client/server/msgbus_server_pq_metacache.cpp

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -519,7 +519,11 @@ class TPersQueueMetaCacheActor : public TActorBootstrapped<TPersQueueMetaCacheAc
519519
auto inserted = DescribeTopicsWaiters.insert(std::make_pair(reqId, waiter)).second;
520520
Y_ABORT_UNLESS(inserted);
521521

522+
TMaybe<TString> db = {};
523+
522524
for (const auto& [path, database] : waiter->GetTopics()) {
525+
if (!db) db = database;
526+
if (*db != database) db = "";
523527
auto split = NKikimr::SplitPath(path);
524528
TSchemeCacheNavigate::TEntry entry;
525529
if (!split.empty()) {
@@ -532,8 +536,9 @@ class TPersQueueMetaCacheActor : public TActorBootstrapped<TPersQueueMetaCacheAc
532536

533537
schemeCacheRequest->ResultSet.emplace_back(std::move(entry));
534538
}
535-
536-
LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "send request for " << (waiter->Type == EWaiterType::DescribeAllTopics ? " all " : "") << waiter->GetTopics().size() << " topics, got " << DescribeTopicsWaiters.size() << " requests infly");
539+
if (db) schemeCacheRequest->DatabaseName = *db;
540+
LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "send request for " << (waiter->Type == EWaiterType::DescribeAllTopics ? " all " : "")
541+
<< waiter->GetTopics().size() << " topics, got " << DescribeTopicsWaiters.size() << " requests infly, db = \"" << db << "\"");
537542

538543
ctx.Send(SchemeCacheId, new TEvTxProxySchemeCache::TEvNavigateKeySet(schemeCacheRequest.release()), 0, 0, waiter->Span.GetTraceId());
539544
}

ydb/core/http_proxy/http_req.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1458,6 +1458,7 @@ namespace NKikimr::NHttpProxy {
14581458
entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpPath;
14591459
entry.SyncVersion = false;
14601460
schemeCacheRequest->ResultSet.emplace_back(entry);
1461+
schemeCacheRequest->DatabaseName = CanonizePath(DatabasePath);
14611462
ctx.Send(MakeSchemeCacheID(), MakeHolder<TEvTxProxySchemeCache::TEvNavigateKeySet>(schemeCacheRequest.release()));
14621463
}
14631464

ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -558,6 +558,8 @@ void TKafkaProduceActor::ProcessInitializationRequests(const TActorContext& ctx)
558558
request->ResultSet.emplace_back(entry);
559559
}
560560

561+
request->DatabaseName = CanonizePath(Context->DatabasePath);
562+
561563
ctx.Send(MakeSchemeCacheID(), MakeHolder<TEvTxProxySchemeCache::TEvNavigateKeySet>(request.release()));
562564
}
563565

ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -182,13 +182,25 @@ void TKafkaSaslAuthActor::SendDescribeRequest(const TActorContext& ctx) {
182182
entry.Operation = NKikimr::NSchemeCache::TSchemeCacheNavigate::OpPath;
183183
entry.SyncVersion = false;
184184
schemeCacheRequest->ResultSet.emplace_back(entry);
185+
schemeCacheRequest->DatabaseName = CanonizePath(DatabasePath);
185186
ctx.Send(NKikimr::MakeSchemeCacheID(), MakeHolder<NKikimr::TEvTxProxySchemeCache::TEvNavigateKeySet>(schemeCacheRequest.release()));
186187
}
187188

188189
void TKafkaSaslAuthActor::Handle(NKikimr::TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx) {
189190
const NKikimr::NSchemeCache::TSchemeCacheNavigate* navigate = ev->Get()->Request.Get();
190191
if (navigate->ErrorCount) {
191-
SendResponseAndDie(EKafkaErrors::SASL_AUTHENTICATION_FAILED, "", TStringBuilder() << "Database with path '" << DatabasePath << "' doesn't exists", ctx);
192+
switch(navigate->ResultSet.front().Status) {
193+
case NSchemeCache::TSchemeCacheNavigate::EStatus::RootUnknown:
194+
case NSchemeCache::TSchemeCacheNavigate::EStatus::PathErrorUnknown:
195+
case NSchemeCache::TSchemeCacheNavigate::EStatus::PathNotTable:
196+
case NSchemeCache::TSchemeCacheNavigate::EStatus::PathNotPath:
197+
case NSchemeCache::TSchemeCacheNavigate::EStatus::TableCreationNotComplete:
198+
return SendResponseAndDie(EKafkaErrors::SASL_AUTHENTICATION_FAILED, "", TStringBuilder() << "Database with path '" << DatabasePath << "' doesn't exists", ctx);
199+
case NSchemeCache::TSchemeCacheNavigate::EStatus::AccessDenied:
200+
return SendResponseAndDie(EKafkaErrors::SASL_AUTHENTICATION_FAILED, "", TStringBuilder() << "Database with path '" << DatabasePath << "' access denied", ctx);
201+
default:
202+
return SendResponseAndDie(EKafkaErrors::BROKER_NOT_AVAILABLE, "", TStringBuilder() << "Internal error with navigate status " << navigate->ResultSet.front().Status, ctx);
203+
}
192204
return;
193205
}
194206
Y_ABORT_UNLESS(navigate->ResultSet.size() == 1);

ydb/services/datastreams/put_records_actor.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -312,6 +312,7 @@ namespace NKikimr::NDataStreams::V1 {
312312
entry.Path = NKikimr::SplitPath(this->GetTopicPath());
313313
entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpList;
314314
entry.SyncVersion = true;
315+
schemeCacheRequest->DatabaseName = CanonizePath(this->Request_->GetDatabaseName().GetOrElse(""));
315316
schemeCacheRequest->ResultSet.emplace_back(entry);
316317
ctx.Send(MakeSchemeCacheID(), MakeHolder<TEvTxProxySchemeCache::TEvNavigateKeySet>(schemeCacheRequest.release()));
317318
}

ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ void TReadInitAndAuthActor::SendCacheNavigateRequest(const TActorContext& ctx, c
8484
entry.SyncVersion = true;
8585
entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpPath;
8686
schemeCacheRequest->ResultSet.emplace_back(entry);
87+
schemeCacheRequest->DatabaseName = AppData(ctx)->PQConfig.GetDatabase();
8788
LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " Send client acl request");
8889
ctx.Send(NewSchemeCache, new TEvTxProxySchemeCache::TEvNavigateKeySet(schemeCacheRequest.Release()));
8990
}

ydb/services/persqueue_v1/persqueue_compat_ut.cpp

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,20 @@ class TPQv1CompatTestBase {
4242
Server->AnnoyingClient->CreateTopicNoLegacy(
4343
"/Root/LbCommunal/account/topic2", 1, true, true, {}, {}, "account"
4444
);
45-
4645
Server->AnnoyingClient->CreateTopicNoLegacy(
4746
"/Root/LbCommunal/account/topic2-mirrored-from-dc2", 1, true, false, {}, {}, "account"
4847
);
48+
49+
Server->AnnoyingClient->MkDir("/Root", "LbCommunal");
50+
Server->AnnoyingClient->MkDir("/Root/LbCommunal", "account2");
51+
Server->AnnoyingClient->CreateTopicNoLegacy(
52+
"/Root/LbCommunal/account2/topic3", 1, true, true, {}, {}, "account2"
53+
);
54+
Server->AnnoyingClient->CreateTopicNoLegacy(
55+
"/Root/LbCommunal/account2/topic3-mirrored-from-dc2", 1, true, false, {}, {}, "account2"
56+
);
57+
58+
4959
Server->AnnoyingClient->CreateConsumer("test-consumer");
5060
InitPQLib();
5161
}
@@ -153,6 +163,12 @@ Y_UNIT_TEST_SUITE(TPQCompatTest) {
153163
GetLocks({"account/topic1", "account/topic2"}, rs);
154164
rs->Close();
155165
}
166+
{
167+
auto rs = testServer.CreateReadSession({"account/topic1", "account/topic2", "account2/topic3"});
168+
GetLocks({"account/topic1", "account/topic2", "account2/topic3"}, rs);
169+
rs->Close();
170+
}
171+
156172
}
157173

158174
Y_UNIT_TEST(BadTopics) {

0 commit comments

Comments
 (0)