diff --git a/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp b/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp index 5ded05725522..b8aed7e78868 100644 --- a/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp +++ b/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp @@ -180,7 +180,7 @@ void TReadInitAndAuthActor::HandleTopicsDescribeResponse(TEvDescribeTopicsRespon } // ToDo[migration] - separate option - ? - bool doCheckClientAcl = DoCheckACL && !AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen(); + bool doCheckClientAcl = DoCheckACL && !AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen() && !SkipReadRuleCheck; if (doCheckClientAcl) { CheckClientACL(ctx); } else { diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp index 4966ba9330a3..05dbbdbc47dd 100644 --- a/ydb/services/persqueue_v1/persqueue_ut.cpp +++ b/ydb/services/persqueue_v1/persqueue_ut.cpp @@ -712,7 +712,6 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { UNIT_ASSERT_C(resp.server_message_case() == Ydb::Topic::StreamReadMessage::FromServer::kReadResponse, resp); } - Y_UNIT_TEST(UpdatePartitionLocation) { TPersQueueV1TestServer server; SET_LOCALS; @@ -6661,7 +6660,6 @@ TPersQueueV1TestServer server{{.CheckACL=true, .NodeCount=1}}; Y_UNIT_TEST(PartitionsMapping) { NPersQueue::TTestServer server; - TString topic = "topic1"; TString topicFullName = "rt3.dc1--" + topic; @@ -7027,7 +7025,83 @@ TPersQueueV1TestServer server{{.CheckACL=true, .NodeCount=1}}; driver->Stop(); } - Y_UNIT_TEST(ReadWithoutConsumer) { + Y_UNIT_TEST(ReadWithoutConsumerFederation) { + const ui32 partititonsCount = 5; + const auto topic = "rt3.dc1--topic2"; + + TPersQueueV1TestServer server; + server.Server->AnnoyingClient->CreateTopic(topic, partititonsCount); + + NACLib::TDiffACL acl; + acl.AddAccess(NACLib::EAccessType::Allow, NACLib::GenericFull, "user@" BUILTIN_ACL_DOMAIN); + server.Server->AnnoyingClient->ModifyACL("/Root/PQ", topic, acl.SerializeAsString()); + + auto writeSettings = NYdb::NPersQueue::TWriteSessionSettings() + .Path(topic) + .MessageGroupId("src_id"); + + auto writer = server.PersQueueClient->CreateSimpleBlockingWriteSession(writeSettings); + + auto res = writer->Write("some_data"); + UNIT_ASSERT(res); + writer->Close(); + + std::shared_ptr Channel_; + std::unique_ptr StubP_; + + Channel_ = grpc::CreateChannel("localhost:" + ToString(server.Server->GrpcPort), grpc::InsecureChannelCredentials()); + StubP_ = Ydb::Topic::V1::TopicService::NewStub(Channel_); + + grpc::ClientContext rcontext; + rcontext.AddMetadata("x-ydb-auth-ticket", "user@" BUILTIN_ACL_DOMAIN); + auto readStream = StubP_->StreamRead(&rcontext); + UNIT_ASSERT(readStream); + + { + Ydb::Topic::StreamReadMessage::FromClient req; + Ydb::Topic::StreamReadMessage::FromServer resp; + auto topicReadSettings = req.mutable_init_request()->add_topics_read_settings(); + topicReadSettings->set_path(topic); + for (ui32 i = 0; i < partititonsCount; i++) { + topicReadSettings->add_partition_ids(i); + } + + req.mutable_init_request()->set_consumer(""); + + if (!readStream->Write(req)) { + ythrow yexception() << "write fail"; + } + + UNIT_ASSERT(readStream->Read(&resp)); + UNIT_ASSERT(resp.server_message_case() == Ydb::Topic::StreamReadMessage::FromServer::kInitResponse); + } + ui32 partitionsSigned = 0; + + while (partitionsSigned != partititonsCount) { + + Ydb::Topic::StreamReadMessage::FromServer resp; + UNIT_ASSERT(readStream->Read(&resp)); + UNIT_ASSERT_C(resp.server_message_case() == Ydb::Topic::StreamReadMessage::FromServer::kStartPartitionSessionRequest, resp); + auto assignId = resp.start_partition_session_request().partition_session().partition_session_id(); + + Ydb::Topic::StreamReadMessage::FromClient req; + req.mutable_start_partition_session_response()->set_partition_session_id(assignId); + req.mutable_start_partition_session_response()->set_read_offset(0); + auto res = readStream->Write(req); + UNIT_ASSERT(res); + partitionsSigned++; + } + + Ydb::Topic::StreamReadMessage::FromClient req; + req.mutable_read_request()->set_bytes_size(1); + readStream->Write(req); + + Ydb::Topic::StreamReadMessage::FromServer resp; + UNIT_ASSERT(readStream->Read(&resp)); + UNIT_ASSERT_C(resp.server_message_case() == Ydb::Topic::StreamReadMessage::FromServer::kReadResponse, resp); + } + + Y_UNIT_TEST(ReadWithoutConsumerFirstClassCitizen) { auto readToEndThenCommit = [] (NPersQueue::TTestServer& server, ui32 partitions, ui32 maxOffset, TString consumer, ui32 readByBytes) { std::shared_ptr Channel_; std::unique_ptr StubP_;