From e9b5b495be78a0597657b1615e1ef9d4dd734ad7 Mon Sep 17 00:00:00 2001 From: Sergey Veselov Date: Mon, 12 Aug 2024 12:38:48 +0000 Subject: [PATCH 1/4] SetAttributes request --- ydb/core/grpc_services/service_ymq.h | 1 + ydb/core/http_proxy/http_req.cpp | 3 ++ ydb/core/http_proxy/ut/http_proxy_ut.h | 34 ++++++++++++++++++ ydb/public/api/grpc/draft/ydb_ymq_v1.proto | 1 + ydb/public/api/protos/draft/ymq.proto | 13 +++++++ ydb/services/ymq/grpc_service.cpp | 1 + ydb/services/ymq/ymq_proxy.cpp | 42 ++++++++++++++++++++++ ydb/services/ymq/ymq_proxy.h | 1 + 8 files changed, 96 insertions(+) diff --git a/ydb/core/grpc_services/service_ymq.h b/ydb/core/grpc_services/service_ymq.h index 809ed01096fc..e660dede46bb 100644 --- a/ydb/core/grpc_services/service_ymq.h +++ b/ydb/core/grpc_services/service_ymq.h @@ -21,5 +21,6 @@ void DoYmqDeleteMessageRequest(std::unique_ptr p, const IFacility void DoYmqPurgeQueueRequest(std::unique_ptr p, const IFacilityProvider& f); void DoYmqDeleteQueueRequest(std::unique_ptr p, const IFacilityProvider& f); void DoYmqChangeMessageVisibilityRequest(std::unique_ptr p, const IFacilityProvider& f); +void DoYmqSetQueueAttributesRequest(std::unique_ptr p, const IFacilityProvider& f); } } diff --git a/ydb/core/http_proxy/http_req.cpp b/ydb/core/http_proxy/http_req.cpp index 931d6a2102c6..8a5989566bb6 100644 --- a/ydb/core/http_proxy/http_req.cpp +++ b/ydb/core/http_proxy/http_req.cpp @@ -528,6 +528,8 @@ namespace NKikimr::NHttpProxy { action = NSQS::EAction::DeleteQueue; } else if (Method == "ChangeMessageVisibility") { action = NSQS::EAction::ChangeMessageVisibility; + } else if (Method == "SetQueueAttributes") { + action = NSQS::EAction::SetQueueAttributes; } requestHolder->SetRequestId(HttpContext.RequestId); @@ -1063,6 +1065,7 @@ namespace NKikimr::NHttpProxy { DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN(PurgeQueue); DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN(DeleteQueue); DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN(ChangeMessageVisibility); + DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN(SetQueueAttributes); #undef DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN } diff --git a/ydb/core/http_proxy/ut/http_proxy_ut.h b/ydb/core/http_proxy/ut/http_proxy_ut.h index 5a4f0b11ebcc..176d5d7e2acd 100644 --- a/ydb/core/http_proxy/ut/http_proxy_ut.h +++ b/ydb/core/http_proxy/ut/http_proxy_ut.h @@ -1838,4 +1838,38 @@ Y_UNIT_TEST_SUITE(TestHttpProxy) { UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json)); UNIT_ASSERT_VALUES_EQUAL(GetByPath(json, "__type"), "AWS.SimpleQueueService.NonExistentQueue"); } + + Y_UNIT_TEST_F(TestSetQueueAttributes, THttpProxyTestMock) { + auto createQueueReq = CreateSqsCreateQueueRequest(); + NJson::TJsonValue attributes; + attributes["DelaySeconds"] = "1"; + createQueueReq["Attributes"] = attributes; + + auto res = SendHttpRequest("/Root", "AmazonSQS.CreateQueue", std::move(createQueueReq), FormAuthorizationStr("ru-central1")); + UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200); + NJson::TJsonValue json; + UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json)); + + TString resultQueueUrl = GetByPath(json, "QueueUrl"); + + NJson::TJsonValue setQueueAttributes; + setQueueAttributes["QueueUrl"] = resultQueueUrl; + attributes = {}; + attributes["DelaySeconds"] = "2"; + setQueueAttributes["Attributes"] = attributes; + + res = SendHttpRequest("/Root", "AmazonSQS.SetQueueAttributes", std::move(setQueueAttributes), FormAuthorizationStr("ru-central1")); + UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200); + + NJson::TJsonValue getQueueAttributes; + getQueueAttributes["QueueUrl"] = resultQueueUrl; + NJson::TJsonArray attributeNames = {"DelaySeconds"}; + getQueueAttributes["AttributeNames"] = attributeNames; + + res = SendHttpRequest("/Root", "AmazonSQS.GetQueueAttributes", std::move(getQueueAttributes), FormAuthorizationStr("ru-central1")); + UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200); + NJson::TJsonValue resultJson; + UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &resultJson)); + UNIT_ASSERT_VALUES_EQUAL(resultJson["Attributes"]["DelaySeconds"], "2"); + } } // Y_UNIT_TEST_SUITE(TestHttpProxy) diff --git a/ydb/public/api/grpc/draft/ydb_ymq_v1.proto b/ydb/public/api/grpc/draft/ydb_ymq_v1.proto index 2c7a413590a8..1f1745306f9b 100644 --- a/ydb/public/api/grpc/draft/ydb_ymq_v1.proto +++ b/ydb/public/api/grpc/draft/ydb_ymq_v1.proto @@ -20,4 +20,5 @@ service YmqService { rpc PurgeQueue(PurgeQueueRequest) returns (PurgeQueueResponse); rpc DeleteQueue(DeleteQueueRequest) returns (DeleteQueueResponse); rpc ChangeMessageVisibility(ChangeMessageVisibilityRequest) returns (ChangeMessageVisibilityResponse); + rpc SetQueueAttributes(SetQueueAttributesRequest) returns (SetQueueAttributesResponse); } diff --git a/ydb/public/api/protos/draft/ymq.proto b/ydb/public/api/protos/draft/ymq.proto index 081a97c48b40..be5e53b91efc 100644 --- a/ydb/public/api/protos/draft/ymq.proto +++ b/ydb/public/api/protos/draft/ymq.proto @@ -256,3 +256,16 @@ message SendMessageBatchResult { repeated BatchResultErrorEntry failed = 1; repeated SendMessageBatchResultEntry successful = 2; } + +message SetQueueAttributesRequest { + Ydb.Operations.OperationParams operation_params = 1; + map attributes = 2; + string queue_url = 3; +} + +message SetQueueAttributesResponse { + Ydb.Operations.Operation operation = 1; +} + +message SetQueueAttributesResult { +} diff --git a/ydb/services/ymq/grpc_service.cpp b/ydb/services/ymq/grpc_service.cpp index 570d44cab1da..a3f747f1e356 100644 --- a/ydb/services/ymq/grpc_service.cpp +++ b/ydb/services/ymq/grpc_service.cpp @@ -42,6 +42,7 @@ void TGRpcYmqService::SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger) ADD_REQUEST(PurgeQueue, DoYmqPurgeQueueRequest, nullptr, Off) ADD_REQUEST(DeleteQueue, DoYmqDeleteQueueRequest, nullptr, Off) ADD_REQUEST(ChangeMessageVisibility, DoYmqChangeMessageVisibilityRequest, nullptr, Off) + ADD_REQUEST(SetQueueAttributes, DoYmqSetQueueAttributesRequest, nullptr, Off) #undef ADD_REQUEST } diff --git a/ydb/services/ymq/ymq_proxy.cpp b/ydb/services/ymq/ymq_proxy.cpp index 095ec8dec5f1..20bca5c6ca19 100644 --- a/ydb/services/ymq/ymq_proxy.cpp +++ b/ydb/services/ymq/ymq_proxy.cpp @@ -694,6 +694,47 @@ namespace NKikimr::NYmq::V1 { return result; } }; + + class TSetQueueAttributesReplyCallback : public TReplyCallback< + NKikimr::NSQS::TSetQueueAttributesResponse, + Ydb::Ymq::V1::SetQueueAttributesResult> { + public: + using TReplyCallback::TReplyCallback; + + private: + const NKikimr::NSQS::TSetQueueAttributesResponse& GetResponse(const NKikimrClient::TSqsResponse& resp) override { + return resp.GetSetQueueAttributes(); + } + + Ydb::Ymq::V1::SetQueueAttributesResult GetResult(const NKikimrClient::TSqsResponse&) override { + Ydb::Ymq::V1::SetQueueAttributesResult result; + return result; + } + }; + + void AddAttribute(THolder& requestHolder, const TString& name, TString value) { + auto attribute = requestHolder->MutableSetQueueAttributes()->MutableAttributes()->Add(); + attribute->SetName(name); + attribute->SetValue(value); + }; + + class TSetQueueAttributesActor : public TRpcRequestActor< + TEvYmqSetQueueAttributesRequest, + NKikimr::NSQS::TSetQueueAttributesRequest, + TSetQueueAttributesReplyCallback> { + public: + using TRpcRequestActor::TRpcRequestActor; + + private: + NKikimr::NSQS::TSetQueueAttributesRequest* GetRequest(THolder& requestHolder) override { + auto result = requestHolder->MutableSetQueueAttributes(); + for (auto& [name, value]: GetProtoRequest()->Getattributes()) { + AddAttribute(requestHolder, name, value); + } + result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url())->second); + return result; + } + }; } namespace NKikimr::NGRpcService { @@ -717,5 +758,6 @@ DECLARE_RPC(DeleteMessage); DECLARE_RPC(PurgeQueue); DECLARE_RPC(DeleteQueue); DECLARE_RPC(ChangeMessageVisibility); +DECLARE_RPC(SetQueueAttributes); } diff --git a/ydb/services/ymq/ymq_proxy.h b/ydb/services/ymq/ymq_proxy.h index 18a12fdfe6e5..4e324424c209 100644 --- a/ydb/services/ymq/ymq_proxy.h +++ b/ydb/services/ymq/ymq_proxy.h @@ -23,6 +23,7 @@ using TEvYmqDeleteMessageRequest = TGrpcRequestOperationCall; using TEvYmqDeleteQueueRequest = TGrpcRequestOperationCall; using TEvYmqChangeMessageVisibilityRequest = TGrpcRequestOperationCall; +using TEvYmqSetQueueAttributesRequest = TGrpcRequestOperationCall; } } From 9f97eb5bdec3711e2c53c503538cb385fda811e6 Mon Sep 17 00:00:00 2001 From: Sergey Veselov Date: Wed, 21 Aug 2024 08:41:23 +0000 Subject: [PATCH 2/4] Add SendMessageBatch request to SQS Json API --- ydb/core/grpc_services/service_ymq.h | 1 + ydb/core/http_proxy/http_req.cpp | 3 + ydb/core/http_proxy/ut/http_proxy_ut.h | 36 +++++++++++- ydb/library/http_proxy/error/error.cpp | 8 +++ ydb/library/http_proxy/error/error.h | 1 + ydb/public/api/grpc/draft/ydb_ymq_v1.proto | 1 + ydb/public/api/protos/draft/ymq.proto | 57 +++++++++++++----- ydb/services/ymq/grpc_service.cpp | 1 + ydb/services/ymq/ymq_proxy.cpp | 67 ++++++++++++++++++++++ ydb/services/ymq/ymq_proxy.h | 1 + 10 files changed, 161 insertions(+), 15 deletions(-) diff --git a/ydb/core/grpc_services/service_ymq.h b/ydb/core/grpc_services/service_ymq.h index e660dede46bb..11cb208654b7 100644 --- a/ydb/core/grpc_services/service_ymq.h +++ b/ydb/core/grpc_services/service_ymq.h @@ -22,5 +22,6 @@ void DoYmqPurgeQueueRequest(std::unique_ptr p, const IFacilityPro void DoYmqDeleteQueueRequest(std::unique_ptr p, const IFacilityProvider& f); void DoYmqChangeMessageVisibilityRequest(std::unique_ptr p, const IFacilityProvider& f); void DoYmqSetQueueAttributesRequest(std::unique_ptr p, const IFacilityProvider& f); +void DoYmqSendMessageBatchRequest(std::unique_ptr p, const IFacilityProvider& f); } } diff --git a/ydb/core/http_proxy/http_req.cpp b/ydb/core/http_proxy/http_req.cpp index 8a5989566bb6..60b8867924b6 100644 --- a/ydb/core/http_proxy/http_req.cpp +++ b/ydb/core/http_proxy/http_req.cpp @@ -530,6 +530,8 @@ namespace NKikimr::NHttpProxy { action = NSQS::EAction::ChangeMessageVisibility; } else if (Method == "SetQueueAttributes") { action = NSQS::EAction::SetQueueAttributes; + } else if (Method == "SendMessageBatch") { + action = NSQS::EAction::SendMessageBatch; } requestHolder->SetRequestId(HttpContext.RequestId); @@ -1066,6 +1068,7 @@ namespace NKikimr::NHttpProxy { DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN(DeleteQueue); DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN(ChangeMessageVisibility); DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN(SetQueueAttributes); + DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN(SendMessageBatch); #undef DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN } diff --git a/ydb/core/http_proxy/ut/http_proxy_ut.h b/ydb/core/http_proxy/ut/http_proxy_ut.h index 176d5d7e2acd..955df94e7bb0 100644 --- a/ydb/core/http_proxy/ut/http_proxy_ut.h +++ b/ydb/core/http_proxy/ut/http_proxy_ut.h @@ -1844,7 +1844,6 @@ Y_UNIT_TEST_SUITE(TestHttpProxy) { NJson::TJsonValue attributes; attributes["DelaySeconds"] = "1"; createQueueReq["Attributes"] = attributes; - auto res = SendHttpRequest("/Root", "AmazonSQS.CreateQueue", std::move(createQueueReq), FormAuthorizationStr("ru-central1")); UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200); NJson::TJsonValue json; @@ -1872,4 +1871,39 @@ Y_UNIT_TEST_SUITE(TestHttpProxy) { UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &resultJson)); UNIT_ASSERT_VALUES_EQUAL(resultJson["Attributes"]["DelaySeconds"], "2"); } + + Y_UNIT_TEST_F(TestSendMessageBatch, THttpProxyTestMock) { + auto createQueueReq = CreateSqsCreateQueueRequest(); + auto res = SendHttpRequest("/Root", "AmazonSQS.CreateQueue", std::move(createQueueReq), FormAuthorizationStr("ru-central1")); + UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200); + NJson::TJsonValue json; + UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json)); + TString resultQueueUrl = GetByPath(json, "QueueUrl"); + UNIT_ASSERT(resultQueueUrl.EndsWith("ExampleQueueName")); + + NJson::TJsonValue message0; + message0["Id"] = "Id-0"; + message0["MessageBody"] = "MessageBody-0"; + message0["MessageDeduplicationId"] = "MessageDeduplicationId-0"; + + NJson::TJsonValue message1; + message1["Id"] = "Id-1"; + message1["MessageBody"] = "MessageBody-1"; + message1["MessageDeduplicationId"] = "MessageDeduplicationId-1"; + + NJson::TJsonArray entries = {message0, message1}; + + NJson::TJsonValue sendMessageBatchReq; + sendMessageBatchReq["QueueUrl"] = resultQueueUrl; + sendMessageBatchReq["Entries"] = entries; + + res = SendHttpRequest("/Root", "AmazonSQS.SendMessageBatch", std::move(sendMessageBatchReq), FormAuthorizationStr("ru-central1")); + UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200); + UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json)); + UNIT_ASSERT(json["Successful"].GetArray().size() == 2); + auto succesful0 = json["Successful"][0]; + UNIT_ASSERT(succesful0["Id"] == "Id-0"); + UNIT_ASSERT(!GetByPath(succesful0, "Md5OfMessageBody").empty()); + UNIT_ASSERT(!GetByPath(succesful0, "MessageId").empty()); + } } // Y_UNIT_TEST_SUITE(TestHttpProxy) diff --git a/ydb/library/http_proxy/error/error.cpp b/ydb/library/http_proxy/error/error.cpp index 68ff471976be..cd0ff000a0d9 100644 --- a/ydb/library/http_proxy/error/error.cpp +++ b/ydb/library/http_proxy/error/error.cpp @@ -1,4 +1,5 @@ #include "error.h" +#include "util/generic/maybe.h" namespace NKikimr::NSQS { @@ -42,6 +43,13 @@ ui32 TErrorClass::GetId(const TString& code) { : it->second; }; +TMaybe TErrorClass::GetHttpStatus(const TString& code) { + auto idIt = NKikimr::NSQS::TErrorClass::ErrorToId.find(code); + if (idIt == NKikimr::NSQS::TErrorClass::ErrorToId.end()) { + return Nothing(); + } + return get<1>(IdToErrorAndCode.find(idIt->second)->second); +}; namespace NErrors { extern const TErrorClass ACCESS_DENIED = { diff --git a/ydb/library/http_proxy/error/error.h b/ydb/library/http_proxy/error/error.h index e2a1a1d1d0bc..d9207ccc2133 100644 --- a/ydb/library/http_proxy/error/error.h +++ b/ydb/library/http_proxy/error/error.h @@ -23,6 +23,7 @@ struct TErrorClass { static const std::tuple GetErrorAndCode(ui32 id); static ui32 GetId(const TString& code); + static TMaybe GetHttpStatus(const TString& code); private: static THashSet RegisteredCodes; diff --git a/ydb/public/api/grpc/draft/ydb_ymq_v1.proto b/ydb/public/api/grpc/draft/ydb_ymq_v1.proto index 1f1745306f9b..1befea770e49 100644 --- a/ydb/public/api/grpc/draft/ydb_ymq_v1.proto +++ b/ydb/public/api/grpc/draft/ydb_ymq_v1.proto @@ -21,4 +21,5 @@ service YmqService { rpc DeleteQueue(DeleteQueueRequest) returns (DeleteQueueResponse); rpc ChangeMessageVisibility(ChangeMessageVisibilityRequest) returns (ChangeMessageVisibilityResponse); rpc SetQueueAttributes(SetQueueAttributesRequest) returns (SetQueueAttributesResponse); + rpc SendMessageBatch(SendMessageBatchRequest) returns (SendMessageBatchResponse); } diff --git a/ydb/public/api/protos/draft/ymq.proto b/ydb/public/api/protos/draft/ymq.proto index be5e53b91efc..41997da7fc39 100644 --- a/ydb/public/api/protos/draft/ymq.proto +++ b/ydb/public/api/protos/draft/ymq.proto @@ -228,15 +228,6 @@ message SendMessageResult { string sequence_number = 5; } -message SendMessageBatchRequest { - Ydb.Operations.OperationParams operation_params = 1; - repeated SendMessageRequest entries = 2; -} - -message SendMessageBatchResponse { - Ydb.Operations.Operation operation = 1; -} - message BatchResultErrorEntry { string code = 1; string id = 2; @@ -244,12 +235,34 @@ message BatchResultErrorEntry { string message = 4; } +message SendMessageBatchRequestEntry { + string id = 1; + int32 delay_seconds = 2; + map message_attributes = 3; + string message_body = 4; + string message_deduplication_id = 5; + string message_group_id = 6; + map message_system_attributes = 7; + string queue_url = 8; +} + message SendMessageBatchResultEntry { - string md5_of_message_attributes = 1; - string md5_of_message_body= 2; - string md5_of_message_system_attributes= 3; - string message_id = 4; - string sequence_number = 5; + string id = 1; + string md5_of_message_body = 2; + string message_id = 3; + string md5_of_message_attributes = 4; + string md5_of_message_system_attributes = 5; + string sequence_number = 6; +} + +message SendMessageBatchRequest { + Ydb.Operations.OperationParams operation_params = 1; + repeated SendMessageBatchRequestEntry entries = 2; + string queue_url = 3; +} + +message SendMessageBatchResponse { + Ydb.Operations.Operation operation = 1; } message SendMessageBatchResult { @@ -269,3 +282,19 @@ message SetQueueAttributesResponse { message SetQueueAttributesResult { } + +message ListDeadLetterSourceQueuesRequest { + Ydb.Operations.OperationParams operation_params = 1; + int32 max_results = 2; + string next_token = 3; + string queue_url = 4; +} + +message ListDeadLetterSourceQueuesResponse { + Ydb.Operations.Operation operation = 1; +} + +message ListDeadLetterSourceQueuesResult { + string next_token = 1; + repeated string queue_urls = 2; +} diff --git a/ydb/services/ymq/grpc_service.cpp b/ydb/services/ymq/grpc_service.cpp index a3f747f1e356..e86fabf54245 100644 --- a/ydb/services/ymq/grpc_service.cpp +++ b/ydb/services/ymq/grpc_service.cpp @@ -43,6 +43,7 @@ void TGRpcYmqService::SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger) ADD_REQUEST(DeleteQueue, DoYmqDeleteQueueRequest, nullptr, Off) ADD_REQUEST(ChangeMessageVisibility, DoYmqChangeMessageVisibilityRequest, nullptr, Off) ADD_REQUEST(SetQueueAttributes, DoYmqSetQueueAttributesRequest, nullptr, Off) + ADD_REQUEST(SendMessageBatch, DoYmqSendMessageBatchRequest, nullptr, Off) #undef ADD_REQUEST } diff --git a/ydb/services/ymq/ymq_proxy.cpp b/ydb/services/ymq/ymq_proxy.cpp index 20bca5c6ca19..e00b8503494d 100644 --- a/ydb/services/ymq/ymq_proxy.cpp +++ b/ydb/services/ymq/ymq_proxy.cpp @@ -708,10 +708,12 @@ namespace NKikimr::NYmq::V1 { Ydb::Ymq::V1::SetQueueAttributesResult GetResult(const NKikimrClient::TSqsResponse&) override { Ydb::Ymq::V1::SetQueueAttributesResult result; + return result; } }; + void AddAttribute(THolder& requestHolder, const TString& name, TString value) { auto attribute = requestHolder->MutableSetQueueAttributes()->MutableAttributes()->Add(); attribute->SetName(name); @@ -735,6 +737,70 @@ namespace NKikimr::NYmq::V1 { return result; } }; + + class TSendMessageBatchReplyCallback : public TReplyCallback< + NKikimr::NSQS::TSendMessageBatchResponse, + Ydb::Ymq::V1::SendMessageBatchResult> { + public: + using TReplyCallback::TReplyCallback; + + private: + const NKikimr::NSQS::TSendMessageBatchResponse& GetResponse(const NKikimrClient::TSqsResponse& resp) override { + return resp.GetSendMessageBatch(); + } + + Ydb::Ymq::V1::SendMessageBatchResult GetResult(const NKikimrClient::TSqsResponse& response) override { + Ydb::Ymq::V1::SendMessageBatchResult result; + response.GetSendMessageBatch(); + for (auto& entry : response.GetSendMessageBatch().GetEntries()) { + if (entry.GetError().HasErrorCode()) { + auto currentFailed = result.Addfailed(); + currentFailed->Setcode(entry.GetError().GetErrorCode()); + currentFailed->Setid(entry.GetId()); + currentFailed->Setmessage(entry.GetError().GetMessage()); + + ui32 httpStatus = NSQS::TErrorClass::GetHttpStatus(entry.GetError().GetErrorCode()).GetOrElse(400); + currentFailed->Setsender_fault(400 <= httpStatus && httpStatus < 500); + } else { + auto currentSuccessful = result.Addsuccessful(); + currentSuccessful->Setid(entry.GetId()); + currentSuccessful->Setmd5_of_message_body(entry.GetMD5OfMessageBody()); + currentSuccessful->Setmessage_id(entry.GetMessageId()); + currentSuccessful->Setsequence_number(std::to_string(entry.GetSequenceNumber())); + } + } + return result; + } + }; + + class TSendMessageBatchActor : public TRpcRequestActor< + TEvYmqSendMessageBatchRequest, + NKikimr::NSQS::TSendMessageBatchRequest, + TSendMessageBatchReplyCallback> { + public: + using TRpcRequestActor::TRpcRequestActor; + + private: + NKikimr::NSQS::TSendMessageBatchRequest* GetRequest(THolder& requestHolder) override { + auto result = requestHolder->MutableSendMessageBatch(); + result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->Getqueue_url())->second); + for (auto& requestEntry : GetProtoRequest()->Getentries()) { + auto entry = requestHolder->MutableSendMessageBatch()->AddEntries(); + entry->SetId(requestEntry.Getid()); + for (auto& srcAttribute: requestEntry.Getmessage_attributes()) { + auto dstAttribute = entry->MutableMessageAttributes()->Add(); + dstAttribute->SetName(srcAttribute.first); + dstAttribute->SetStringValue(srcAttribute.second.Getstring_value()); + dstAttribute->SetBinaryValue(srcAttribute.second.Getbinary_value()); + dstAttribute->SetDataType(srcAttribute.second.Getdata_type()); + } + entry->SetMessageDeduplicationId(requestEntry.Getmessage_deduplication_id()); + entry->SetMessageGroupId(requestEntry.Getmessage_group_id()); + entry->SetMessageBody(requestEntry.Getmessage_body()); + } + return result; + } + }; } namespace NKikimr::NGRpcService { @@ -759,5 +825,6 @@ DECLARE_RPC(PurgeQueue); DECLARE_RPC(DeleteQueue); DECLARE_RPC(ChangeMessageVisibility); DECLARE_RPC(SetQueueAttributes); +DECLARE_RPC(SendMessageBatch); } diff --git a/ydb/services/ymq/ymq_proxy.h b/ydb/services/ymq/ymq_proxy.h index 4e324424c209..430e7d7c7443 100644 --- a/ydb/services/ymq/ymq_proxy.h +++ b/ydb/services/ymq/ymq_proxy.h @@ -24,6 +24,7 @@ using TEvYmqPurgeQueueRequest = TGrpcRequestOperationCall; using TEvYmqChangeMessageVisibilityRequest = TGrpcRequestOperationCall; using TEvYmqSetQueueAttributesRequest = TGrpcRequestOperationCall; +using TEvYmqSendMessageBatchRequest = TGrpcRequestOperationCall; } } From 8e9eab83046efc46ca7067bb2bc52a3a97bf7ee3 Mon Sep 17 00:00:00 2001 From: Sergey Veselov Date: Wed, 21 Aug 2024 11:01:40 +0000 Subject: [PATCH 3/4] Add DeleteMessageBatch request to SQS Json API --- ydb/core/grpc_services/service_ymq.h | 1 + ydb/core/http_proxy/http_req.cpp | 3 + ydb/core/http_proxy/ut/http_proxy_ut.h | 89 ++++++++++++++++++++++ ydb/public/api/grpc/draft/ydb_ymq_v1.proto | 1 + ydb/services/ymq/grpc_service.cpp | 1 + ydb/services/ymq/ymq_proxy.cpp | 56 +++++++++++++- ydb/services/ymq/ymq_proxy.h | 1 + 7 files changed, 151 insertions(+), 1 deletion(-) diff --git a/ydb/core/grpc_services/service_ymq.h b/ydb/core/grpc_services/service_ymq.h index 11cb208654b7..f4d429ace574 100644 --- a/ydb/core/grpc_services/service_ymq.h +++ b/ydb/core/grpc_services/service_ymq.h @@ -23,5 +23,6 @@ void DoYmqDeleteQueueRequest(std::unique_ptr p, const IFacilityPr void DoYmqChangeMessageVisibilityRequest(std::unique_ptr p, const IFacilityProvider& f); void DoYmqSetQueueAttributesRequest(std::unique_ptr p, const IFacilityProvider& f); void DoYmqSendMessageBatchRequest(std::unique_ptr p, const IFacilityProvider& f); +void DoYmqDeleteMessageBatchRequest(std::unique_ptr p, const IFacilityProvider& f); } } diff --git a/ydb/core/http_proxy/http_req.cpp b/ydb/core/http_proxy/http_req.cpp index 60b8867924b6..cc46421396f5 100644 --- a/ydb/core/http_proxy/http_req.cpp +++ b/ydb/core/http_proxy/http_req.cpp @@ -532,6 +532,8 @@ namespace NKikimr::NHttpProxy { action = NSQS::EAction::SetQueueAttributes; } else if (Method == "SendMessageBatch") { action = NSQS::EAction::SendMessageBatch; + }else if (Method == "DeleteMessageBatch") { + action = NSQS::EAction::DeleteMessageBatch; } requestHolder->SetRequestId(HttpContext.RequestId); @@ -1069,6 +1071,7 @@ namespace NKikimr::NHttpProxy { DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN(ChangeMessageVisibility); DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN(SetQueueAttributes); DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN(SendMessageBatch); + DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN(DeleteMessageBatch); #undef DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN } diff --git a/ydb/core/http_proxy/ut/http_proxy_ut.h b/ydb/core/http_proxy/ut/http_proxy_ut.h index 955df94e7bb0..ed1f81d86916 100644 --- a/ydb/core/http_proxy/ut/http_proxy_ut.h +++ b/ydb/core/http_proxy/ut/http_proxy_ut.h @@ -1906,4 +1906,93 @@ Y_UNIT_TEST_SUITE(TestHttpProxy) { UNIT_ASSERT(!GetByPath(succesful0, "Md5OfMessageBody").empty()); UNIT_ASSERT(!GetByPath(succesful0, "MessageId").empty()); } + + Y_UNIT_TEST_F(TestDeleteMessageBatch, THttpProxyTestMock) { + auto createQueueReq = CreateSqsCreateQueueRequest(); + auto res = SendHttpRequest("/Root", "AmazonSQS.CreateQueue", std::move(createQueueReq), FormAuthorizationStr("ru-central1")); + UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200); + NJson::TJsonValue json; + UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json)); + TString resultQueueUrl = GetByPath(json, "QueueUrl"); + UNIT_ASSERT(resultQueueUrl.EndsWith("ExampleQueueName")); + + NJson::TJsonValue message0; + message0["Id"] = "Id-0"; + message0["MessageBody"] = "MessageBody-0"; + message0["MessageDeduplicationId"] = "MessageDeduplicationId-0"; + + NJson::TJsonValue message1; + message1["Id"] = "Id-1"; + message1["MessageBody"] = "MessageBody-1"; + message1["MessageDeduplicationId"] = "MessageDeduplicationId-1"; + + NJson::TJsonArray entries = {message0, message1}; + + NJson::TJsonValue sendMessageBatchReq; + sendMessageBatchReq["QueueUrl"] = resultQueueUrl; + sendMessageBatchReq["Entries"] = entries; + + res = SendHttpRequest("/Root", "AmazonSQS.SendMessageBatch", std::move(sendMessageBatchReq), FormAuthorizationStr("ru-central1")); + UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200); + UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json)); + UNIT_ASSERT(json["Successful"].GetArray().size() == 2); + + TVector messages; + for (int i = 0; i < 20; ++i) { + NJson::TJsonValue receiveMessageReq; + receiveMessageReq["QueueUrl"] = resultQueueUrl; + res = SendHttpRequest("/Root", "AmazonSQS.ReceiveMessage", std::move(receiveMessageReq), FormAuthorizationStr("ru-central1")); + if (res.Body != TString("{}")) { + NJson::ReadJsonTree(res.Body, &json); + if (json["Messages"].GetArray().size() == 2) { + messages.push_back(json["Messages"][0]); + messages.push_back(json["Messages"][1]); + break; + } + if (json["Messages"].GetArray().size() == 1) { + messages.push_back(json["Messages"][0]); + if (messages.size() == 2) { + break; + } + } + } + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + } + + UNIT_ASSERT_VALUES_EQUAL(messages.size(), 2); + + auto receiptHandle0 = messages[0]["ReceiptHandle"].GetString(); + UNIT_ASSERT(!receiptHandle0.Empty()); + auto receiptHandle1 = messages[1]["ReceiptHandle"].GetString(); + UNIT_ASSERT(!receiptHandle1.Empty()); + + NJson::TJsonValue deleteMessageBatchReq; + deleteMessageBatchReq["QueueUrl"] = resultQueueUrl; + + NJson::TJsonValue entry0; + entry0["Id"] = "Id-0"; + entry0["ReceiptHandle"] = receiptHandle0; + + NJson::TJsonValue entry1; + entry1["Id"] = "Id-1"; + entry1["ReceiptHandle"] = receiptHandle1; + + NJson::TJsonArray deleteEntries = {entry0, entry1}; + deleteMessageBatchReq["Entries"] = deleteEntries; + + res = SendHttpRequest("/Root", "AmazonSQS.DeleteMessageBatch", std::move(deleteMessageBatchReq), FormAuthorizationStr("ru-central1")); + UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200); + UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json)); + UNIT_ASSERT_VALUES_EQUAL(json["Successful"].GetArray().size(), 2); + UNIT_ASSERT_VALUES_EQUAL(json["Successful"][0]["Id"], "Id-0"); + UNIT_ASSERT_VALUES_EQUAL(json["Successful"][1]["Id"], "Id-1"); + + NJson::TJsonValue receiveMessageReq; + receiveMessageReq["QueueUrl"] = resultQueueUrl; + res = SendHttpRequest("/Root", "AmazonSQS.ReceiveMessage", std::move(receiveMessageReq), FormAuthorizationStr("ru-central1")); + UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json)); + UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200); + UNIT_ASSERT_VALUES_EQUAL(json["Messages"].GetArray().size(), 0); + + } } // Y_UNIT_TEST_SUITE(TestHttpProxy) diff --git a/ydb/public/api/grpc/draft/ydb_ymq_v1.proto b/ydb/public/api/grpc/draft/ydb_ymq_v1.proto index 1befea770e49..a098cdd5e2bb 100644 --- a/ydb/public/api/grpc/draft/ydb_ymq_v1.proto +++ b/ydb/public/api/grpc/draft/ydb_ymq_v1.proto @@ -22,4 +22,5 @@ service YmqService { rpc ChangeMessageVisibility(ChangeMessageVisibilityRequest) returns (ChangeMessageVisibilityResponse); rpc SetQueueAttributes(SetQueueAttributesRequest) returns (SetQueueAttributesResponse); rpc SendMessageBatch(SendMessageBatchRequest) returns (SendMessageBatchResponse); + rpc DeleteMessageBatch(DeleteMessageBatchRequest) returns (DeleteMessageBatchResponse); } diff --git a/ydb/services/ymq/grpc_service.cpp b/ydb/services/ymq/grpc_service.cpp index e86fabf54245..de063e5b58a4 100644 --- a/ydb/services/ymq/grpc_service.cpp +++ b/ydb/services/ymq/grpc_service.cpp @@ -44,6 +44,7 @@ void TGRpcYmqService::SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger) ADD_REQUEST(ChangeMessageVisibility, DoYmqChangeMessageVisibilityRequest, nullptr, Off) ADD_REQUEST(SetQueueAttributes, DoYmqSetQueueAttributesRequest, nullptr, Off) ADD_REQUEST(SendMessageBatch, DoYmqSendMessageBatchRequest, nullptr, Off) + ADD_REQUEST(DeleteMessageBatch, DoYmqDeleteMessageBatchRequest, nullptr, Off) #undef ADD_REQUEST } diff --git a/ydb/services/ymq/ymq_proxy.cpp b/ydb/services/ymq/ymq_proxy.cpp index e00b8503494d..b54837c212bd 100644 --- a/ydb/services/ymq/ymq_proxy.cpp +++ b/ydb/services/ymq/ymq_proxy.cpp @@ -785,7 +785,7 @@ namespace NKikimr::NYmq::V1 { auto result = requestHolder->MutableSendMessageBatch(); result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->Getqueue_url())->second); for (auto& requestEntry : GetProtoRequest()->Getentries()) { - auto entry = requestHolder->MutableSendMessageBatch()->AddEntries(); + auto entry = requestHolder->MutableSendMessageBatch()->MutableEntries()->Add(); entry->SetId(requestEntry.Getid()); for (auto& srcAttribute: requestEntry.Getmessage_attributes()) { auto dstAttribute = entry->MutableMessageAttributes()->Add(); @@ -801,6 +801,59 @@ namespace NKikimr::NYmq::V1 { return result; } }; + + class TDeleteMessageBatchReplyCallback : public TReplyCallback< + NKikimr::NSQS::TDeleteMessageBatchResponse, + Ydb::Ymq::V1::DeleteMessageBatchResult> { + public: + using TReplyCallback::TReplyCallback; + + private: + const NKikimr::NSQS::TDeleteMessageBatchResponse& GetResponse(const NKikimrClient::TSqsResponse& resp) override { + return resp.GetDeleteMessageBatch(); + } + + Ydb::Ymq::V1::DeleteMessageBatchResult GetResult(const NKikimrClient::TSqsResponse& response) override { + Ydb::Ymq::V1::DeleteMessageBatchResult result; + auto entries = response.GetDeleteMessageBatch().GetEntries(); + for (auto i = 0; i < entries.size(); i++) { + auto &entry = response.GetDeleteMessageBatch().GetEntries()[i]; + if (entry.GetError().HasErrorCode()) { + auto currentFailed = result.Addfailed(); + currentFailed->Setcode(entry.GetError().GetErrorCode()); + currentFailed->Setid(entry.GetId()); + currentFailed->Setmessage(entry.GetError().GetMessage()); + + ui32 httpStatus = NSQS::TErrorClass::GetHttpStatus(entry.GetError().GetErrorCode()).GetOrElse(400); + currentFailed->Setsender_fault(400 <= httpStatus && httpStatus < 500); + } else { + auto currentSuccessful = result.Addsuccessful(); + currentSuccessful->Setid(entry.GetId()); + } + } + return result; + } + }; + + class TDeleteMessageBatchActor : public TRpcRequestActor< + TEvYmqDeleteMessageBatchRequest, + NKikimr::NSQS::TDeleteMessageBatchRequest, + TDeleteMessageBatchReplyCallback> { + public: + using TRpcRequestActor::TRpcRequestActor; + + private: + NKikimr::NSQS::TDeleteMessageBatchRequest* GetRequest(THolder& requestHolder) override { + auto result = requestHolder->MutableDeleteMessageBatch(); + result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->Getqueue_url())->second); + for (auto& requestEntry : GetProtoRequest()->Getentries()) { + auto entry = requestHolder->MutableDeleteMessageBatch()->AddEntries(); + entry->SetId(requestEntry.Getid()); + entry->SetReceiptHandle(requestEntry.Getreceipt_handle()); + } + return result; + } + }; } namespace NKikimr::NGRpcService { @@ -826,5 +879,6 @@ DECLARE_RPC(DeleteQueue); DECLARE_RPC(ChangeMessageVisibility); DECLARE_RPC(SetQueueAttributes); DECLARE_RPC(SendMessageBatch); +DECLARE_RPC(DeleteMessageBatch); } diff --git a/ydb/services/ymq/ymq_proxy.h b/ydb/services/ymq/ymq_proxy.h index 430e7d7c7443..4e94d164eb46 100644 --- a/ydb/services/ymq/ymq_proxy.h +++ b/ydb/services/ymq/ymq_proxy.h @@ -25,6 +25,7 @@ using TEvYmqDeleteQueueRequest = TGrpcRequestOperationCall; using TEvYmqSetQueueAttributesRequest = TGrpcRequestOperationCall; using TEvYmqSendMessageBatchRequest = TGrpcRequestOperationCall; +using TEvYmqDeleteMessageBatchRequest = TGrpcRequestOperationCall; } } From 246ab1da05a30d98d7a5e0e6c2343ea71895c200 Mon Sep 17 00:00:00 2001 From: Sergey Veselov Date: Wed, 21 Aug 2024 12:06:19 +0000 Subject: [PATCH 4/4] Add ChangeMessageVisibilityBatch request to SQS Json API progress --- ydb/core/grpc_services/service_ymq.h | 1 + ydb/core/http_proxy/http_req.cpp | 3 ++ ydb/core/http_proxy/ut/http_proxy_ut.h | 1 + ydb/public/api/grpc/draft/ydb_ymq_v1.proto | 1 + ydb/services/ymq/grpc_service.cpp | 1 + ydb/services/ymq/ymq_proxy.cpp | 53 ++++++++++++++++++++++ ydb/services/ymq/ymq_proxy.h | 1 + 7 files changed, 61 insertions(+) diff --git a/ydb/core/grpc_services/service_ymq.h b/ydb/core/grpc_services/service_ymq.h index f4d429ace574..4d35c1b49db8 100644 --- a/ydb/core/grpc_services/service_ymq.h +++ b/ydb/core/grpc_services/service_ymq.h @@ -24,5 +24,6 @@ void DoYmqChangeMessageVisibilityRequest(std::unique_ptr p, const void DoYmqSetQueueAttributesRequest(std::unique_ptr p, const IFacilityProvider& f); void DoYmqSendMessageBatchRequest(std::unique_ptr p, const IFacilityProvider& f); void DoYmqDeleteMessageBatchRequest(std::unique_ptr p, const IFacilityProvider& f); +void DoYmqChangeMessageVisibilityBatchRequest(std::unique_ptr p, const IFacilityProvider& f); } } diff --git a/ydb/core/http_proxy/http_req.cpp b/ydb/core/http_proxy/http_req.cpp index cc46421396f5..dee6f7177be0 100644 --- a/ydb/core/http_proxy/http_req.cpp +++ b/ydb/core/http_proxy/http_req.cpp @@ -534,6 +534,8 @@ namespace NKikimr::NHttpProxy { action = NSQS::EAction::SendMessageBatch; }else if (Method == "DeleteMessageBatch") { action = NSQS::EAction::DeleteMessageBatch; + } else if (Method == "ChangeMessageVisibilityBatch") { + action = NSQS::EAction::ChangeMessageVisibilityBatch; } requestHolder->SetRequestId(HttpContext.RequestId); @@ -1072,6 +1074,7 @@ namespace NKikimr::NHttpProxy { DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN(SetQueueAttributes); DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN(SendMessageBatch); DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN(DeleteMessageBatch); + DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN(ChangeMessageVisibilityBatch); #undef DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN } diff --git a/ydb/core/http_proxy/ut/http_proxy_ut.h b/ydb/core/http_proxy/ut/http_proxy_ut.h index ed1f81d86916..8b43a0f98969 100644 --- a/ydb/core/http_proxy/ut/http_proxy_ut.h +++ b/ydb/core/http_proxy/ut/http_proxy_ut.h @@ -1995,4 +1995,5 @@ Y_UNIT_TEST_SUITE(TestHttpProxy) { UNIT_ASSERT_VALUES_EQUAL(json["Messages"].GetArray().size(), 0); } + } // Y_UNIT_TEST_SUITE(TestHttpProxy) diff --git a/ydb/public/api/grpc/draft/ydb_ymq_v1.proto b/ydb/public/api/grpc/draft/ydb_ymq_v1.proto index a098cdd5e2bb..58f104289f33 100644 --- a/ydb/public/api/grpc/draft/ydb_ymq_v1.proto +++ b/ydb/public/api/grpc/draft/ydb_ymq_v1.proto @@ -23,4 +23,5 @@ service YmqService { rpc SetQueueAttributes(SetQueueAttributesRequest) returns (SetQueueAttributesResponse); rpc SendMessageBatch(SendMessageBatchRequest) returns (SendMessageBatchResponse); rpc DeleteMessageBatch(DeleteMessageBatchRequest) returns (DeleteMessageBatchResponse); + rpc ChangeMessageVisibilityBatch(ChangeMessageVisibilityBatchRequest) returns (ChangeMessageVisibilityBatchResponse); } diff --git a/ydb/services/ymq/grpc_service.cpp b/ydb/services/ymq/grpc_service.cpp index de063e5b58a4..20d891253dae 100644 --- a/ydb/services/ymq/grpc_service.cpp +++ b/ydb/services/ymq/grpc_service.cpp @@ -45,6 +45,7 @@ void TGRpcYmqService::SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger) ADD_REQUEST(SetQueueAttributes, DoYmqSetQueueAttributesRequest, nullptr, Off) ADD_REQUEST(SendMessageBatch, DoYmqSendMessageBatchRequest, nullptr, Off) ADD_REQUEST(DeleteMessageBatch, DoYmqDeleteMessageBatchRequest, nullptr, Off) + ADD_REQUEST(ChangeMessageVisibilityBatch, DoYmqChangeMessageVisibilityBatchRequest, nullptr, Off) #undef ADD_REQUEST } diff --git a/ydb/services/ymq/ymq_proxy.cpp b/ydb/services/ymq/ymq_proxy.cpp index b54837c212bd..0453f6f5dfb0 100644 --- a/ydb/services/ymq/ymq_proxy.cpp +++ b/ydb/services/ymq/ymq_proxy.cpp @@ -854,6 +854,58 @@ namespace NKikimr::NYmq::V1 { return result; } }; + + class TChangeMessageVisibilityBatchReplyCallback : public TReplyCallback< + NKikimr::NSQS::TChangeMessageVisibilityBatchResponse, + Ydb::Ymq::V1::ChangeMessageVisibilityBatchResult> { + public: + using TReplyCallback::TReplyCallback; + + private: + const NKikimr::NSQS::TChangeMessageVisibilityBatchResponse& GetResponse(const NKikimrClient::TSqsResponse& resp) override { + return resp.GetChangeMessageVisibilityBatch(); + } + + Ydb::Ymq::V1::ChangeMessageVisibilityBatchResult GetResult(const NKikimrClient::TSqsResponse& response) override { + Ydb::Ymq::V1::ChangeMessageVisibilityBatchResult result; + for (auto& entry : response.GetChangeMessageVisibilityBatch().GetEntries()) { + if (entry.GetError().HasErrorCode()) { + auto currentFailed = result.Addfailed(); + currentFailed->Setcode(entry.GetError().GetErrorCode()); + currentFailed->Setid(entry.GetId()); + currentFailed->Setmessage(entry.GetError().GetMessage()); + + ui32 httpStatus = NSQS::TErrorClass::GetHttpStatus(entry.GetError().GetErrorCode()).GetOrElse(400); + currentFailed->Setsender_fault(400 <= httpStatus && httpStatus < 500); + } else { + auto currentSuccessful = result.Addsuccessful(); + currentSuccessful->Setid(entry.GetId()); + } + } + return result; + } + }; + + class TChangeMessageVisibilityBatchActor : public TRpcRequestActor< + TEvYmqChangeMessageVisibilityBatchRequest, + NKikimr::NSQS::TChangeMessageVisibilityBatchRequest, + TChangeMessageVisibilityBatchReplyCallback> { + public: + using TRpcRequestActor::TRpcRequestActor; + + private: + NKikimr::NSQS::TChangeMessageVisibilityBatchRequest* GetRequest(THolder& requestHolder) override { + auto result = requestHolder->MutableChangeMessageVisibilityBatch(); + result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->Getqueue_url())->second); + for (auto& requestEntry : GetProtoRequest()->Getentries()) { + auto entry = requestHolder->MutableChangeMessageVisibilityBatch()->MutableEntries()->Add(); + entry->SetId(requestEntry.Getid()); + entry->SetVisibilityTimeout(requestEntry.Getvisibility_timeout()); + entry->SetReceiptHandle(requestEntry.Getreceipt_handle()); + } + return result; + } + }; } namespace NKikimr::NGRpcService { @@ -880,5 +932,6 @@ DECLARE_RPC(ChangeMessageVisibility); DECLARE_RPC(SetQueueAttributes); DECLARE_RPC(SendMessageBatch); DECLARE_RPC(DeleteMessageBatch); +DECLARE_RPC(ChangeMessageVisibilityBatch); } diff --git a/ydb/services/ymq/ymq_proxy.h b/ydb/services/ymq/ymq_proxy.h index 4e94d164eb46..01636f81ce36 100644 --- a/ydb/services/ymq/ymq_proxy.h +++ b/ydb/services/ymq/ymq_proxy.h @@ -26,6 +26,7 @@ using TEvYmqChangeMessageVisibilityRequest = TGrpcRequestOperationCall; using TEvYmqSendMessageBatchRequest = TGrpcRequestOperationCall; using TEvYmqDeleteMessageBatchRequest = TGrpcRequestOperationCall; +using TEvYmqChangeMessageVisibilityBatchRequest = TGrpcRequestOperationCall; } }