Skip to content

Commit e8c9cca

Browse files
authored
Fix reading without consumer from federation (#9136)
1 parent cf0fbac commit e8c9cca

File tree

2 files changed

+78
-4
lines changed

2 files changed

+78
-4
lines changed

ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ void TReadInitAndAuthActor::HandleTopicsDescribeResponse(TEvDescribeTopicsRespon
180180
}
181181

182182
// ToDo[migration] - separate option - ?
183-
bool doCheckClientAcl = DoCheckACL && !AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen();
183+
bool doCheckClientAcl = DoCheckACL && !AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen() && !SkipReadRuleCheck;
184184
if (doCheckClientAcl) {
185185
CheckClientACL(ctx);
186186
} else {

ydb/services/persqueue_v1/persqueue_ut.cpp

Lines changed: 77 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -712,7 +712,6 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
712712
UNIT_ASSERT_C(resp.server_message_case() == Ydb::Topic::StreamReadMessage::FromServer::kReadResponse, resp);
713713
}
714714

715-
716715
Y_UNIT_TEST(UpdatePartitionLocation) {
717716
TPersQueueV1TestServer server;
718717
SET_LOCALS;
@@ -6661,7 +6660,6 @@ TPersQueueV1TestServer server{{.CheckACL=true, .NodeCount=1}};
66616660

66626661
Y_UNIT_TEST(PartitionsMapping) {
66636662
NPersQueue::TTestServer server;
6664-
66656663
TString topic = "topic1";
66666664
TString topicFullName = "rt3.dc1--" + topic;
66676665

@@ -7027,7 +7025,83 @@ TPersQueueV1TestServer server{{.CheckACL=true, .NodeCount=1}};
70277025
driver->Stop();
70287026
}
70297027

7030-
Y_UNIT_TEST(ReadWithoutConsumer) {
7028+
Y_UNIT_TEST(ReadWithoutConsumerFederation) {
7029+
const ui32 partititonsCount = 5;
7030+
const auto topic = "rt3.dc1--topic2";
7031+
7032+
TPersQueueV1TestServer server;
7033+
server.Server->AnnoyingClient->CreateTopic(topic, partititonsCount);
7034+
7035+
NACLib::TDiffACL acl;
7036+
acl.AddAccess(NACLib::EAccessType::Allow, NACLib::GenericFull, "user@" BUILTIN_ACL_DOMAIN);
7037+
server.Server->AnnoyingClient->ModifyACL("/Root/PQ", topic, acl.SerializeAsString());
7038+
7039+
auto writeSettings = NYdb::NPersQueue::TWriteSessionSettings()
7040+
.Path(topic)
7041+
.MessageGroupId("src_id");
7042+
7043+
auto writer = server.PersQueueClient->CreateSimpleBlockingWriteSession(writeSettings);
7044+
7045+
auto res = writer->Write("some_data");
7046+
UNIT_ASSERT(res);
7047+
writer->Close();
7048+
7049+
std::shared_ptr<grpc::Channel> Channel_;
7050+
std::unique_ptr<Ydb::Topic::V1::TopicService::Stub> StubP_;
7051+
7052+
Channel_ = grpc::CreateChannel("localhost:" + ToString(server.Server->GrpcPort), grpc::InsecureChannelCredentials());
7053+
StubP_ = Ydb::Topic::V1::TopicService::NewStub(Channel_);
7054+
7055+
grpc::ClientContext rcontext;
7056+
rcontext.AddMetadata("x-ydb-auth-ticket", "user@" BUILTIN_ACL_DOMAIN);
7057+
auto readStream = StubP_->StreamRead(&rcontext);
7058+
UNIT_ASSERT(readStream);
7059+
7060+
{
7061+
Ydb::Topic::StreamReadMessage::FromClient req;
7062+
Ydb::Topic::StreamReadMessage::FromServer resp;
7063+
auto topicReadSettings = req.mutable_init_request()->add_topics_read_settings();
7064+
topicReadSettings->set_path(topic);
7065+
for (ui32 i = 0; i < partititonsCount; i++) {
7066+
topicReadSettings->add_partition_ids(i);
7067+
}
7068+
7069+
req.mutable_init_request()->set_consumer("");
7070+
7071+
if (!readStream->Write(req)) {
7072+
ythrow yexception() << "write fail";
7073+
}
7074+
7075+
UNIT_ASSERT(readStream->Read(&resp));
7076+
UNIT_ASSERT(resp.server_message_case() == Ydb::Topic::StreamReadMessage::FromServer::kInitResponse);
7077+
}
7078+
ui32 partitionsSigned = 0;
7079+
7080+
while (partitionsSigned != partititonsCount) {
7081+
7082+
Ydb::Topic::StreamReadMessage::FromServer resp;
7083+
UNIT_ASSERT(readStream->Read(&resp));
7084+
UNIT_ASSERT_C(resp.server_message_case() == Ydb::Topic::StreamReadMessage::FromServer::kStartPartitionSessionRequest, resp);
7085+
auto assignId = resp.start_partition_session_request().partition_session().partition_session_id();
7086+
7087+
Ydb::Topic::StreamReadMessage::FromClient req;
7088+
req.mutable_start_partition_session_response()->set_partition_session_id(assignId);
7089+
req.mutable_start_partition_session_response()->set_read_offset(0);
7090+
auto res = readStream->Write(req);
7091+
UNIT_ASSERT(res);
7092+
partitionsSigned++;
7093+
}
7094+
7095+
Ydb::Topic::StreamReadMessage::FromClient req;
7096+
req.mutable_read_request()->set_bytes_size(1);
7097+
readStream->Write(req);
7098+
7099+
Ydb::Topic::StreamReadMessage::FromServer resp;
7100+
UNIT_ASSERT(readStream->Read(&resp));
7101+
UNIT_ASSERT_C(resp.server_message_case() == Ydb::Topic::StreamReadMessage::FromServer::kReadResponse, resp);
7102+
}
7103+
7104+
Y_UNIT_TEST(ReadWithoutConsumerFirstClassCitizen) {
70317105
auto readToEndThenCommit = [] (NPersQueue::TTestServer& server, ui32 partitions, ui32 maxOffset, TString consumer, ui32 readByBytes) {
70327106
std::shared_ptr<grpc::Channel> Channel_;
70337107
std::unique_ptr<Ydb::Topic::V1::TopicService::Stub> StubP_;

0 commit comments

Comments
 (0)