Skip to content

Commit e1a64d9

Browse files
LOGBROKER 8891 fix not using optional fields in sqs json api properly (ydb-platform#8523)
1 parent 0718e63 commit e1a64d9

File tree

3 files changed

+195
-32
lines changed

3 files changed

+195
-32
lines changed

ydb/core/http_proxy/ut/http_proxy_ut.h

Lines changed: 135 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1824,7 +1824,6 @@ Y_UNIT_TEST_SUITE(TestHttpProxy) {
18241824
res = SendHttpRequest("/Root", "AmazonSQS.DeleteQueue", std::move(deleteQueueReq), FormAuthorizationStr("ru-central1"));
18251825
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);
18261826

1827-
18281827
for (int i = 0; i < 61; ++i) {
18291828
req = CreateSqsGetQueueUrlRequest();
18301829
res = SendHttpRequest("/Root", "AmazonSQS.GetQueueUrl", std::move(req), FormAuthorizationStr("ru-central1"));
@@ -2042,4 +2041,139 @@ Y_UNIT_TEST_SUITE(TestHttpProxy) {
20422041
UNIT_ASSERT_VALUES_EQUAL(json["QueueUrls"][0], resultQueueUrl);
20432042
}
20442043

2044+
Y_UNIT_TEST_F(TestChangeMessageVisibility, THttpProxyTestMock) {
2045+
auto createQueueReq = CreateSqsCreateQueueRequest();
2046+
auto res = SendHttpRequest("/Root", "AmazonSQS.CreateQueue", std::move(createQueueReq), FormAuthorizationStr("ru-central1"));
2047+
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);
2048+
NJson::TJsonValue json;
2049+
UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json));
2050+
TString resultQueueUrl = GetByPath<TString>(json, "QueueUrl");
2051+
2052+
NJson::TJsonValue sendMessageReq;
2053+
sendMessageReq["QueueUrl"] = resultQueueUrl;
2054+
auto body = "MessageBody-0";
2055+
sendMessageReq["MessageBody"] = body;
2056+
2057+
res = SendHttpRequest("/Root", "AmazonSQS.SendMessage", std::move(sendMessageReq), FormAuthorizationStr("ru-central1"));
2058+
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);
2059+
2060+
for (int i = 0; i < 20; ++i) {
2061+
NJson::TJsonValue receiveMessageReq;
2062+
receiveMessageReq["QueueUrl"] = resultQueueUrl;
2063+
res = SendHttpRequest("/Root", "AmazonSQS.ReceiveMessage", std::move(receiveMessageReq), FormAuthorizationStr("ru-central1"));
2064+
if (res.Body != TString("{}")) {
2065+
break;
2066+
}
2067+
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
2068+
}
2069+
2070+
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);
2071+
2072+
UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json));
2073+
2074+
auto receiptHandle = json["Messages"][0]["ReceiptHandle"].GetString();
2075+
UNIT_ASSERT(!receiptHandle.Empty());
2076+
2077+
NJson::TJsonValue changeMessageVisibility;
2078+
changeMessageVisibility["QueueUrl"] = resultQueueUrl;
2079+
changeMessageVisibility["ReceiptHandle"] = receiptHandle;
2080+
changeMessageVisibility["VisibilityTimeout"] = 1;
2081+
2082+
res = SendHttpRequest(
2083+
"/Root",
2084+
"AmazonSQS.ChangeMessageVisibility",
2085+
std::move(changeMessageVisibility),
2086+
FormAuthorizationStr("ru-central1")
2087+
);
2088+
2089+
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);
2090+
}
2091+
2092+
Y_UNIT_TEST_F(TestChangeMessageVisibilityBatch, THttpProxyTestMock) {
2093+
auto createQueueReq = CreateSqsCreateQueueRequest();
2094+
auto res = SendHttpRequest("/Root", "AmazonSQS.CreateQueue", std::move(createQueueReq), FormAuthorizationStr("ru-central1"));
2095+
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);
2096+
NJson::TJsonValue json;
2097+
UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json));
2098+
TString resultQueueUrl = GetByPath<TString>(json, "QueueUrl");
2099+
UNIT_ASSERT(resultQueueUrl.EndsWith("ExampleQueueName"));
2100+
2101+
NJson::TJsonValue message0;
2102+
message0["Id"] = "Id-0";
2103+
message0["MessageBody"] = "MessageBody-0";
2104+
message0["MessageDeduplicationId"] = "MessageDeduplicationId-0";
2105+
2106+
NJson::TJsonValue message1;
2107+
message1["Id"] = "Id-1";
2108+
message1["MessageBody"] = "MessageBody-1";
2109+
message1["MessageDeduplicationId"] = "MessageDeduplicationId-1";
2110+
2111+
NJson::TJsonArray entries = {message0, message1};
2112+
2113+
NJson::TJsonValue sendMessageBatchReq;
2114+
sendMessageBatchReq["QueueUrl"] = resultQueueUrl;
2115+
sendMessageBatchReq["Entries"] = entries;
2116+
2117+
res = SendHttpRequest("/Root", "AmazonSQS.SendMessageBatch", std::move(sendMessageBatchReq), FormAuthorizationStr("ru-central1"));
2118+
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);
2119+
UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json));
2120+
UNIT_ASSERT(json["Successful"].GetArray().size() == 2);
2121+
2122+
TVector<NJson::TJsonValue> messages;
2123+
for (int i = 0; i < 20; ++i) {
2124+
NJson::TJsonValue receiveMessageReq;
2125+
receiveMessageReq["QueueUrl"] = resultQueueUrl;
2126+
res = SendHttpRequest("/Root", "AmazonSQS.ReceiveMessage", std::move(receiveMessageReq), FormAuthorizationStr("ru-central1"));
2127+
if (res.Body != TString("{}")) {
2128+
NJson::ReadJsonTree(res.Body, &json);
2129+
if (json["Messages"].GetArray().size() == 2) {
2130+
messages.push_back(json["Messages"][0]);
2131+
messages.push_back(json["Messages"][1]);
2132+
break;
2133+
}
2134+
if (json["Messages"].GetArray().size() == 1) {
2135+
messages.push_back(json["Messages"][0]);
2136+
if (messages.size() == 2) {
2137+
break;
2138+
}
2139+
}
2140+
}
2141+
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
2142+
}
2143+
2144+
UNIT_ASSERT_VALUES_EQUAL(messages.size(), 2);
2145+
2146+
auto receiptHandle0 = messages[0]["ReceiptHandle"].GetString();
2147+
UNIT_ASSERT(!receiptHandle0.Empty());
2148+
auto receiptHandle1 = messages[1]["ReceiptHandle"].GetString();
2149+
UNIT_ASSERT(!receiptHandle1.Empty());
2150+
2151+
2152+
NJson::TJsonValue changeMessageVisibilityBatchReq;
2153+
changeMessageVisibilityBatchReq["QueueUrl"] = resultQueueUrl;
2154+
2155+
NJson::TJsonValue entry0;
2156+
entry0["Id"] = "Id-0";
2157+
entry0["ReceiptHandle"] = receiptHandle0;
2158+
entry0["VisibilityTimeout"] = 1;
2159+
2160+
NJson::TJsonValue entry1;
2161+
entry1["Id"] = "Id-1";
2162+
entry1["ReceiptHandle"] = receiptHandle1;
2163+
entry1["VisibilityTimeout"] = 2;
2164+
2165+
NJson::TJsonArray changeVisibilityEntries = {entry0, entry1};
2166+
changeMessageVisibilityBatchReq["Entries"] = changeVisibilityEntries;
2167+
2168+
res = SendHttpRequest(
2169+
"/Root", "AmazonSQS.ChangeMessageVisibilityBatch",
2170+
std::move(changeMessageVisibilityBatchReq),
2171+
FormAuthorizationStr("ru-central1")
2172+
);
2173+
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);
2174+
UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json));
2175+
UNIT_ASSERT_VALUES_EQUAL(json["Successful"].GetArray().size(), 2);
2176+
UNIT_ASSERT_VALUES_EQUAL(json["Successful"][0]["Id"], "Id-0");
2177+
UNIT_ASSERT_VALUES_EQUAL(json["Successful"][1]["Id"], "Id-1");
2178+
}
20452179
} // Y_UNIT_TEST_SUITE(TestHttpProxy)

ydb/public/api/protos/draft/ymq.proto

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ message ChangeMessageVisibilityResult {
2525
message ChangeMessageVisibilityBatchRequestEntry {
2626
string id = 1;
2727
string receipt_handle = 2;
28-
int32 visibility_timeout = 3;
28+
optional int32 visibility_timeout = 3;
2929
}
3030

3131
message ChangeMessageVisibilityBatchRequest {
@@ -128,7 +128,7 @@ message GetQueueAttributesResult {
128128
message GetQueueUrlRequest {
129129
Ydb.Operations.OperationParams operation_params = 1;
130130
string queue_name = 2;
131-
string queue_owner_aws_account_id = 3;
131+
optional string queue_owner_aws_account_id = 3;
132132
}
133133

134134
message GetQueueUrlResponse {
@@ -141,9 +141,9 @@ message GetQueueUrlResult {
141141

142142
message ListQueuesRequest {
143143
Ydb.Operations.OperationParams operation_params = 1;
144-
int64 max_results = 2;
145-
string next_token = 3;
146-
string queue_name_prefix = 4;
144+
optional int64 max_results = 2;
145+
optional string next_token = 3;
146+
optional string queue_name_prefix = 4;
147147
}
148148

149149
message ListQueuesResponse {
@@ -178,13 +178,13 @@ message MessageAttribute {
178178
message ReceiveMessageRequest {
179179
Ydb.Operations.OperationParams operation_params = 1;
180180
repeated string attribute_names = 2;
181-
int32 max_number_of_messages = 3;
181+
optional int32 max_number_of_messages = 3;
182182
repeated string message_attribute_names = 4;
183183
repeated string message_system_attribute_names = 5;
184184
string queue_url = 6;
185-
string receive_request_attempt_id = 7;
186-
int32 visibility_timeout = 8;
187-
int32 wait_time_seconds = 9;
185+
optional string receive_request_attempt_id = 7;
186+
optional int32 visibility_timeout = 8;
187+
optional int32 wait_time_seconds = 9;
188188
}
189189

190190
message ReceiveMessageResponse {
@@ -207,11 +207,11 @@ message ReceiveMessageResult {
207207

208208
message SendMessageRequest {
209209
Ydb.Operations.OperationParams operation_params = 1;
210-
int32 delay_seconds = 2;
210+
optional int32 delay_seconds = 2;
211211
map<string, MessageAttribute> message_attributes = 3;
212212
string message_body = 4;
213-
string message_deduplication_id = 5;
214-
string message_group_id = 6;
213+
optional string message_deduplication_id = 5;
214+
optional string message_group_id = 6;
215215
map<string, MessageAttribute> message_system_attributes = 7;
216216
string queue_url = 8;
217217
}
@@ -237,11 +237,11 @@ message BatchResultErrorEntry {
237237

238238
message SendMessageBatchRequestEntry {
239239
string id = 1;
240-
int32 delay_seconds = 2;
240+
optional int32 delay_seconds = 2;
241241
map<string, MessageAttribute> message_attributes = 3;
242242
string message_body = 4;
243-
string message_deduplication_id = 5;
244-
string message_group_id = 6;
243+
optional string message_deduplication_id = 5;
244+
optional string message_group_id = 6;
245245
map<string, MessageAttribute> message_system_attributes = 7;
246246
string queue_url = 8;
247247
}
@@ -285,8 +285,8 @@ message SetQueueAttributesResult {
285285

286286
message ListDeadLetterSourceQueuesRequest {
287287
Ydb.Operations.OperationParams operation_params = 1;
288-
int32 max_results = 2;
289-
string next_token = 3;
288+
optional int32 max_results = 2;
289+
optional string next_token = 3;
290290
string queue_url = 4;
291291
}
292292

ydb/services/ymq/ymq_proxy.cpp

Lines changed: 43 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,16 @@ using grpc::Status;
3232

3333
namespace NKikimr::NYmq::V1 {
3434

35+
#define COPY_FIELD_IF_PRESENT(from, to) \
36+
if (GetProtoRequest()->Has##from()) { \
37+
result->Set##to(GetProtoRequest()->Get##from()); \
38+
}
39+
40+
#define COPY_FIELD_IF_PRESENT_IN_ENTRY(from, to) \
41+
if (requestEntry.Has##from()) { \
42+
entry->Set##to(requestEntry.Get##from()); \
43+
}
44+
3545
using namespace NGRpcService;
3646
using namespace NGRpcProxy::V1;
3747

@@ -267,17 +277,22 @@ namespace NKikimr::NYmq::V1 {
267277
private:
268278
NKikimr::NSQS::TSendMessageRequest* GetRequest(THolder<TSqsRequest>& requestHolder) override {
269279
auto result = requestHolder->MutableSendMessage();
280+
270281
for (auto& srcAttribute: GetProtoRequest()->Getmessage_attributes()) {
271282
auto dstAttribute = result->MutableMessageAttributes()->Add();
272283
dstAttribute->SetName(srcAttribute.first);
273284
dstAttribute->SetStringValue(srcAttribute.second.Getstring_value());
274285
dstAttribute->SetBinaryValue(srcAttribute.second.Getbinary_value());
275286
dstAttribute->SetDataType(srcAttribute.second.Getdata_type());
276287
}
277-
result->SetMessageDeduplicationId(GetProtoRequest()->message_deduplication_id());
278-
result->SetMessageGroupId(GetProtoRequest()->message_group_id());
279-
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url())->second);
280-
result->SetMessageBody(GetProtoRequest()->message_body());
288+
289+
COPY_FIELD_IF_PRESENT(delay_seconds, DelaySeconds);
290+
COPY_FIELD_IF_PRESENT(message_deduplication_id, MessageDeduplicationId);
291+
COPY_FIELD_IF_PRESENT(message_group_id, MessageGroupId);
292+
293+
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->Getqueue_url())->second);
294+
295+
result->SetMessageBody(GetProtoRequest()->Getmessage_body());
281296

282297
return result;
283298
}
@@ -363,6 +378,14 @@ namespace NKikimr::NYmq::V1 {
363378
private:
364379
NKikimr::NSQS::TReceiveMessageRequest* GetRequest(THolder<TSqsRequest>& requestHolder) override {
365380
auto result = requestHolder->MutableReceiveMessage();
381+
382+
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url())->second);
383+
384+
COPY_FIELD_IF_PRESENT(max_number_of_messages, MaxNumberOfMessages);
385+
COPY_FIELD_IF_PRESENT(receive_request_attempt_id, ReceiveRequestAttemptId);
386+
COPY_FIELD_IF_PRESENT(visibility_timeout, VisibilityTimeout);
387+
COPY_FIELD_IF_PRESENT(wait_time_seconds, WaitTimeSeconds);
388+
366389
auto systemAttributeNames = GetProtoRequest()->Getmessage_system_attribute_names();
367390
// We ignore AttributeNames if SystemAttributeNames is present,
368391
// because AttributeNames is deprecated in favour of SystemAttributeNames
@@ -376,14 +399,11 @@ namespace NKikimr::NYmq::V1 {
376399
result->SetAttributeName(i, attributeNames[i]);
377400
}
378401
}
379-
result->SetMaxNumberOfMessages(GetProtoRequest()->max_number_of_messages());
402+
380403
for (int i = 0; i < GetProtoRequest()->Getmessage_attribute_names().size(); i++) {
381404
result->SetMessageAttributeName(i, GetProtoRequest()->Getmessage_attribute_names()[i]);
382405
}
383-
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url())->second);
384-
result->SetReceiveRequestAttemptId(GetProtoRequest()->Getreceive_request_attempt_id());
385-
result->SetVisibilityTimeout(GetProtoRequest()->Getvisibility_timeout());
386-
result->SetWaitTimeSeconds(GetProtoRequest()->Getwait_time_seconds());
406+
387407
return result;
388408
}
389409

@@ -559,7 +579,7 @@ namespace NKikimr::NYmq::V1 {
559579
private:
560580
NKikimr::NSQS::TListQueuesRequest* GetRequest(THolder<TSqsRequest>& requestHolder) override {
561581
auto result = requestHolder->MutableListQueues();
562-
result->SetQueueNamePrefix(GetProtoRequest()->Getqueue_name_prefix());
582+
COPY_FIELD_IF_PRESENT(queue_name_prefix, QueueNamePrefix);
563583
return result;
564584
}
565585
};
@@ -818,20 +838,26 @@ namespace NKikimr::NYmq::V1 {
818838
private:
819839
NKikimr::NSQS::TSendMessageBatchRequest* GetRequest(THolder<TSqsRequest>& requestHolder) override {
820840
auto result = requestHolder->MutableSendMessageBatch();
841+
821842
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->Getqueue_url())->second);
843+
822844
for (auto& requestEntry : GetProtoRequest()->Getentries()) {
823845
auto entry = requestHolder->MutableSendMessageBatch()->MutableEntries()->Add();
846+
824847
entry->SetId(requestEntry.Getid());
848+
entry->SetMessageBody(requestEntry.Getmessage_body());
849+
850+
COPY_FIELD_IF_PRESENT_IN_ENTRY(delay_seconds, DelaySeconds);
851+
COPY_FIELD_IF_PRESENT_IN_ENTRY(message_deduplication_id, MessageDeduplicationId);
852+
COPY_FIELD_IF_PRESENT_IN_ENTRY(message_group_id, MessageGroupId);
853+
825854
for (auto& srcAttribute: requestEntry.Getmessage_attributes()) {
826855
auto dstAttribute = entry->MutableMessageAttributes()->Add();
827856
dstAttribute->SetName(srcAttribute.first);
828857
dstAttribute->SetStringValue(srcAttribute.second.Getstring_value());
829858
dstAttribute->SetBinaryValue(srcAttribute.second.Getbinary_value());
830859
dstAttribute->SetDataType(srcAttribute.second.Getdata_type());
831860
}
832-
entry->SetMessageDeduplicationId(requestEntry.Getmessage_deduplication_id());
833-
entry->SetMessageGroupId(requestEntry.Getmessage_group_id());
834-
entry->SetMessageBody(requestEntry.Getmessage_body());
835861
}
836862
return result;
837863
}
@@ -935,12 +961,15 @@ namespace NKikimr::NYmq::V1 {
935961
for (auto& requestEntry : GetProtoRequest()->Getentries()) {
936962
auto entry = requestHolder->MutableChangeMessageVisibilityBatch()->MutableEntries()->Add();
937963
entry->SetId(requestEntry.Getid());
938-
entry->SetVisibilityTimeout(requestEntry.Getvisibility_timeout());
939964
entry->SetReceiptHandle(requestEntry.Getreceipt_handle());
965+
COPY_FIELD_IF_PRESENT_IN_ENTRY(visibility_timeout, VisibilityTimeout)
940966
}
941967
return result;
942968
}
943969
};
970+
971+
#undef COPY_FIELD_IF_PRESENT
972+
#undef COPY_FIELD_IF_PRESENT_IN_ENTRY
944973
}
945974

946975
namespace NKikimr::NGRpcService {

0 commit comments

Comments
 (0)