From 18956c443ce7205ab37aa31bad0c26813406a538 Mon Sep 17 00:00:00 2001 From: Nikita Saveliev Date: Thu, 12 Sep 2024 10:11:06 +0000 Subject: [PATCH 1/2] Fix reading without consumer from federation --- .../actors/read_init_auth_actor.cpp | 2 +- ydb/services/persqueue_v1/persqueue_ut.cpp | 84 +++++++++++++++++++ 2 files changed, 85 insertions(+), 1 deletion(-) diff --git a/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp b/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp index 5ded05725522..b8aed7e78868 100644 --- a/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp +++ b/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp @@ -180,7 +180,7 @@ void TReadInitAndAuthActor::HandleTopicsDescribeResponse(TEvDescribeTopicsRespon } // ToDo[migration] - separate option - ? - bool doCheckClientAcl = DoCheckACL && !AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen(); + bool doCheckClientAcl = DoCheckACL && !AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen() && !SkipReadRuleCheck; if (doCheckClientAcl) { CheckClientACL(ctx); } else { diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp index 4966ba9330a3..6ba7ed425ace 100644 --- a/ydb/services/persqueue_v1/persqueue_ut.cpp +++ b/ydb/services/persqueue_v1/persqueue_ut.cpp @@ -7027,6 +7027,90 @@ TPersQueueV1TestServer server{{.CheckACL=true, .NodeCount=1}}; driver->Stop(); } + Y_UNIT_TEST(ReadWithoutConsumerFederation) { + const ui32 partititonsCount = 5; + + TPersQueueV1TestServer server; + server.Server->AnnoyingClient->CreateTopic("rt3.dc1--topic2", partititonsCount); + + auto writeSettings = NYdb::NPersQueue::TWriteSessionSettings() + .Path("rt3.dc1--topic2") + .MessageGroupId("src_id"); + + auto writer = server.PersQueueClient->CreateSimpleBlockingWriteSession(writeSettings); + + auto res = writer->Write("some_data"); + UNIT_ASSERT(res); + writer->Close(); + + std::shared_ptr Channel_; + std::unique_ptr StubP_; + + Channel_ = grpc::CreateChannel("localhost:" + ToString(server.Server->GrpcPort), grpc::InsecureChannelCredentials()); + StubP_ = Ydb::Topic::V1::TopicService::NewStub(Channel_); + + grpc::ClientContext rcontext; + auto readStream = StubP_->StreamRead(&rcontext); + UNIT_ASSERT(readStream); + + { + Ydb::Topic::StreamReadMessage::FromClient req; + Ydb::Topic::StreamReadMessage::FromServer resp; + + auto topicReadSettings = req.mutable_init_request()->add_topics_read_settings(); + topicReadSettings->set_path("rt3.dc1--topic2"); + for (ui32 i = 0; i < partititonsCount; i++) { + topicReadSettings->add_partition_ids(i); + } + + req.mutable_init_request()->set_consumer(""); + + if (!readStream->Write(req)) { + ythrow yexception() << "write fail"; + } + + UNIT_ASSERT(readStream->Read(&resp)); + UNIT_ASSERT(resp.server_message_case() == Ydb::Topic::StreamReadMessage::FromServer::kInitResponse); + } + ui32 partitionsSigned = 0; + + while (partitionsSigned != partititonsCount) { + + Ydb::Topic::StreamReadMessage::FromServer resp; + UNIT_ASSERT(readStream->Read(&resp)); + UNIT_ASSERT_C(resp.server_message_case() == Ydb::Topic::StreamReadMessage::FromServer::kStartPartitionSessionRequest, resp); + auto assignId = resp.start_partition_session_request().partition_session().partition_session_id(); + + Ydb::Topic::StreamReadMessage::FromClient req; + req.mutable_start_partition_session_response()->set_partition_session_id(assignId); + req.mutable_start_partition_session_response()->set_read_offset(0); + auto res = readStream->Write(req); + UNIT_ASSERT(res); + partitionsSigned++; + } + ui32 offset = 0; + ui32 session = 0; + + Ydb::Topic::StreamReadMessage::FromClient req; + req.mutable_read_request()->set_bytes_size(1); + readStream->Write(req); + + Ydb::Topic::StreamReadMessage::FromServer resp; + UNIT_ASSERT(readStream->Read(&resp)); + UNIT_ASSERT_C(resp.server_message_case() == Ydb::Topic::StreamReadMessage::FromServer::kReadResponse, resp); + Cerr << "\n" << "Bytes readed: " << resp.read_response().bytes_size() << "\n"; + for (int j = 0; j < resp.read_response().partition_data_size(); j++) { + for (int k = 0; k < resp.read_response().partition_data(j).batches_size(); k++) { + for (int l = 0; l < resp.read_response().partition_data(j).batches(k).message_data_size(); l++) { + offset = resp.read_response().partition_data(j).batches(k).message_data(l).offset(); + session = resp.read_response().partition_data(j).partition_session_id(); + Cerr << "\n" << "Offset: " << offset << " from session " << session << "\n"; + } + } + } + + } + Y_UNIT_TEST(ReadWithoutConsumer) { auto readToEndThenCommit = [] (NPersQueue::TTestServer& server, ui32 partitions, ui32 maxOffset, TString consumer, ui32 readByBytes) { std::shared_ptr Channel_; From 9e1fa47a744accc24b748227a8b67c889e01ede3 Mon Sep 17 00:00:00 2001 From: Nikita Saveliev Date: Thu, 12 Sep 2024 10:17:19 +0000 Subject: [PATCH 2/2] Fix --- ydb/services/persqueue_v1/persqueue_ut.cpp | 32 ++++++++-------------- 1 file changed, 11 insertions(+), 21 deletions(-) diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp index 6ba7ed425ace..05dbbdbc47dd 100644 --- a/ydb/services/persqueue_v1/persqueue_ut.cpp +++ b/ydb/services/persqueue_v1/persqueue_ut.cpp @@ -712,7 +712,6 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { UNIT_ASSERT_C(resp.server_message_case() == Ydb::Topic::StreamReadMessage::FromServer::kReadResponse, resp); } - Y_UNIT_TEST(UpdatePartitionLocation) { TPersQueueV1TestServer server; SET_LOCALS; @@ -6661,7 +6660,6 @@ TPersQueueV1TestServer server{{.CheckACL=true, .NodeCount=1}}; Y_UNIT_TEST(PartitionsMapping) { NPersQueue::TTestServer server; - TString topic = "topic1"; TString topicFullName = "rt3.dc1--" + topic; @@ -7029,13 +7027,18 @@ TPersQueueV1TestServer server{{.CheckACL=true, .NodeCount=1}}; Y_UNIT_TEST(ReadWithoutConsumerFederation) { const ui32 partititonsCount = 5; + const auto topic = "rt3.dc1--topic2"; TPersQueueV1TestServer server; - server.Server->AnnoyingClient->CreateTopic("rt3.dc1--topic2", partititonsCount); + server.Server->AnnoyingClient->CreateTopic(topic, partititonsCount); + + NACLib::TDiffACL acl; + acl.AddAccess(NACLib::EAccessType::Allow, NACLib::GenericFull, "user@" BUILTIN_ACL_DOMAIN); + server.Server->AnnoyingClient->ModifyACL("/Root/PQ", topic, acl.SerializeAsString()); auto writeSettings = NYdb::NPersQueue::TWriteSessionSettings() - .Path("rt3.dc1--topic2") - .MessageGroupId("src_id"); + .Path(topic) + .MessageGroupId("src_id"); auto writer = server.PersQueueClient->CreateSimpleBlockingWriteSession(writeSettings); @@ -7050,15 +7053,15 @@ TPersQueueV1TestServer server{{.CheckACL=true, .NodeCount=1}}; StubP_ = Ydb::Topic::V1::TopicService::NewStub(Channel_); grpc::ClientContext rcontext; + rcontext.AddMetadata("x-ydb-auth-ticket", "user@" BUILTIN_ACL_DOMAIN); auto readStream = StubP_->StreamRead(&rcontext); UNIT_ASSERT(readStream); { Ydb::Topic::StreamReadMessage::FromClient req; Ydb::Topic::StreamReadMessage::FromServer resp; - auto topicReadSettings = req.mutable_init_request()->add_topics_read_settings(); - topicReadSettings->set_path("rt3.dc1--topic2"); + topicReadSettings->set_path(topic); for (ui32 i = 0; i < partititonsCount; i++) { topicReadSettings->add_partition_ids(i); } @@ -7088,8 +7091,6 @@ TPersQueueV1TestServer server{{.CheckACL=true, .NodeCount=1}}; UNIT_ASSERT(res); partitionsSigned++; } - ui32 offset = 0; - ui32 session = 0; Ydb::Topic::StreamReadMessage::FromClient req; req.mutable_read_request()->set_bytes_size(1); @@ -7098,20 +7099,9 @@ TPersQueueV1TestServer server{{.CheckACL=true, .NodeCount=1}}; Ydb::Topic::StreamReadMessage::FromServer resp; UNIT_ASSERT(readStream->Read(&resp)); UNIT_ASSERT_C(resp.server_message_case() == Ydb::Topic::StreamReadMessage::FromServer::kReadResponse, resp); - Cerr << "\n" << "Bytes readed: " << resp.read_response().bytes_size() << "\n"; - for (int j = 0; j < resp.read_response().partition_data_size(); j++) { - for (int k = 0; k < resp.read_response().partition_data(j).batches_size(); k++) { - for (int l = 0; l < resp.read_response().partition_data(j).batches(k).message_data_size(); l++) { - offset = resp.read_response().partition_data(j).batches(k).message_data(l).offset(); - session = resp.read_response().partition_data(j).partition_session_id(); - Cerr << "\n" << "Offset: " << offset << " from session " << session << "\n"; - } - } - } - } - Y_UNIT_TEST(ReadWithoutConsumer) { + Y_UNIT_TEST(ReadWithoutConsumerFirstClassCitizen) { auto readToEndThenCommit = [] (NPersQueue::TTestServer& server, ui32 partitions, ui32 maxOffset, TString consumer, ui32 readByBytes) { std::shared_ptr Channel_; std::unique_ptr StubP_;