diff --git a/ydb/core/grpc_services/service_ymq.h b/ydb/core/grpc_services/service_ymq.h index 4d35c1b49db8..6ccf8137df2f 100644 --- a/ydb/core/grpc_services/service_ymq.h +++ b/ydb/core/grpc_services/service_ymq.h @@ -25,5 +25,6 @@ void DoYmqSetQueueAttributesRequest(std::unique_ptr p, const IFac void DoYmqSendMessageBatchRequest(std::unique_ptr p, const IFacilityProvider& f); void DoYmqDeleteMessageBatchRequest(std::unique_ptr p, const IFacilityProvider& f); void DoYmqChangeMessageVisibilityBatchRequest(std::unique_ptr p, const IFacilityProvider& f); +void DoYmqListDeadLetterSourceQueuesRequest(std::unique_ptr p, const IFacilityProvider& f); } } diff --git a/ydb/core/http_proxy/http_req.cpp b/ydb/core/http_proxy/http_req.cpp index dee6f7177be0..6894ec5af22a 100644 --- a/ydb/core/http_proxy/http_req.cpp +++ b/ydb/core/http_proxy/http_req.cpp @@ -536,6 +536,8 @@ namespace NKikimr::NHttpProxy { action = NSQS::EAction::DeleteMessageBatch; } else if (Method == "ChangeMessageVisibilityBatch") { action = NSQS::EAction::ChangeMessageVisibilityBatch; + } else if (Method == "ListDeadLetterSourceQueues") { + action = NSQS::EAction::ListDeadLetterSourceQueues; } requestHolder->SetRequestId(HttpContext.RequestId); @@ -1075,6 +1077,7 @@ namespace NKikimr::NHttpProxy { DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN(SendMessageBatch); DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN(DeleteMessageBatch); DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN(ChangeMessageVisibilityBatch); + DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN(ListDeadLetterSourceQueues); #undef DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN } diff --git a/ydb/core/http_proxy/ut/datastreams_fixture.h b/ydb/core/http_proxy/ut/datastreams_fixture.h index bbf982488c42..5a4c98fcd68e 100644 --- a/ydb/core/http_proxy/ut/datastreams_fixture.h +++ b/ydb/core/http_proxy/ut/datastreams_fixture.h @@ -376,6 +376,7 @@ class THttpProxyTestMock : public NUnitTest::TBaseFixture { appConfig.MutableSqsConfig()->SetEnableSqs(true); appConfig.MutableSqsConfig()->SetYandexCloudMode(true); + appConfig.MutableSqsConfig()->SetEnableDeadLetterQueues(true); auto limit = appConfig.MutablePQConfig()->AddValidRetentionLimits(); limit->SetMinPeriodSeconds(0); diff --git a/ydb/core/http_proxy/ut/http_proxy_ut.h b/ydb/core/http_proxy/ut/http_proxy_ut.h index 8b43a0f98969..0ab8b81d9200 100644 --- a/ydb/core/http_proxy/ut/http_proxy_ut.h +++ b/ydb/core/http_proxy/ut/http_proxy_ut.h @@ -1996,4 +1996,50 @@ Y_UNIT_TEST_SUITE(TestHttpProxy) { } + Y_UNIT_TEST_F(TestListDeadLetterSourceQueues, THttpProxyTestMock) { + auto createQueueReq = CreateSqsCreateQueueRequest(); + auto res = SendHttpRequest("/Root", "AmazonSQS.CreateQueue", std::move(createQueueReq), FormAuthorizationStr("ru-central1")); + UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200); + NJson::TJsonValue json; + UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json)); + + TString resultQueueUrl = GetByPath(json, "QueueUrl"); + + auto createDlqReq = CreateSqsCreateQueueRequest(); + createQueueReq["QueueName"] = "DlqName"; + res = SendHttpRequest("/Root", "AmazonSQS.CreateQueue", std::move(createQueueReq), FormAuthorizationStr("ru-central1")); + UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200); + UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json)); + + TString dlqUrl = GetByPath(json, "QueueUrl"); + + NJson::TJsonValue getQueueAttributes; + getQueueAttributes["QueueUrl"] = dlqUrl; + NJson::TJsonArray attributeNames = {"QueueArn"}; + getQueueAttributes["AttributeNames"] = attributeNames; + res = SendHttpRequest("/Root", "AmazonSQS.GetQueueAttributes", std::move(getQueueAttributes), FormAuthorizationStr("ru-central1")); + UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200); + UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json)); + + TString dlqArn = GetByPath(json["Attributes"], "QueueArn"); + + NJson::TJsonValue setQueueAttributes; + setQueueAttributes["QueueUrl"] = resultQueueUrl; + NJson::TJsonValue attributes = {}; + auto redrivePolicy = TStringBuilder() + << "{\"deadLetterTargetArn\" : \"" << dlqArn << "\", \"maxReceiveCount\" : 100}"; + attributes["RedrivePolicy"] = redrivePolicy; + setQueueAttributes["Attributes"] = attributes; + + res = SendHttpRequest("/Root", "AmazonSQS.SetQueueAttributes", std::move(setQueueAttributes), FormAuthorizationStr("ru-central1")); + UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200); + + NJson::TJsonValue listDeadLetterSourceQueues; + listDeadLetterSourceQueues["QueueUrl"] = dlqUrl; + res = SendHttpRequest("/Root", "AmazonSQS.ListDeadLetterSourceQueues", std::move(listDeadLetterSourceQueues), FormAuthorizationStr("ru-central1")); + UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200); + UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json)); + UNIT_ASSERT_VALUES_EQUAL(json["QueueUrls"][0], resultQueueUrl); + } + } // Y_UNIT_TEST_SUITE(TestHttpProxy) diff --git a/ydb/public/api/grpc/draft/ydb_ymq_v1.proto b/ydb/public/api/grpc/draft/ydb_ymq_v1.proto index 58f104289f33..6586bd54f4ac 100644 --- a/ydb/public/api/grpc/draft/ydb_ymq_v1.proto +++ b/ydb/public/api/grpc/draft/ydb_ymq_v1.proto @@ -24,4 +24,5 @@ service YmqService { rpc SendMessageBatch(SendMessageBatchRequest) returns (SendMessageBatchResponse); rpc DeleteMessageBatch(DeleteMessageBatchRequest) returns (DeleteMessageBatchResponse); rpc ChangeMessageVisibilityBatch(ChangeMessageVisibilityBatchRequest) returns (ChangeMessageVisibilityBatchResponse); + rpc ListDeadLetterSourceQueues(ListDeadLetterSourceQueuesRequest) returns (ListDeadLetterSourceQueuesResponse); } diff --git a/ydb/services/ymq/grpc_service.cpp b/ydb/services/ymq/grpc_service.cpp index 20d891253dae..daae514b2813 100644 --- a/ydb/services/ymq/grpc_service.cpp +++ b/ydb/services/ymq/grpc_service.cpp @@ -46,6 +46,7 @@ void TGRpcYmqService::SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger) ADD_REQUEST(SendMessageBatch, DoYmqSendMessageBatchRequest, nullptr, Off) ADD_REQUEST(DeleteMessageBatch, DoYmqDeleteMessageBatchRequest, nullptr, Off) ADD_REQUEST(ChangeMessageVisibilityBatch, DoYmqChangeMessageVisibilityBatchRequest, nullptr, Off) + ADD_REQUEST(ListDeadLetterSourceQueues, DoYmqListDeadLetterSourceQueuesRequest, nullptr, Off) #undef ADD_REQUEST } diff --git a/ydb/services/ymq/ymq_proxy.cpp b/ydb/services/ymq/ymq_proxy.cpp index 0453f6f5dfb0..c16bfdaf7c69 100644 --- a/ydb/services/ymq/ymq_proxy.cpp +++ b/ydb/services/ymq/ymq_proxy.cpp @@ -730,9 +730,44 @@ namespace NKikimr::NYmq::V1 { private: NKikimr::NSQS::TSetQueueAttributesRequest* GetRequest(THolder& requestHolder) override { auto result = requestHolder->MutableSetQueueAttributes(); + result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url())->second); for (auto& [name, value]: GetProtoRequest()->Getattributes()) { AddAttribute(requestHolder, name, value); } + return result; + } + }; + + class TListDeadLetterSourceQueuesReplyCallback : public TReplyCallback< + NKikimr::NSQS::TListDeadLetterSourceQueuesResponse, + Ydb::Ymq::V1::ListDeadLetterSourceQueuesResult> { + public: + using TReplyCallback::TReplyCallback; + + private: + const NKikimr::NSQS::TListDeadLetterSourceQueuesResponse& GetResponse(const NKikimrClient::TSqsResponse& resp) override { + return resp.GetListDeadLetterSourceQueues(); + } + + Ydb::Ymq::V1::ListDeadLetterSourceQueuesResult GetResult(const NKikimrClient::TSqsResponse& response) override { + Ydb::Ymq::V1::ListDeadLetterSourceQueuesResult result; + for (const auto& queue : response.GetListDeadLetterSourceQueues().GetQueues()) { + result.Mutablequeue_urls()->Add()->assign(queue.GetQueueUrl()); + } + return result; + } + }; + + class TListDeadLetterSourceQueuesActor : public TRpcRequestActor< + TEvYmqListDeadLetterSourceQueuesRequest, + NKikimr::NSQS::TListDeadLetterSourceQueuesRequest, + TListDeadLetterSourceQueuesReplyCallback> { + public: + using TRpcRequestActor::TRpcRequestActor; + + private: + NKikimr::NSQS::TListDeadLetterSourceQueuesRequest* GetRequest(THolder& requestHolder) override { + auto result = requestHolder->MutableListDeadLetterSourceQueues(); result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url())->second); return result; } @@ -933,5 +968,6 @@ DECLARE_RPC(SetQueueAttributes); DECLARE_RPC(SendMessageBatch); DECLARE_RPC(DeleteMessageBatch); DECLARE_RPC(ChangeMessageVisibilityBatch); +DECLARE_RPC(ListDeadLetterSourceQueues); } diff --git a/ydb/services/ymq/ymq_proxy.h b/ydb/services/ymq/ymq_proxy.h index 01636f81ce36..f7e44a8aa36c 100644 --- a/ydb/services/ymq/ymq_proxy.h +++ b/ydb/services/ymq/ymq_proxy.h @@ -27,6 +27,7 @@ using TEvYmqSetQueueAttributesRequest = TGrpcRequestOperationCall; using TEvYmqDeleteMessageBatchRequest = TGrpcRequestOperationCall; using TEvYmqChangeMessageVisibilityBatchRequest = TGrpcRequestOperationCall; +using TEvYmqListDeadLetterSourceQueuesRequest = TGrpcRequestOperationCall; } }