Skip to content

LOGBROKER 8891 list dead letter source queues #8330

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ydb/core/grpc_services/service_ymq.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,6 @@ void DoYmqSetQueueAttributesRequest(std::unique_ptr<IRequestOpCtx> p, const IFac
void DoYmqSendMessageBatchRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
void DoYmqDeleteMessageBatchRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
void DoYmqChangeMessageVisibilityBatchRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
void DoYmqListDeadLetterSourceQueuesRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
}
}
3 changes: 3 additions & 0 deletions ydb/core/http_proxy/http_req.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,8 @@ namespace NKikimr::NHttpProxy {
action = NSQS::EAction::DeleteMessageBatch;
} else if (Method == "ChangeMessageVisibilityBatch") {
action = NSQS::EAction::ChangeMessageVisibilityBatch;
} else if (Method == "ListDeadLetterSourceQueues") {
action = NSQS::EAction::ListDeadLetterSourceQueues;
}

requestHolder->SetRequestId(HttpContext.RequestId);
Expand Down Expand Up @@ -1075,6 +1077,7 @@ namespace NKikimr::NHttpProxy {
DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN(SendMessageBatch);
DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN(DeleteMessageBatch);
DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN(ChangeMessageVisibilityBatch);
DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN(ListDeadLetterSourceQueues);
#undef DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN
}

Expand Down
1 change: 1 addition & 0 deletions ydb/core/http_proxy/ut/datastreams_fixture.h
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,7 @@ class THttpProxyTestMock : public NUnitTest::TBaseFixture {

appConfig.MutableSqsConfig()->SetEnableSqs(true);
appConfig.MutableSqsConfig()->SetYandexCloudMode(true);
appConfig.MutableSqsConfig()->SetEnableDeadLetterQueues(true);

auto limit = appConfig.MutablePQConfig()->AddValidRetentionLimits();
limit->SetMinPeriodSeconds(0);
Expand Down
46 changes: 46 additions & 0 deletions ydb/core/http_proxy/ut/http_proxy_ut.h
Original file line number Diff line number Diff line change
Expand Up @@ -1996,4 +1996,50 @@ Y_UNIT_TEST_SUITE(TestHttpProxy) {

}

Y_UNIT_TEST_F(TestListDeadLetterSourceQueues, THttpProxyTestMock) {
auto createQueueReq = CreateSqsCreateQueueRequest();
auto res = SendHttpRequest("/Root", "AmazonSQS.CreateQueue", std::move(createQueueReq), FormAuthorizationStr("ru-central1"));
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);
NJson::TJsonValue json;
UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json));

TString resultQueueUrl = GetByPath<TString>(json, "QueueUrl");

auto createDlqReq = CreateSqsCreateQueueRequest();
createQueueReq["QueueName"] = "DlqName";
res = SendHttpRequest("/Root", "AmazonSQS.CreateQueue", std::move(createQueueReq), FormAuthorizationStr("ru-central1"));
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);
UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json));

TString dlqUrl = GetByPath<TString>(json, "QueueUrl");

NJson::TJsonValue getQueueAttributes;
getQueueAttributes["QueueUrl"] = dlqUrl;
NJson::TJsonArray attributeNames = {"QueueArn"};
getQueueAttributes["AttributeNames"] = attributeNames;
res = SendHttpRequest("/Root", "AmazonSQS.GetQueueAttributes", std::move(getQueueAttributes), FormAuthorizationStr("ru-central1"));
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);
UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json));

TString dlqArn = GetByPath<TString>(json["Attributes"], "QueueArn");

NJson::TJsonValue setQueueAttributes;
setQueueAttributes["QueueUrl"] = resultQueueUrl;
NJson::TJsonValue attributes = {};
auto redrivePolicy = TStringBuilder()
<< "{\"deadLetterTargetArn\" : \"" << dlqArn << "\", \"maxReceiveCount\" : 100}";
attributes["RedrivePolicy"] = redrivePolicy;
setQueueAttributes["Attributes"] = attributes;

res = SendHttpRequest("/Root", "AmazonSQS.SetQueueAttributes", std::move(setQueueAttributes), FormAuthorizationStr("ru-central1"));
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);

NJson::TJsonValue listDeadLetterSourceQueues;
listDeadLetterSourceQueues["QueueUrl"] = dlqUrl;
res = SendHttpRequest("/Root", "AmazonSQS.ListDeadLetterSourceQueues", std::move(listDeadLetterSourceQueues), FormAuthorizationStr("ru-central1"));
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);
UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json));
UNIT_ASSERT_VALUES_EQUAL(json["QueueUrls"][0], resultQueueUrl);
}

} // Y_UNIT_TEST_SUITE(TestHttpProxy)
1 change: 1 addition & 0 deletions ydb/public/api/grpc/draft/ydb_ymq_v1.proto
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,5 @@ service YmqService {
rpc SendMessageBatch(SendMessageBatchRequest) returns (SendMessageBatchResponse);
rpc DeleteMessageBatch(DeleteMessageBatchRequest) returns (DeleteMessageBatchResponse);
rpc ChangeMessageVisibilityBatch(ChangeMessageVisibilityBatchRequest) returns (ChangeMessageVisibilityBatchResponse);
rpc ListDeadLetterSourceQueues(ListDeadLetterSourceQueuesRequest) returns (ListDeadLetterSourceQueuesResponse);
}
1 change: 1 addition & 0 deletions ydb/services/ymq/grpc_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ void TGRpcYmqService::SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger)
ADD_REQUEST(SendMessageBatch, DoYmqSendMessageBatchRequest, nullptr, Off)
ADD_REQUEST(DeleteMessageBatch, DoYmqDeleteMessageBatchRequest, nullptr, Off)
ADD_REQUEST(ChangeMessageVisibilityBatch, DoYmqChangeMessageVisibilityBatchRequest, nullptr, Off)
ADD_REQUEST(ListDeadLetterSourceQueues, DoYmqListDeadLetterSourceQueuesRequest, nullptr, Off)

#undef ADD_REQUEST
}
Expand Down
36 changes: 36 additions & 0 deletions ydb/services/ymq/ymq_proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -730,9 +730,44 @@ namespace NKikimr::NYmq::V1 {
private:
NKikimr::NSQS::TSetQueueAttributesRequest* GetRequest(THolder<TSqsRequest>& requestHolder) override {
auto result = requestHolder->MutableSetQueueAttributes();
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url())->second);
for (auto& [name, value]: GetProtoRequest()->Getattributes()) {
AddAttribute(requestHolder, name, value);
}
return result;
}
};

class TListDeadLetterSourceQueuesReplyCallback : public TReplyCallback<
NKikimr::NSQS::TListDeadLetterSourceQueuesResponse,
Ydb::Ymq::V1::ListDeadLetterSourceQueuesResult> {
public:
using TReplyCallback::TReplyCallback;

private:
const NKikimr::NSQS::TListDeadLetterSourceQueuesResponse& GetResponse(const NKikimrClient::TSqsResponse& resp) override {
return resp.GetListDeadLetterSourceQueues();
}

Ydb::Ymq::V1::ListDeadLetterSourceQueuesResult GetResult(const NKikimrClient::TSqsResponse& response) override {
Ydb::Ymq::V1::ListDeadLetterSourceQueuesResult result;
for (const auto& queue : response.GetListDeadLetterSourceQueues().GetQueues()) {
result.Mutablequeue_urls()->Add()->assign(queue.GetQueueUrl());
}
return result;
}
};

class TListDeadLetterSourceQueuesActor : public TRpcRequestActor<
TEvYmqListDeadLetterSourceQueuesRequest,
NKikimr::NSQS::TListDeadLetterSourceQueuesRequest,
TListDeadLetterSourceQueuesReplyCallback> {
public:
using TRpcRequestActor::TRpcRequestActor;

private:
NKikimr::NSQS::TListDeadLetterSourceQueuesRequest* GetRequest(THolder<TSqsRequest>& requestHolder) override {
auto result = requestHolder->MutableListDeadLetterSourceQueues();
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url())->second);
return result;
}
Expand Down Expand Up @@ -933,5 +968,6 @@ DECLARE_RPC(SetQueueAttributes);
DECLARE_RPC(SendMessageBatch);
DECLARE_RPC(DeleteMessageBatch);
DECLARE_RPC(ChangeMessageVisibilityBatch);
DECLARE_RPC(ListDeadLetterSourceQueues);

}
1 change: 1 addition & 0 deletions ydb/services/ymq/ymq_proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ using TEvYmqSetQueueAttributesRequest = TGrpcRequestOperationCall<Ydb::Ymq::V1::
using TEvYmqSendMessageBatchRequest = TGrpcRequestOperationCall<Ydb::Ymq::V1::SendMessageBatchRequest, Ydb::Ymq::V1::SendMessageBatchResponse>;
using TEvYmqDeleteMessageBatchRequest = TGrpcRequestOperationCall<Ydb::Ymq::V1::DeleteMessageBatchRequest, Ydb::Ymq::V1::DeleteMessageBatchResponse>;
using TEvYmqChangeMessageVisibilityBatchRequest = TGrpcRequestOperationCall<Ydb::Ymq::V1::ChangeMessageVisibilityBatchRequest, Ydb::Ymq::V1::ChangeMessageVisibilityBatchResponse>;
using TEvYmqListDeadLetterSourceQueuesRequest = TGrpcRequestOperationCall<Ydb::Ymq::V1::ListDeadLetterSourceQueuesRequest, Ydb::Ymq::V1::ListDeadLetterSourceQueuesResponse>;

}
}
Loading