diff --git a/ydb/core/http_proxy/ut/http_proxy_ut.h b/ydb/core/http_proxy/ut/http_proxy_ut.h index 0ab8b81d9200..bede6ffa67f2 100644 --- a/ydb/core/http_proxy/ut/http_proxy_ut.h +++ b/ydb/core/http_proxy/ut/http_proxy_ut.h @@ -1824,7 +1824,6 @@ Y_UNIT_TEST_SUITE(TestHttpProxy) { res = SendHttpRequest("/Root", "AmazonSQS.DeleteQueue", std::move(deleteQueueReq), FormAuthorizationStr("ru-central1")); UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200); - for (int i = 0; i < 61; ++i) { req = CreateSqsGetQueueUrlRequest(); res = SendHttpRequest("/Root", "AmazonSQS.GetQueueUrl", std::move(req), FormAuthorizationStr("ru-central1")); @@ -2042,4 +2041,139 @@ Y_UNIT_TEST_SUITE(TestHttpProxy) { UNIT_ASSERT_VALUES_EQUAL(json["QueueUrls"][0], resultQueueUrl); } + Y_UNIT_TEST_F(TestChangeMessageVisibility, THttpProxyTestMock) { + auto createQueueReq = CreateSqsCreateQueueRequest(); + auto res = SendHttpRequest("/Root", "AmazonSQS.CreateQueue", std::move(createQueueReq), FormAuthorizationStr("ru-central1")); + UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200); + NJson::TJsonValue json; + UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json)); + TString resultQueueUrl = GetByPath(json, "QueueUrl"); + + NJson::TJsonValue sendMessageReq; + sendMessageReq["QueueUrl"] = resultQueueUrl; + auto body = "MessageBody-0"; + sendMessageReq["MessageBody"] = body; + + res = SendHttpRequest("/Root", "AmazonSQS.SendMessage", std::move(sendMessageReq), FormAuthorizationStr("ru-central1")); + UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200); + + for (int i = 0; i < 20; ++i) { + NJson::TJsonValue receiveMessageReq; + receiveMessageReq["QueueUrl"] = resultQueueUrl; + res = SendHttpRequest("/Root", "AmazonSQS.ReceiveMessage", std::move(receiveMessageReq), FormAuthorizationStr("ru-central1")); + if (res.Body != TString("{}")) { + break; + } + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + } + + UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200); + + UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json)); + + auto receiptHandle = json["Messages"][0]["ReceiptHandle"].GetString(); + UNIT_ASSERT(!receiptHandle.Empty()); + + NJson::TJsonValue changeMessageVisibility; + changeMessageVisibility["QueueUrl"] = resultQueueUrl; + changeMessageVisibility["ReceiptHandle"] = receiptHandle; + changeMessageVisibility["VisibilityTimeout"] = 1; + + res = SendHttpRequest( + "/Root", + "AmazonSQS.ChangeMessageVisibility", + std::move(changeMessageVisibility), + FormAuthorizationStr("ru-central1") + ); + + UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200); + } + + Y_UNIT_TEST_F(TestChangeMessageVisibilityBatch, THttpProxyTestMock) { + auto createQueueReq = CreateSqsCreateQueueRequest(); + auto res = SendHttpRequest("/Root", "AmazonSQS.CreateQueue", std::move(createQueueReq), FormAuthorizationStr("ru-central1")); + UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200); + NJson::TJsonValue json; + UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json)); + TString resultQueueUrl = GetByPath(json, "QueueUrl"); + UNIT_ASSERT(resultQueueUrl.EndsWith("ExampleQueueName")); + + NJson::TJsonValue message0; + message0["Id"] = "Id-0"; + message0["MessageBody"] = "MessageBody-0"; + message0["MessageDeduplicationId"] = "MessageDeduplicationId-0"; + + NJson::TJsonValue message1; + message1["Id"] = "Id-1"; + message1["MessageBody"] = "MessageBody-1"; + message1["MessageDeduplicationId"] = "MessageDeduplicationId-1"; + + NJson::TJsonArray entries = {message0, message1}; + + NJson::TJsonValue sendMessageBatchReq; + sendMessageBatchReq["QueueUrl"] = resultQueueUrl; + sendMessageBatchReq["Entries"] = entries; + + res = SendHttpRequest("/Root", "AmazonSQS.SendMessageBatch", std::move(sendMessageBatchReq), FormAuthorizationStr("ru-central1")); + UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200); + UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json)); + UNIT_ASSERT(json["Successful"].GetArray().size() == 2); + + TVector messages; + for (int i = 0; i < 20; ++i) { + NJson::TJsonValue receiveMessageReq; + receiveMessageReq["QueueUrl"] = resultQueueUrl; + res = SendHttpRequest("/Root", "AmazonSQS.ReceiveMessage", std::move(receiveMessageReq), FormAuthorizationStr("ru-central1")); + if (res.Body != TString("{}")) { + NJson::ReadJsonTree(res.Body, &json); + if (json["Messages"].GetArray().size() == 2) { + messages.push_back(json["Messages"][0]); + messages.push_back(json["Messages"][1]); + break; + } + if (json["Messages"].GetArray().size() == 1) { + messages.push_back(json["Messages"][0]); + if (messages.size() == 2) { + break; + } + } + } + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + } + + UNIT_ASSERT_VALUES_EQUAL(messages.size(), 2); + + auto receiptHandle0 = messages[0]["ReceiptHandle"].GetString(); + UNIT_ASSERT(!receiptHandle0.Empty()); + auto receiptHandle1 = messages[1]["ReceiptHandle"].GetString(); + UNIT_ASSERT(!receiptHandle1.Empty()); + + + NJson::TJsonValue changeMessageVisibilityBatchReq; + changeMessageVisibilityBatchReq["QueueUrl"] = resultQueueUrl; + + NJson::TJsonValue entry0; + entry0["Id"] = "Id-0"; + entry0["ReceiptHandle"] = receiptHandle0; + entry0["VisibilityTimeout"] = 1; + + NJson::TJsonValue entry1; + entry1["Id"] = "Id-1"; + entry1["ReceiptHandle"] = receiptHandle1; + entry1["VisibilityTimeout"] = 2; + + NJson::TJsonArray changeVisibilityEntries = {entry0, entry1}; + changeMessageVisibilityBatchReq["Entries"] = changeVisibilityEntries; + + res = SendHttpRequest( + "/Root", "AmazonSQS.ChangeMessageVisibilityBatch", + std::move(changeMessageVisibilityBatchReq), + FormAuthorizationStr("ru-central1") + ); + UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200); + UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json)); + UNIT_ASSERT_VALUES_EQUAL(json["Successful"].GetArray().size(), 2); + UNIT_ASSERT_VALUES_EQUAL(json["Successful"][0]["Id"], "Id-0"); + UNIT_ASSERT_VALUES_EQUAL(json["Successful"][1]["Id"], "Id-1"); + } } // Y_UNIT_TEST_SUITE(TestHttpProxy) diff --git a/ydb/public/api/protos/draft/ymq.proto b/ydb/public/api/protos/draft/ymq.proto index 41997da7fc39..b9d202e995d4 100644 --- a/ydb/public/api/protos/draft/ymq.proto +++ b/ydb/public/api/protos/draft/ymq.proto @@ -25,7 +25,7 @@ message ChangeMessageVisibilityResult { message ChangeMessageVisibilityBatchRequestEntry { string id = 1; string receipt_handle = 2; - int32 visibility_timeout = 3; + optional int32 visibility_timeout = 3; } message ChangeMessageVisibilityBatchRequest { @@ -128,7 +128,7 @@ message GetQueueAttributesResult { message GetQueueUrlRequest { Ydb.Operations.OperationParams operation_params = 1; string queue_name = 2; - string queue_owner_aws_account_id = 3; + optional string queue_owner_aws_account_id = 3; } message GetQueueUrlResponse { @@ -141,9 +141,9 @@ message GetQueueUrlResult { message ListQueuesRequest { Ydb.Operations.OperationParams operation_params = 1; - int64 max_results = 2; - string next_token = 3; - string queue_name_prefix = 4; + optional int64 max_results = 2; + optional string next_token = 3; + optional string queue_name_prefix = 4; } message ListQueuesResponse { @@ -178,13 +178,13 @@ message MessageAttribute { message ReceiveMessageRequest { Ydb.Operations.OperationParams operation_params = 1; repeated string attribute_names = 2; - int32 max_number_of_messages = 3; + optional int32 max_number_of_messages = 3; repeated string message_attribute_names = 4; repeated string message_system_attribute_names = 5; string queue_url = 6; - string receive_request_attempt_id = 7; - int32 visibility_timeout = 8; - int32 wait_time_seconds = 9; + optional string receive_request_attempt_id = 7; + optional int32 visibility_timeout = 8; + optional int32 wait_time_seconds = 9; } message ReceiveMessageResponse { @@ -207,11 +207,11 @@ message ReceiveMessageResult { message SendMessageRequest { Ydb.Operations.OperationParams operation_params = 1; - int32 delay_seconds = 2; + optional int32 delay_seconds = 2; map message_attributes = 3; string message_body = 4; - string message_deduplication_id = 5; - string message_group_id = 6; + optional string message_deduplication_id = 5; + optional string message_group_id = 6; map message_system_attributes = 7; string queue_url = 8; } @@ -237,11 +237,11 @@ message BatchResultErrorEntry { message SendMessageBatchRequestEntry { string id = 1; - int32 delay_seconds = 2; + optional int32 delay_seconds = 2; map message_attributes = 3; string message_body = 4; - string message_deduplication_id = 5; - string message_group_id = 6; + optional string message_deduplication_id = 5; + optional string message_group_id = 6; map message_system_attributes = 7; string queue_url = 8; } @@ -285,8 +285,8 @@ message SetQueueAttributesResult { message ListDeadLetterSourceQueuesRequest { Ydb.Operations.OperationParams operation_params = 1; - int32 max_results = 2; - string next_token = 3; + optional int32 max_results = 2; + optional string next_token = 3; string queue_url = 4; } diff --git a/ydb/services/ymq/ymq_proxy.cpp b/ydb/services/ymq/ymq_proxy.cpp index c16bfdaf7c69..0ee29a98e88b 100644 --- a/ydb/services/ymq/ymq_proxy.cpp +++ b/ydb/services/ymq/ymq_proxy.cpp @@ -32,6 +32,16 @@ using grpc::Status; namespace NKikimr::NYmq::V1 { + #define COPY_FIELD_IF_PRESENT(from, to) \ + if (GetProtoRequest()->Has##from()) { \ + result->Set##to(GetProtoRequest()->Get##from()); \ + } + + #define COPY_FIELD_IF_PRESENT_IN_ENTRY(from, to) \ + if (requestEntry.Has##from()) { \ + entry->Set##to(requestEntry.Get##from()); \ + } + using namespace NGRpcService; using namespace NGRpcProxy::V1; @@ -267,6 +277,7 @@ namespace NKikimr::NYmq::V1 { private: NKikimr::NSQS::TSendMessageRequest* GetRequest(THolder& requestHolder) override { auto result = requestHolder->MutableSendMessage(); + for (auto& srcAttribute: GetProtoRequest()->Getmessage_attributes()) { auto dstAttribute = result->MutableMessageAttributes()->Add(); dstAttribute->SetName(srcAttribute.first); @@ -274,10 +285,14 @@ namespace NKikimr::NYmq::V1 { dstAttribute->SetBinaryValue(srcAttribute.second.Getbinary_value()); dstAttribute->SetDataType(srcAttribute.second.Getdata_type()); } - result->SetMessageDeduplicationId(GetProtoRequest()->message_deduplication_id()); - result->SetMessageGroupId(GetProtoRequest()->message_group_id()); - result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url())->second); - result->SetMessageBody(GetProtoRequest()->message_body()); + + COPY_FIELD_IF_PRESENT(delay_seconds, DelaySeconds); + COPY_FIELD_IF_PRESENT(message_deduplication_id, MessageDeduplicationId); + COPY_FIELD_IF_PRESENT(message_group_id, MessageGroupId); + + result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->Getqueue_url())->second); + + result->SetMessageBody(GetProtoRequest()->Getmessage_body()); return result; } @@ -363,6 +378,14 @@ namespace NKikimr::NYmq::V1 { private: NKikimr::NSQS::TReceiveMessageRequest* GetRequest(THolder& requestHolder) override { auto result = requestHolder->MutableReceiveMessage(); + + result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url())->second); + + COPY_FIELD_IF_PRESENT(max_number_of_messages, MaxNumberOfMessages); + COPY_FIELD_IF_PRESENT(receive_request_attempt_id, ReceiveRequestAttemptId); + COPY_FIELD_IF_PRESENT(visibility_timeout, VisibilityTimeout); + COPY_FIELD_IF_PRESENT(wait_time_seconds, WaitTimeSeconds); + auto systemAttributeNames = GetProtoRequest()->Getmessage_system_attribute_names(); // We ignore AttributeNames if SystemAttributeNames is present, // because AttributeNames is deprecated in favour of SystemAttributeNames @@ -376,14 +399,11 @@ namespace NKikimr::NYmq::V1 { result->SetAttributeName(i, attributeNames[i]); } } - result->SetMaxNumberOfMessages(GetProtoRequest()->max_number_of_messages()); + for (int i = 0; i < GetProtoRequest()->Getmessage_attribute_names().size(); i++) { result->SetMessageAttributeName(i, GetProtoRequest()->Getmessage_attribute_names()[i]); } - result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url())->second); - result->SetReceiveRequestAttemptId(GetProtoRequest()->Getreceive_request_attempt_id()); - result->SetVisibilityTimeout(GetProtoRequest()->Getvisibility_timeout()); - result->SetWaitTimeSeconds(GetProtoRequest()->Getwait_time_seconds()); + return result; } @@ -559,7 +579,7 @@ namespace NKikimr::NYmq::V1 { private: NKikimr::NSQS::TListQueuesRequest* GetRequest(THolder& requestHolder) override { auto result = requestHolder->MutableListQueues(); - result->SetQueueNamePrefix(GetProtoRequest()->Getqueue_name_prefix()); + COPY_FIELD_IF_PRESENT(queue_name_prefix, QueueNamePrefix); return result; } }; @@ -818,10 +838,19 @@ namespace NKikimr::NYmq::V1 { private: NKikimr::NSQS::TSendMessageBatchRequest* GetRequest(THolder& requestHolder) override { auto result = requestHolder->MutableSendMessageBatch(); + result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->Getqueue_url())->second); + for (auto& requestEntry : GetProtoRequest()->Getentries()) { auto entry = requestHolder->MutableSendMessageBatch()->MutableEntries()->Add(); + entry->SetId(requestEntry.Getid()); + entry->SetMessageBody(requestEntry.Getmessage_body()); + + COPY_FIELD_IF_PRESENT_IN_ENTRY(delay_seconds, DelaySeconds); + COPY_FIELD_IF_PRESENT_IN_ENTRY(message_deduplication_id, MessageDeduplicationId); + COPY_FIELD_IF_PRESENT_IN_ENTRY(message_group_id, MessageGroupId); + for (auto& srcAttribute: requestEntry.Getmessage_attributes()) { auto dstAttribute = entry->MutableMessageAttributes()->Add(); dstAttribute->SetName(srcAttribute.first); @@ -829,9 +858,6 @@ namespace NKikimr::NYmq::V1 { dstAttribute->SetBinaryValue(srcAttribute.second.Getbinary_value()); dstAttribute->SetDataType(srcAttribute.second.Getdata_type()); } - entry->SetMessageDeduplicationId(requestEntry.Getmessage_deduplication_id()); - entry->SetMessageGroupId(requestEntry.Getmessage_group_id()); - entry->SetMessageBody(requestEntry.Getmessage_body()); } return result; } @@ -935,12 +961,15 @@ namespace NKikimr::NYmq::V1 { for (auto& requestEntry : GetProtoRequest()->Getentries()) { auto entry = requestHolder->MutableChangeMessageVisibilityBatch()->MutableEntries()->Add(); entry->SetId(requestEntry.Getid()); - entry->SetVisibilityTimeout(requestEntry.Getvisibility_timeout()); entry->SetReceiptHandle(requestEntry.Getreceipt_handle()); + COPY_FIELD_IF_PRESENT_IN_ENTRY(visibility_timeout, VisibilityTimeout) } return result; } }; + + #undef COPY_FIELD_IF_PRESENT + #undef COPY_FIELD_IF_PRESENT_IN_ENTRY } namespace NKikimr::NGRpcService {