Skip to content

Commit e96a66e

Browse files
authored
Fix kafka cdc read and login without @ (#10678)
1 parent 12559bf commit e96a66e

File tree

8 files changed

+198
-24
lines changed

8 files changed

+198
-24
lines changed

ydb/core/kafka_proxy/actors/kafka_offset_fetch_actor.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,15 +28,16 @@ struct PartitionOffsets {
2828
class TTopicOffsetActor: public NKikimr::NGRpcProxy::V1::TPQInternalSchemaActor<TTopicOffsetActor,
2929
NKikimr::NGRpcProxy::V1::TLocalRequestBase,
3030
NKikimr::NGRpcProxy::V1::TEvPQProxy::TEvPartitionLocationResponse>,
31-
public NKikimr::NGRpcProxy::V1::TDescribeTopicActorImpl {
31+
public NKikimr::NGRpcProxy::V1::TDescribeTopicActorImpl,
32+
public NKikimr::NGRpcProxy::V1::TCdcStreamCompatible {
3233
using TBase = NKikimr::NGRpcProxy::V1::TPQInternalSchemaActor<TTopicOffsetActor,
3334
NKikimr::NGRpcProxy::V1::TLocalRequestBase,
3435
NKikimr::NGRpcProxy::V1::TEvPQProxy::TEvPartitionLocationResponse>;
3536

3637
public:
3738
TTopicOffsetActor(std::shared_ptr<TSet<TString>> consumers,
3839
const NKikimr::NGRpcProxy::V1::TLocalRequestBase& request,
39-
const TActorId& requester,
40+
const TActorId& requester,
4041
std::shared_ptr<TSet<ui32>> partitions,
4142
const TString& originalTopicName,
4243
const TString& userSID)

ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -134,12 +134,13 @@ bool TKafkaSaslAuthActor::TryParseAuthDataTo(TKafkaSaslAuthActor::TAuthData& aut
134134
auto password = tokens[2];
135135
size_t atPos = userAndDatabase.rfind('@');
136136
if (atPos == TString::npos) {
137-
SendResponseAndDie(EKafkaErrors::SASL_AUTHENTICATION_FAILED, "Database not provided.", "", ctx);
138-
return false;
137+
authData.UserName = "";
138+
authData.Database = userAndDatabase;
139+
} else {
140+
authData.UserName = userAndDatabase.substr(0, atPos);
141+
authData.Database = userAndDatabase.substr(atPos + 1);
139142
}
140143

141-
authData.UserName = userAndDatabase.substr(0, atPos);
142-
authData.Database = userAndDatabase.substr(atPos + 1);
143144
authData.Password = password;
144145
return true;
145146
}

ydb/core/kafka_proxy/actors/kafka_topic_offsets_actor.h

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@ namespace NKafka {
1414
class TTopicOffsetsActor : public NKikimr::NGRpcProxy::V1::TPQInternalSchemaActor<TTopicOffsetsActor,
1515
TEvKafka::TGetOffsetsRequest,
1616
TEvKafka::TEvTopicOffsetsResponse>
17-
, public NKikimr::NGRpcProxy::V1::TDescribeTopicActorImpl {
17+
, public NKikimr::NGRpcProxy::V1::TDescribeTopicActorImpl
18+
, public NKikimr::NGRpcProxy::V1::TCdcStreamCompatible {
1819

1920
using TBase = TPQInternalSchemaActor<TTopicOffsetsActor,
2021
TEvKafka::TGetOffsetsRequest,
@@ -30,17 +31,18 @@ using TBase = TPQInternalSchemaActor<TTopicOffsetsActor,
3031
void StateWork(TAutoPtr<IEventHandle>& ev);
3132

3233
void HandleCacheNavigateResponse(NKikimr::TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) override;
33-
34+
3435
virtual void ApplyResponse(TTabletInfo&, NKikimr::TEvPersQueue::TEvReadSessionsInfoResponse::TPtr&,
3536
const TActorContext&) override {
3637
Y_ABORT();
3738
}
39+
3840
bool ApplyResponse(NKikimr::TEvPersQueue::TEvGetPartitionsLocationResponse::TPtr&, const TActorContext&) override {
3941
Y_ABORT();
4042
}
4143

4244
void ApplyResponse(TTabletInfo& tabletInfo, NKikimr::TEvPersQueue::TEvStatusResponse::TPtr& ev, const TActorContext& ctx) override;
43-
45+
4446
void Reply(const TActorContext&) override;
4547

4648
void RaiseError(const TString& error, const Ydb::PersQueue::ErrorCode::ErrorCode errorCode, const Ydb::StatusIds::StatusCode status, const TActorContext&) override;

ydb/core/kafka_proxy/ut/ut_protocol.cpp

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1000,6 +1000,11 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
10001000
TString topicName = "/Root/topic-0-test";
10011001
TString shortTopicName = "topic-0-test";
10021002
TString notExistsTopicName = "/Root/not-exists";
1003+
1004+
TString tableName = "/Root/table-0-test";
1005+
TString feedName = "feed";
1006+
TString feedPath = tableName + "/" + feedName;
1007+
10031008
ui64 minActivePartitions = 10;
10041009

10051010
TString key = "record-key";
@@ -1207,6 +1212,60 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
12071212
}
12081213
}
12091214

1215+
{
1216+
NYdb::NTable::TTableClient tableClient(*testServer.Driver);
1217+
tableClient.RetryOperationSync([&](TSession session)
1218+
{
1219+
NYdb::NTable::TTableBuilder builder;
1220+
builder.AddNonNullableColumn("key", NYdb::EPrimitiveType::Int64).SetPrimaryKeyColumn("key");
1221+
builder.AddNonNullableColumn("value", NYdb::EPrimitiveType::Int64);
1222+
1223+
auto createResult = session.CreateTable(tableName, builder.Build()).ExtractValueSync();
1224+
UNIT_ASSERT_VALUES_EQUAL(createResult.IsTransportError(), false);
1225+
Cerr << createResult.GetIssues().ToString() << "\n";
1226+
UNIT_ASSERT_VALUES_EQUAL(createResult.GetStatus(), EStatus::SUCCESS);
1227+
1228+
auto alterResult = session.AlterTable(tableName, NYdb::NTable::TAlterTableSettings()
1229+
.AppendAddChangefeeds(NYdb::NTable::TChangefeedDescription(feedName,
1230+
NYdb::NTable::EChangefeedMode::Updates,
1231+
NYdb::NTable::EChangefeedFormat::Json))
1232+
).ExtractValueSync();
1233+
Cerr << alterResult.GetIssues().ToString() << "\n";
1234+
UNIT_ASSERT_VALUES_EQUAL(alterResult.IsTransportError(), false);
1235+
UNIT_ASSERT_VALUES_EQUAL(alterResult.GetStatus(), EStatus::SUCCESS);
1236+
return alterResult;
1237+
}
1238+
);
1239+
1240+
TValueBuilder rows;
1241+
rows.BeginList();
1242+
rows.AddListItem()
1243+
.BeginStruct()
1244+
.AddMember("key").Int64(1)
1245+
.AddMember("value").Int64(2)
1246+
.EndStruct();
1247+
rows.EndList();
1248+
1249+
auto upsertResult = tableClient.BulkUpsert(tableName, rows.Build()).GetValueSync();
1250+
UNIT_ASSERT_EQUAL(upsertResult.GetStatus(), EStatus::SUCCESS);
1251+
}
1252+
1253+
{
1254+
// Check CDC
1255+
std::vector<std::pair<TString, std::vector<i32>>> topics {{feedPath, {0}}};
1256+
auto msg = client.Fetch(topics);
1257+
UNIT_ASSERT_VALUES_EQUAL(msg->Responses.size(), 1);
1258+
UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].Partitions.size(), 1);
1259+
UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].Partitions[0].ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
1260+
UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].Partitions[0].Records.has_value(), true);
1261+
UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].Partitions[0].Records->Records.size(), 1);
1262+
auto record = msg->Responses[0].Partitions[0].Records->Records[0];
1263+
1264+
auto data = record.Value.value();
1265+
auto dataStr = TString(data.data(), data.size());
1266+
UNIT_ASSERT_VALUES_EQUAL(dataStr, "{\"update\":{\"value\":2},\"key\":[1]}");
1267+
}
1268+
12101269
} // Y_UNIT_TEST(FetchScenario)
12111270

12121271
Y_UNIT_TEST(BalanceScenario) {
@@ -2300,4 +2359,53 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
23002359

23012360
Sleep(TDuration::Seconds(1));
23022361
}
2362+
2363+
Y_UNIT_TEST(LoginWithApiKeyWithoutAt) {
2364+
TInsecureTestServer testServer;
2365+
2366+
TString topicName = "/Root/topic-0-test";
2367+
2368+
NYdb::NTopic::TTopicClient pqClient(*testServer.Driver);
2369+
{
2370+
auto result =
2371+
pqClient
2372+
.CreateTopic(topicName,
2373+
NYdb::NTopic::TCreateTopicSettings()
2374+
.PartitioningSettings(10, 100)
2375+
.BeginAddConsumer("consumer-0").EndAddConsumer())
2376+
.ExtractValueSync();
2377+
UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
2378+
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS);
2379+
}
2380+
2381+
auto settings = NTopic::TReadSessionSettings()
2382+
.AppendTopics(NTopic::TTopicReadSettings(topicName))
2383+
.ConsumerName("consumer-0");
2384+
auto topicReader = pqClient.CreateReadSession(settings);
2385+
2386+
TTestClient client(testServer.Port);
2387+
2388+
{
2389+
auto msg = client.ApiVersions();
2390+
2391+
UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
2392+
UNIT_ASSERT_VALUES_EQUAL(msg->ApiKeys.size(), 18u);
2393+
}
2394+
2395+
{
2396+
auto msg = client.SaslHandshake();
2397+
2398+
UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
2399+
UNIT_ASSERT_VALUES_EQUAL(msg->Mechanisms.size(), 1u);
2400+
UNIT_ASSERT_VALUES_EQUAL(*msg->Mechanisms[0], "PLAIN");
2401+
}
2402+
2403+
{
2404+
auto msg = client.SaslAuthenticate("/Root", "ApiKey-value-valid");
2405+
Cerr << msg->ErrorMessage << "\n";
2406+
UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
2407+
}
2408+
2409+
Sleep(TDuration::Seconds(1));
2410+
}
23032411
} // Y_UNIT_TEST_SUITE(KafkaProtocol)

ydb/core/persqueue/fetch_request_actor.cpp

Lines changed: 50 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,9 @@ struct TEvPrivate {
9393
TActorId RequesterId;
9494
ui64 PendingQuotaAmount;
9595

96+
std::unordered_map<TString, TString> PrivateTopicPathToCdcPath;
97+
std::unordered_map<TString, TString> CdcPathToPrivateTopicPath;
98+
9699
public:
97100
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
98101
return NKikimrServices::TActivity::PQ_FETCH_REQUEST;
@@ -176,16 +179,24 @@ struct TEvPrivate {
176179
schemeCacheRequest->DatabaseName = Settings.Database;
177180

178181
THashSet<TString> topicsRequested;
179-
for (const auto& part : Settings.Partitions) {
180-
auto ins = topicsRequested.insert(part.Topic).second;
181-
if (!ins)
182-
continue;
183-
auto split = NKikimr::SplitPath(part.Topic);
182+
183+
if (PrivateTopicPathToCdcPath.empty()) {
184+
for (const auto& part : Settings.Partitions) {
185+
topicsRequested.insert(part.Topic);
186+
}
187+
} else {
188+
for (const auto& [key, value] : PrivateTopicPathToCdcPath) {
189+
topicsRequested.insert(key);
190+
}
191+
}
192+
193+
for (const auto& topicName : topicsRequested) {
194+
auto split = NKikimr::SplitPath(topicName);
184195
TSchemeCacheNavigate::TEntry entry;
185196
entry.Path.insert(entry.Path.end(), split.begin(), split.end());
186197

187198
entry.SyncVersion = true;
188-
entry.ShowPrivatePath = false;
199+
entry.ShowPrivatePath = true;
189200
entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpList;
190201

191202
schemeCacheRequest->ResultSet.emplace_back(std::move(entry));
@@ -197,6 +208,7 @@ struct TEvPrivate {
197208
void HandleSchemeCacheResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx) {
198209
LOG_DEBUG_S(ctx, NKikimrServices::PQ_FETCH_REQUEST, "Handle SchemeCache response");
199210
auto& result = ev->Get()->Request;
211+
bool anyCdcTopicInRequest = false;
200212
for (const auto& entry : result->ResultSet) {
201213
auto path = CanonizePath(NKikimr::JoinPath(entry.Path));
202214
switch (entry.Status) {
@@ -219,6 +231,16 @@ struct TEvPrivate {
219231
), ctx
220232
);
221233
}
234+
if (entry.Kind == NSchemeCache::TSchemeCacheNavigate::KindCdcStream) {
235+
anyCdcTopicInRequest = true;
236+
Y_ABORT_UNLESS(entry.ListNodeEntry->Children.size() == 1);
237+
auto privateTopicPath = CanonizePath(JoinPath(ChildPath(NKikimr::SplitPath(path), entry.ListNodeEntry->Children.at(0).Name)));
238+
PrivateTopicPathToCdcPath[privateTopicPath] = path;
239+
CdcPathToPrivateTopicPath[path] = privateTopicPath;
240+
TopicInfo[privateTopicPath] = TopicInfo[path];
241+
TopicInfo.erase(path);
242+
continue;
243+
}
222244
if (entry.Kind != TSchemeCacheNavigate::EKind::KindTopic) {
223245
return SendReplyAndDie(
224246
CreateErrorReply(
@@ -256,6 +278,12 @@ struct TEvPrivate {
256278
topicInfo.BalancerTabletId = description.GetBalancerTabletID();
257279
topicInfo.PQInfo = entry.PQGroupInfo;
258280
}
281+
282+
if (anyCdcTopicInRequest) {
283+
SendSchemeCacheRequest(ctx);
284+
return;
285+
}
286+
259287
for (auto& p: TopicInfo) {
260288
ProcessMetadata(p.first, p.second, ctx);
261289
}
@@ -393,8 +421,15 @@ struct TEvPrivate {
393421
return SendReplyAndDie(std::move(Response), ctx);
394422
}
395423
Y_ABORT_UNLESS(FetchRequestReadsDone < Settings.Partitions.size());
396-
const auto& req = Settings.Partitions[FetchRequestReadsDone];
397-
const auto& topic = req.Topic;
424+
auto& req = Settings.Partitions[FetchRequestReadsDone];
425+
426+
auto& topic = req.Topic;
427+
428+
auto cdcToPrivateIt = CdcPathToPrivateTopicPath.find(req.Topic);
429+
if (cdcToPrivateIt != CdcPathToPrivateTopicPath.end()) {
430+
topic = cdcToPrivateIt->second;
431+
}
432+
398433
const auto& offset = req.Offset;
399434
const auto& part = req.Partition;
400435
const auto& maxBytes = req.MaxBytes;
@@ -462,7 +497,13 @@ struct TEvPrivate {
462497
const auto& topic = req.Topic;
463498
const auto& part = req.Partition;
464499

465-
res->SetTopic(topic);
500+
auto privateTopicToCdcIt = PrivateTopicPathToCdcPath.find(topic);
501+
if (privateTopicToCdcIt == PrivateTopicPathToCdcPath.end()) {
502+
res->SetTopic(topic);
503+
} else {
504+
res->SetTopic(PrivateTopicPathToCdcPath[topic]);
505+
}
506+
466507
res->SetPartition(part);
467508
auto read = res->MutableReadResult();
468509
if (record.HasPartitionResponse() && record.GetPartitionResponse().HasCmdReadResult())

ydb/public/lib/ydb_cli/commands/ydb_service_scheme.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -338,6 +338,8 @@ namespace {
338338
Cout << Endl << "MeteringMode: " << (TStringBuilder() << topicDescription.GetMeteringMode());
339339
if (!topicDescription.GetSupportedCodecs().empty()) {
340340
Cout << Endl << "SupportedCodecs: " << FormatCodecs(topicDescription.GetSupportedCodecs()) << Endl;
341+
} else {
342+
Cout << Endl;
341343
}
342344
}
343345

@@ -352,6 +354,8 @@ namespace {
352354
Cout << Endl << "DownUtilizationPercent: " << topicDescription.GetPartitioningSettings().GetAutoPartitioningSettings().GetDownUtilizationPercent();
353355
Cout << Endl << "UpUtilizationPercent: " << topicDescription.GetPartitioningSettings().GetAutoPartitioningSettings().GetUpUtilizationPercent();
354356
Cout << Endl << "StabilizationWindowSeconds: " << topicDescription.GetPartitioningSettings().GetAutoPartitioningSettings().GetStabilizationWindow().Seconds() << Endl;
357+
} else {
358+
Cout << Endl;
355359
}
356360
}
357361
}

ydb/services/lib/actors/pq_schema_actor.h

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,6 @@ namespace NKikimr::NGRpcProxy::V1 {
178178
if (ProcessCdc(response)) {
179179
return;
180180
}
181-
182181
AddIssue(
183182
FillIssue(
184183
TStringBuilder() << "path '" << path << "' is not compatible scheme object",
@@ -294,6 +293,7 @@ namespace NKikimr::NGRpcProxy::V1 {
294293
const TMaybe<TString>& GetCdcStreamName() const {
295294
return CdcStreamName;
296295
}
296+
297297
void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
298298
return static_cast<TDerived*>(this)->HandleCacheNavigateResponse(ev);
299299
}
@@ -541,11 +541,15 @@ namespace NKikimr::NGRpcProxy::V1 {
541541
virtual void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) = 0;
542542

543543
TString GetTopicPath() const override {
544-
return TBase::TopicPath;
544+
auto path = TBase::TopicPath;
545+
if (PrivateTopicName) {
546+
path = JoinPath(ChildPath(NKikimr::SplitPath(path), *PrivateTopicName));
547+
}
548+
return path;
545549
}
546550

547-
void SendDescribeProposeRequest() {
548-
return TBase::SendDescribeProposeRequest(this->ActorContext(), false);
551+
void SendDescribeProposeRequest(bool showPrivate = false) {
552+
return TBase::SendDescribeProposeRequest(this->ActorContext(), showPrivate);
549553
}
550554

551555
bool HandleCacheNavigateResponseBase(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
@@ -594,16 +598,28 @@ namespace NKikimr::NGRpcProxy::V1 {
594598
return false;
595599
}
596600

601+
bool ProcessCdc(const NSchemeCache::TSchemeCacheNavigate::TEntry& response) override {
602+
if constexpr (THasCdcStreamCompatibility<TDerived>::Value) {
603+
if (static_cast<TDerived*>(this)->IsCdcStreamCompatible()) {
604+
Y_ABORT_UNLESS(response.ListNodeEntry->Children.size() == 1);
605+
PrivateTopicName = response.ListNodeEntry->Children.at(0).Name;
606+
SendDescribeProposeRequest(true);
607+
return true;
608+
}
609+
}
610+
return false;
611+
}
612+
597613

598614
private:
599615
TRequest Request;
600616
TActorId Requester;
601-
TMaybe<TString> PrivateTopicName;
602617

603618
protected:
604619
THolder<TEvResponse> Response;
605620
TIntrusiveConstPtr<NSchemeCache::TSchemeCacheNavigate::TPQGroupInfo> PQGroupInfo;
606621
TIntrusiveConstPtr<NSchemeCache::TSchemeCacheNavigate::TDirEntryInfo> Self;
622+
TMaybe<TString> PrivateTopicName;
607623
};
608624

609625
}

ydb/services/persqueue_v1/actors/schema_actors.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -418,7 +418,8 @@ class TAlterTopicActorInternal : public TPQInternalSchemaActor<TAlterTopicActorI
418418
class TPartitionsLocationActor : public TPQInternalSchemaActor<TPartitionsLocationActor,
419419
TGetPartitionsLocationRequest,
420420
TEvPQProxy::TEvPartitionLocationResponse>
421-
, public TDescribeTopicActorImpl {
421+
, public TDescribeTopicActorImpl
422+
, public TCdcStreamCompatible {
422423

423424
using TBase = TPQInternalSchemaActor<TPartitionsLocationActor, TGetPartitionsLocationRequest,
424425
TEvPQProxy::TEvPartitionLocationResponse>;

0 commit comments

Comments
 (0)