Skip to content

Commit 5cb33a5

Browse files
LOGBROKER 8891 fix not using optional fields in sqs json api properly (#8523)
1 parent fd6e5db commit 5cb33a5

File tree

3 files changed

+195
-32
lines changed

3 files changed

+195
-32
lines changed

ydb/core/http_proxy/ut/http_proxy_ut.h

Lines changed: 135 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1824,7 +1824,6 @@ Y_UNIT_TEST_SUITE(TestHttpProxy) {
18241824
res = SendHttpRequest("/Root", "AmazonSQS.DeleteQueue", std::move(deleteQueueReq), FormAuthorizationStr("ru-central1"));
18251825
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);
18261826

1827-
18281827
for (int i = 0; i < 61; ++i) {
18291828
req = CreateSqsGetQueueUrlRequest();
18301829
res = SendHttpRequest("/Root", "AmazonSQS.GetQueueUrl", std::move(req), FormAuthorizationStr("ru-central1"));
@@ -2052,4 +2051,139 @@ Y_UNIT_TEST_SUITE(TestHttpProxy) {
20522051
UNIT_ASSERT_VALUES_EQUAL(json["QueueUrls"][0], resultQueueUrl);
20532052
}
20542053

2054+
Y_UNIT_TEST_F(TestChangeMessageVisibility, THttpProxyTestMock) {
2055+
auto createQueueReq = CreateSqsCreateQueueRequest();
2056+
auto res = SendHttpRequest("/Root", "AmazonSQS.CreateQueue", std::move(createQueueReq), FormAuthorizationStr("ru-central1"));
2057+
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);
2058+
NJson::TJsonValue json;
2059+
UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json));
2060+
TString resultQueueUrl = GetByPath<TString>(json, "QueueUrl");
2061+
2062+
NJson::TJsonValue sendMessageReq;
2063+
sendMessageReq["QueueUrl"] = resultQueueUrl;
2064+
auto body = "MessageBody-0";
2065+
sendMessageReq["MessageBody"] = body;
2066+
2067+
res = SendHttpRequest("/Root", "AmazonSQS.SendMessage", std::move(sendMessageReq), FormAuthorizationStr("ru-central1"));
2068+
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);
2069+
2070+
for (int i = 0; i < 20; ++i) {
2071+
NJson::TJsonValue receiveMessageReq;
2072+
receiveMessageReq["QueueUrl"] = resultQueueUrl;
2073+
res = SendHttpRequest("/Root", "AmazonSQS.ReceiveMessage", std::move(receiveMessageReq), FormAuthorizationStr("ru-central1"));
2074+
if (res.Body != TString("{}")) {
2075+
break;
2076+
}
2077+
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
2078+
}
2079+
2080+
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);
2081+
2082+
UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json));
2083+
2084+
auto receiptHandle = json["Messages"][0]["ReceiptHandle"].GetString();
2085+
UNIT_ASSERT(!receiptHandle.Empty());
2086+
2087+
NJson::TJsonValue changeMessageVisibility;
2088+
changeMessageVisibility["QueueUrl"] = resultQueueUrl;
2089+
changeMessageVisibility["ReceiptHandle"] = receiptHandle;
2090+
changeMessageVisibility["VisibilityTimeout"] = 1;
2091+
2092+
res = SendHttpRequest(
2093+
"/Root",
2094+
"AmazonSQS.ChangeMessageVisibility",
2095+
std::move(changeMessageVisibility),
2096+
FormAuthorizationStr("ru-central1")
2097+
);
2098+
2099+
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);
2100+
}
2101+
2102+
Y_UNIT_TEST_F(TestChangeMessageVisibilityBatch, THttpProxyTestMock) {
2103+
auto createQueueReq = CreateSqsCreateQueueRequest();
2104+
auto res = SendHttpRequest("/Root", "AmazonSQS.CreateQueue", std::move(createQueueReq), FormAuthorizationStr("ru-central1"));
2105+
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);
2106+
NJson::TJsonValue json;
2107+
UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json));
2108+
TString resultQueueUrl = GetByPath<TString>(json, "QueueUrl");
2109+
UNIT_ASSERT(resultQueueUrl.EndsWith("ExampleQueueName"));
2110+
2111+
NJson::TJsonValue message0;
2112+
message0["Id"] = "Id-0";
2113+
message0["MessageBody"] = "MessageBody-0";
2114+
message0["MessageDeduplicationId"] = "MessageDeduplicationId-0";
2115+
2116+
NJson::TJsonValue message1;
2117+
message1["Id"] = "Id-1";
2118+
message1["MessageBody"] = "MessageBody-1";
2119+
message1["MessageDeduplicationId"] = "MessageDeduplicationId-1";
2120+
2121+
NJson::TJsonArray entries = {message0, message1};
2122+
2123+
NJson::TJsonValue sendMessageBatchReq;
2124+
sendMessageBatchReq["QueueUrl"] = resultQueueUrl;
2125+
sendMessageBatchReq["Entries"] = entries;
2126+
2127+
res = SendHttpRequest("/Root", "AmazonSQS.SendMessageBatch", std::move(sendMessageBatchReq), FormAuthorizationStr("ru-central1"));
2128+
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);
2129+
UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json));
2130+
UNIT_ASSERT(json["Successful"].GetArray().size() == 2);
2131+
2132+
TVector<NJson::TJsonValue> messages;
2133+
for (int i = 0; i < 20; ++i) {
2134+
NJson::TJsonValue receiveMessageReq;
2135+
receiveMessageReq["QueueUrl"] = resultQueueUrl;
2136+
res = SendHttpRequest("/Root", "AmazonSQS.ReceiveMessage", std::move(receiveMessageReq), FormAuthorizationStr("ru-central1"));
2137+
if (res.Body != TString("{}")) {
2138+
NJson::ReadJsonTree(res.Body, &json);
2139+
if (json["Messages"].GetArray().size() == 2) {
2140+
messages.push_back(json["Messages"][0]);
2141+
messages.push_back(json["Messages"][1]);
2142+
break;
2143+
}
2144+
if (json["Messages"].GetArray().size() == 1) {
2145+
messages.push_back(json["Messages"][0]);
2146+
if (messages.size() == 2) {
2147+
break;
2148+
}
2149+
}
2150+
}
2151+
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
2152+
}
2153+
2154+
UNIT_ASSERT_VALUES_EQUAL(messages.size(), 2);
2155+
2156+
auto receiptHandle0 = messages[0]["ReceiptHandle"].GetString();
2157+
UNIT_ASSERT(!receiptHandle0.Empty());
2158+
auto receiptHandle1 = messages[1]["ReceiptHandle"].GetString();
2159+
UNIT_ASSERT(!receiptHandle1.Empty());
2160+
2161+
2162+
NJson::TJsonValue changeMessageVisibilityBatchReq;
2163+
changeMessageVisibilityBatchReq["QueueUrl"] = resultQueueUrl;
2164+
2165+
NJson::TJsonValue entry0;
2166+
entry0["Id"] = "Id-0";
2167+
entry0["ReceiptHandle"] = receiptHandle0;
2168+
entry0["VisibilityTimeout"] = 1;
2169+
2170+
NJson::TJsonValue entry1;
2171+
entry1["Id"] = "Id-1";
2172+
entry1["ReceiptHandle"] = receiptHandle1;
2173+
entry1["VisibilityTimeout"] = 2;
2174+
2175+
NJson::TJsonArray changeVisibilityEntries = {entry0, entry1};
2176+
changeMessageVisibilityBatchReq["Entries"] = changeVisibilityEntries;
2177+
2178+
res = SendHttpRequest(
2179+
"/Root", "AmazonSQS.ChangeMessageVisibilityBatch",
2180+
std::move(changeMessageVisibilityBatchReq),
2181+
FormAuthorizationStr("ru-central1")
2182+
);
2183+
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);
2184+
UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json));
2185+
UNIT_ASSERT_VALUES_EQUAL(json["Successful"].GetArray().size(), 2);
2186+
UNIT_ASSERT_VALUES_EQUAL(json["Successful"][0]["Id"], "Id-0");
2187+
UNIT_ASSERT_VALUES_EQUAL(json["Successful"][1]["Id"], "Id-1");
2188+
}
20552189
} // Y_UNIT_TEST_SUITE(TestHttpProxy)

ydb/public/api/protos/draft/ymq.proto

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ message ChangeMessageVisibilityResult {
2525
message ChangeMessageVisibilityBatchRequestEntry {
2626
string id = 1;
2727
string receipt_handle = 2;
28-
int32 visibility_timeout = 3;
28+
optional int32 visibility_timeout = 3;
2929
}
3030

3131
message ChangeMessageVisibilityBatchRequest {
@@ -128,7 +128,7 @@ message GetQueueAttributesResult {
128128
message GetQueueUrlRequest {
129129
Ydb.Operations.OperationParams operation_params = 1;
130130
string queue_name = 2;
131-
string queue_owner_aws_account_id = 3;
131+
optional string queue_owner_aws_account_id = 3;
132132
}
133133

134134
message GetQueueUrlResponse {
@@ -141,9 +141,9 @@ message GetQueueUrlResult {
141141

142142
message ListQueuesRequest {
143143
Ydb.Operations.OperationParams operation_params = 1;
144-
int64 max_results = 2;
145-
string next_token = 3;
146-
string queue_name_prefix = 4;
144+
optional int64 max_results = 2;
145+
optional string next_token = 3;
146+
optional string queue_name_prefix = 4;
147147
}
148148

149149
message ListQueuesResponse {
@@ -178,13 +178,13 @@ message MessageAttribute {
178178
message ReceiveMessageRequest {
179179
Ydb.Operations.OperationParams operation_params = 1;
180180
repeated string attribute_names = 2;
181-
int32 max_number_of_messages = 3;
181+
optional int32 max_number_of_messages = 3;
182182
repeated string message_attribute_names = 4;
183183
repeated string message_system_attribute_names = 5;
184184
string queue_url = 6;
185-
string receive_request_attempt_id = 7;
186-
int32 visibility_timeout = 8;
187-
int32 wait_time_seconds = 9;
185+
optional string receive_request_attempt_id = 7;
186+
optional int32 visibility_timeout = 8;
187+
optional int32 wait_time_seconds = 9;
188188
}
189189

190190
message ReceiveMessageResponse {
@@ -207,11 +207,11 @@ message ReceiveMessageResult {
207207

208208
message SendMessageRequest {
209209
Ydb.Operations.OperationParams operation_params = 1;
210-
int32 delay_seconds = 2;
210+
optional int32 delay_seconds = 2;
211211
map<string, MessageAttribute> message_attributes = 3;
212212
string message_body = 4;
213-
string message_deduplication_id = 5;
214-
string message_group_id = 6;
213+
optional string message_deduplication_id = 5;
214+
optional string message_group_id = 6;
215215
map<string, MessageAttribute> message_system_attributes = 7;
216216
string queue_url = 8;
217217
}
@@ -237,11 +237,11 @@ message BatchResultErrorEntry {
237237

238238
message SendMessageBatchRequestEntry {
239239
string id = 1;
240-
int32 delay_seconds = 2;
240+
optional int32 delay_seconds = 2;
241241
map<string, MessageAttribute> message_attributes = 3;
242242
string message_body = 4;
243-
string message_deduplication_id = 5;
244-
string message_group_id = 6;
243+
optional string message_deduplication_id = 5;
244+
optional string message_group_id = 6;
245245
map<string, MessageAttribute> message_system_attributes = 7;
246246
string queue_url = 8;
247247
}
@@ -285,8 +285,8 @@ message SetQueueAttributesResult {
285285

286286
message ListDeadLetterSourceQueuesRequest {
287287
Ydb.Operations.OperationParams operation_params = 1;
288-
int32 max_results = 2;
289-
string next_token = 3;
288+
optional int32 max_results = 2;
289+
optional string next_token = 3;
290290
string queue_url = 4;
291291
}
292292

ydb/services/ymq/ymq_proxy.cpp

Lines changed: 43 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,16 @@ using grpc::Status;
3232

3333
namespace NKikimr::NYmq::V1 {
3434

35+
#define COPY_FIELD_IF_PRESENT(from, to) \
36+
if (GetProtoRequest()->Has##from()) { \
37+
result->Set##to(GetProtoRequest()->Get##from()); \
38+
}
39+
40+
#define COPY_FIELD_IF_PRESENT_IN_ENTRY(from, to) \
41+
if (requestEntry.Has##from()) { \
42+
entry->Set##to(requestEntry.Get##from()); \
43+
}
44+
3545
using namespace NGRpcService;
3646
using namespace NGRpcProxy::V1;
3747

@@ -267,17 +277,22 @@ namespace NKikimr::NYmq::V1 {
267277
private:
268278
NKikimr::NSQS::TSendMessageRequest* GetRequest(THolder<TSqsRequest>& requestHolder) override {
269279
auto result = requestHolder->MutableSendMessage();
280+
270281
for (auto& srcAttribute: GetProtoRequest()->Getmessage_attributes()) {
271282
auto dstAttribute = result->MutableMessageAttributes()->Add();
272283
dstAttribute->SetName(srcAttribute.first);
273284
dstAttribute->SetStringValue(srcAttribute.second.Getstring_value());
274285
dstAttribute->SetBinaryValue(srcAttribute.second.Getbinary_value());
275286
dstAttribute->SetDataType(srcAttribute.second.Getdata_type());
276287
}
277-
result->SetMessageDeduplicationId(GetProtoRequest()->message_deduplication_id());
278-
result->SetMessageGroupId(GetProtoRequest()->message_group_id());
279-
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url())->second);
280-
result->SetMessageBody(GetProtoRequest()->message_body());
288+
289+
COPY_FIELD_IF_PRESENT(delay_seconds, DelaySeconds);
290+
COPY_FIELD_IF_PRESENT(message_deduplication_id, MessageDeduplicationId);
291+
COPY_FIELD_IF_PRESENT(message_group_id, MessageGroupId);
292+
293+
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->Getqueue_url())->second);
294+
295+
result->SetMessageBody(GetProtoRequest()->Getmessage_body());
281296

282297
return result;
283298
}
@@ -363,6 +378,14 @@ namespace NKikimr::NYmq::V1 {
363378
private:
364379
NKikimr::NSQS::TReceiveMessageRequest* GetRequest(THolder<TSqsRequest>& requestHolder) override {
365380
auto result = requestHolder->MutableReceiveMessage();
381+
382+
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url())->second);
383+
384+
COPY_FIELD_IF_PRESENT(max_number_of_messages, MaxNumberOfMessages);
385+
COPY_FIELD_IF_PRESENT(receive_request_attempt_id, ReceiveRequestAttemptId);
386+
COPY_FIELD_IF_PRESENT(visibility_timeout, VisibilityTimeout);
387+
COPY_FIELD_IF_PRESENT(wait_time_seconds, WaitTimeSeconds);
388+
366389
auto systemAttributeNames = GetProtoRequest()->Getmessage_system_attribute_names();
367390
// We ignore AttributeNames if SystemAttributeNames is present,
368391
// because AttributeNames is deprecated in favour of SystemAttributeNames
@@ -376,14 +399,11 @@ namespace NKikimr::NYmq::V1 {
376399
result->SetAttributeName(i, attributeNames[i]);
377400
}
378401
}
379-
result->SetMaxNumberOfMessages(GetProtoRequest()->max_number_of_messages());
402+
380403
for (int i = 0; i < GetProtoRequest()->Getmessage_attribute_names().size(); i++) {
381404
result->SetMessageAttributeName(i, GetProtoRequest()->Getmessage_attribute_names()[i]);
382405
}
383-
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url())->second);
384-
result->SetReceiveRequestAttemptId(GetProtoRequest()->Getreceive_request_attempt_id());
385-
result->SetVisibilityTimeout(GetProtoRequest()->Getvisibility_timeout());
386-
result->SetWaitTimeSeconds(GetProtoRequest()->Getwait_time_seconds());
406+
387407
return result;
388408
}
389409

@@ -559,7 +579,7 @@ namespace NKikimr::NYmq::V1 {
559579
private:
560580
NKikimr::NSQS::TListQueuesRequest* GetRequest(THolder<TSqsRequest>& requestHolder) override {
561581
auto result = requestHolder->MutableListQueues();
562-
result->SetQueueNamePrefix(GetProtoRequest()->Getqueue_name_prefix());
582+
COPY_FIELD_IF_PRESENT(queue_name_prefix, QueueNamePrefix);
563583
return result;
564584
}
565585
};
@@ -819,20 +839,26 @@ namespace NKikimr::NYmq::V1 {
819839
private:
820840
NKikimr::NSQS::TSendMessageBatchRequest* GetRequest(THolder<TSqsRequest>& requestHolder) override {
821841
auto result = requestHolder->MutableSendMessageBatch();
842+
822843
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->Getqueue_url())->second);
844+
823845
for (auto& requestEntry : GetProtoRequest()->Getentries()) {
824846
auto entry = requestHolder->MutableSendMessageBatch()->MutableEntries()->Add();
847+
825848
entry->SetId(requestEntry.Getid());
849+
entry->SetMessageBody(requestEntry.Getmessage_body());
850+
851+
COPY_FIELD_IF_PRESENT_IN_ENTRY(delay_seconds, DelaySeconds);
852+
COPY_FIELD_IF_PRESENT_IN_ENTRY(message_deduplication_id, MessageDeduplicationId);
853+
COPY_FIELD_IF_PRESENT_IN_ENTRY(message_group_id, MessageGroupId);
854+
826855
for (auto& srcAttribute: requestEntry.Getmessage_attributes()) {
827856
auto dstAttribute = entry->MutableMessageAttributes()->Add();
828857
dstAttribute->SetName(srcAttribute.first);
829858
dstAttribute->SetStringValue(srcAttribute.second.Getstring_value());
830859
dstAttribute->SetBinaryValue(srcAttribute.second.Getbinary_value());
831860
dstAttribute->SetDataType(srcAttribute.second.Getdata_type());
832861
}
833-
entry->SetMessageDeduplicationId(requestEntry.Getmessage_deduplication_id());
834-
entry->SetMessageGroupId(requestEntry.Getmessage_group_id());
835-
entry->SetMessageBody(requestEntry.Getmessage_body());
836862
}
837863
return result;
838864
}
@@ -936,12 +962,15 @@ namespace NKikimr::NYmq::V1 {
936962
for (auto& requestEntry : GetProtoRequest()->Getentries()) {
937963
auto entry = requestHolder->MutableChangeMessageVisibilityBatch()->MutableEntries()->Add();
938964
entry->SetId(requestEntry.Getid());
939-
entry->SetVisibilityTimeout(requestEntry.Getvisibility_timeout());
940965
entry->SetReceiptHandle(requestEntry.Getreceipt_handle());
966+
COPY_FIELD_IF_PRESENT_IN_ENTRY(visibility_timeout, VisibilityTimeout)
941967
}
942968
return result;
943969
}
944970
};
971+
972+
#undef COPY_FIELD_IF_PRESENT
973+
#undef COPY_FIELD_IF_PRESENT_IN_ENTRY
945974
}
946975

947976
namespace NKikimr::NGRpcService {

0 commit comments

Comments
 (0)