Skip to content

Commit 9f97eb5

Browse files
Add SendMessageBatch request to SQS Json API
1 parent e9b5b49 commit 9f97eb5

File tree

10 files changed

+161
-15
lines changed

10 files changed

+161
-15
lines changed

ydb/core/grpc_services/service_ymq.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,5 +22,6 @@ void DoYmqPurgeQueueRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityPro
2222
void DoYmqDeleteQueueRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
2323
void DoYmqChangeMessageVisibilityRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
2424
void DoYmqSetQueueAttributesRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
25+
void DoYmqSendMessageBatchRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
2526
}
2627
}

ydb/core/http_proxy/http_req.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -530,6 +530,8 @@ namespace NKikimr::NHttpProxy {
530530
action = NSQS::EAction::ChangeMessageVisibility;
531531
} else if (Method == "SetQueueAttributes") {
532532
action = NSQS::EAction::SetQueueAttributes;
533+
} else if (Method == "SendMessageBatch") {
534+
action = NSQS::EAction::SendMessageBatch;
533535
}
534536

535537
requestHolder->SetRequestId(HttpContext.RequestId);
@@ -1066,6 +1068,7 @@ namespace NKikimr::NHttpProxy {
10661068
DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN(DeleteQueue);
10671069
DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN(ChangeMessageVisibility);
10681070
DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN(SetQueueAttributes);
1071+
DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN(SendMessageBatch);
10691072
#undef DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN
10701073
}
10711074

ydb/core/http_proxy/ut/http_proxy_ut.h

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1844,7 +1844,6 @@ Y_UNIT_TEST_SUITE(TestHttpProxy) {
18441844
NJson::TJsonValue attributes;
18451845
attributes["DelaySeconds"] = "1";
18461846
createQueueReq["Attributes"] = attributes;
1847-
18481847
auto res = SendHttpRequest("/Root", "AmazonSQS.CreateQueue", std::move(createQueueReq), FormAuthorizationStr("ru-central1"));
18491848
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);
18501849
NJson::TJsonValue json;
@@ -1872,4 +1871,39 @@ Y_UNIT_TEST_SUITE(TestHttpProxy) {
18721871
UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &resultJson));
18731872
UNIT_ASSERT_VALUES_EQUAL(resultJson["Attributes"]["DelaySeconds"], "2");
18741873
}
1874+
1875+
Y_UNIT_TEST_F(TestSendMessageBatch, THttpProxyTestMock) {
1876+
auto createQueueReq = CreateSqsCreateQueueRequest();
1877+
auto res = SendHttpRequest("/Root", "AmazonSQS.CreateQueue", std::move(createQueueReq), FormAuthorizationStr("ru-central1"));
1878+
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);
1879+
NJson::TJsonValue json;
1880+
UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json));
1881+
TString resultQueueUrl = GetByPath<TString>(json, "QueueUrl");
1882+
UNIT_ASSERT(resultQueueUrl.EndsWith("ExampleQueueName"));
1883+
1884+
NJson::TJsonValue message0;
1885+
message0["Id"] = "Id-0";
1886+
message0["MessageBody"] = "MessageBody-0";
1887+
message0["MessageDeduplicationId"] = "MessageDeduplicationId-0";
1888+
1889+
NJson::TJsonValue message1;
1890+
message1["Id"] = "Id-1";
1891+
message1["MessageBody"] = "MessageBody-1";
1892+
message1["MessageDeduplicationId"] = "MessageDeduplicationId-1";
1893+
1894+
NJson::TJsonArray entries = {message0, message1};
1895+
1896+
NJson::TJsonValue sendMessageBatchReq;
1897+
sendMessageBatchReq["QueueUrl"] = resultQueueUrl;
1898+
sendMessageBatchReq["Entries"] = entries;
1899+
1900+
res = SendHttpRequest("/Root", "AmazonSQS.SendMessageBatch", std::move(sendMessageBatchReq), FormAuthorizationStr("ru-central1"));
1901+
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);
1902+
UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json));
1903+
UNIT_ASSERT(json["Successful"].GetArray().size() == 2);
1904+
auto succesful0 = json["Successful"][0];
1905+
UNIT_ASSERT(succesful0["Id"] == "Id-0");
1906+
UNIT_ASSERT(!GetByPath<TString>(succesful0, "Md5OfMessageBody").empty());
1907+
UNIT_ASSERT(!GetByPath<TString>(succesful0, "MessageId").empty());
1908+
}
18751909
} // Y_UNIT_TEST_SUITE(TestHttpProxy)

ydb/library/http_proxy/error/error.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#include "error.h"
2+
#include "util/generic/maybe.h"
23

34
namespace NKikimr::NSQS {
45

@@ -42,6 +43,13 @@ ui32 TErrorClass::GetId(const TString& code) {
4243
: it->second;
4344
};
4445

46+
TMaybe<ui32> TErrorClass::GetHttpStatus(const TString& code) {
47+
auto idIt = NKikimr::NSQS::TErrorClass::ErrorToId.find(code);
48+
if (idIt == NKikimr::NSQS::TErrorClass::ErrorToId.end()) {
49+
return Nothing();
50+
}
51+
return get<1>(IdToErrorAndCode.find(idIt->second)->second);
52+
};
4553

4654
namespace NErrors {
4755
extern const TErrorClass ACCESS_DENIED = {

ydb/library/http_proxy/error/error.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ struct TErrorClass {
2323

2424
static const std::tuple<TString, ui32> GetErrorAndCode(ui32 id);
2525
static ui32 GetId(const TString& code);
26+
static TMaybe<ui32> GetHttpStatus(const TString& code);
2627

2728
private:
2829
static THashSet<TString> RegisteredCodes;

ydb/public/api/grpc/draft/ydb_ymq_v1.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,4 +21,5 @@ service YmqService {
2121
rpc DeleteQueue(DeleteQueueRequest) returns (DeleteQueueResponse);
2222
rpc ChangeMessageVisibility(ChangeMessageVisibilityRequest) returns (ChangeMessageVisibilityResponse);
2323
rpc SetQueueAttributes(SetQueueAttributesRequest) returns (SetQueueAttributesResponse);
24+
rpc SendMessageBatch(SendMessageBatchRequest) returns (SendMessageBatchResponse);
2425
}

ydb/public/api/protos/draft/ymq.proto

Lines changed: 43 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -228,28 +228,41 @@ message SendMessageResult {
228228
string sequence_number = 5;
229229
}
230230

231-
message SendMessageBatchRequest {
232-
Ydb.Operations.OperationParams operation_params = 1;
233-
repeated SendMessageRequest entries = 2;
234-
}
235-
236-
message SendMessageBatchResponse {
237-
Ydb.Operations.Operation operation = 1;
238-
}
239-
240231
message BatchResultErrorEntry {
241232
string code = 1;
242233
string id = 2;
243234
bool sender_fault = 3;
244235
string message = 4;
245236
}
246237

238+
message SendMessageBatchRequestEntry {
239+
string id = 1;
240+
int32 delay_seconds = 2;
241+
map<string, MessageAttribute> message_attributes = 3;
242+
string message_body = 4;
243+
string message_deduplication_id = 5;
244+
string message_group_id = 6;
245+
map<string, MessageAttribute> message_system_attributes = 7;
246+
string queue_url = 8;
247+
}
248+
247249
message SendMessageBatchResultEntry {
248-
string md5_of_message_attributes = 1;
249-
string md5_of_message_body= 2;
250-
string md5_of_message_system_attributes= 3;
251-
string message_id = 4;
252-
string sequence_number = 5;
250+
string id = 1;
251+
string md5_of_message_body = 2;
252+
string message_id = 3;
253+
string md5_of_message_attributes = 4;
254+
string md5_of_message_system_attributes = 5;
255+
string sequence_number = 6;
256+
}
257+
258+
message SendMessageBatchRequest {
259+
Ydb.Operations.OperationParams operation_params = 1;
260+
repeated SendMessageBatchRequestEntry entries = 2;
261+
string queue_url = 3;
262+
}
263+
264+
message SendMessageBatchResponse {
265+
Ydb.Operations.Operation operation = 1;
253266
}
254267

255268
message SendMessageBatchResult {
@@ -269,3 +282,19 @@ message SetQueueAttributesResponse {
269282

270283
message SetQueueAttributesResult {
271284
}
285+
286+
message ListDeadLetterSourceQueuesRequest {
287+
Ydb.Operations.OperationParams operation_params = 1;
288+
int32 max_results = 2;
289+
string next_token = 3;
290+
string queue_url = 4;
291+
}
292+
293+
message ListDeadLetterSourceQueuesResponse {
294+
Ydb.Operations.Operation operation = 1;
295+
}
296+
297+
message ListDeadLetterSourceQueuesResult {
298+
string next_token = 1;
299+
repeated string queue_urls = 2;
300+
}

ydb/services/ymq/grpc_service.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ void TGRpcYmqService::SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger)
4343
ADD_REQUEST(DeleteQueue, DoYmqDeleteQueueRequest, nullptr, Off)
4444
ADD_REQUEST(ChangeMessageVisibility, DoYmqChangeMessageVisibilityRequest, nullptr, Off)
4545
ADD_REQUEST(SetQueueAttributes, DoYmqSetQueueAttributesRequest, nullptr, Off)
46+
ADD_REQUEST(SendMessageBatch, DoYmqSendMessageBatchRequest, nullptr, Off)
4647

4748
#undef ADD_REQUEST
4849
}

ydb/services/ymq/ymq_proxy.cpp

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -708,10 +708,12 @@ namespace NKikimr::NYmq::V1 {
708708

709709
Ydb::Ymq::V1::SetQueueAttributesResult GetResult(const NKikimrClient::TSqsResponse&) override {
710710
Ydb::Ymq::V1::SetQueueAttributesResult result;
711+
711712
return result;
712713
}
713714
};
714715

716+
715717
void AddAttribute(THolder<TSqsRequest>& requestHolder, const TString& name, TString value) {
716718
auto attribute = requestHolder->MutableSetQueueAttributes()->MutableAttributes()->Add();
717719
attribute->SetName(name);
@@ -735,6 +737,70 @@ namespace NKikimr::NYmq::V1 {
735737
return result;
736738
}
737739
};
740+
741+
class TSendMessageBatchReplyCallback : public TReplyCallback<
742+
NKikimr::NSQS::TSendMessageBatchResponse,
743+
Ydb::Ymq::V1::SendMessageBatchResult> {
744+
public:
745+
using TReplyCallback::TReplyCallback;
746+
747+
private:
748+
const NKikimr::NSQS::TSendMessageBatchResponse& GetResponse(const NKikimrClient::TSqsResponse& resp) override {
749+
return resp.GetSendMessageBatch();
750+
}
751+
752+
Ydb::Ymq::V1::SendMessageBatchResult GetResult(const NKikimrClient::TSqsResponse& response) override {
753+
Ydb::Ymq::V1::SendMessageBatchResult result;
754+
response.GetSendMessageBatch();
755+
for (auto& entry : response.GetSendMessageBatch().GetEntries()) {
756+
if (entry.GetError().HasErrorCode()) {
757+
auto currentFailed = result.Addfailed();
758+
currentFailed->Setcode(entry.GetError().GetErrorCode());
759+
currentFailed->Setid(entry.GetId());
760+
currentFailed->Setmessage(entry.GetError().GetMessage());
761+
762+
ui32 httpStatus = NSQS::TErrorClass::GetHttpStatus(entry.GetError().GetErrorCode()).GetOrElse(400);
763+
currentFailed->Setsender_fault(400 <= httpStatus && httpStatus < 500);
764+
} else {
765+
auto currentSuccessful = result.Addsuccessful();
766+
currentSuccessful->Setid(entry.GetId());
767+
currentSuccessful->Setmd5_of_message_body(entry.GetMD5OfMessageBody());
768+
currentSuccessful->Setmessage_id(entry.GetMessageId());
769+
currentSuccessful->Setsequence_number(std::to_string(entry.GetSequenceNumber()));
770+
}
771+
}
772+
return result;
773+
}
774+
};
775+
776+
class TSendMessageBatchActor : public TRpcRequestActor<
777+
TEvYmqSendMessageBatchRequest,
778+
NKikimr::NSQS::TSendMessageBatchRequest,
779+
TSendMessageBatchReplyCallback> {
780+
public:
781+
using TRpcRequestActor::TRpcRequestActor;
782+
783+
private:
784+
NKikimr::NSQS::TSendMessageBatchRequest* GetRequest(THolder<TSqsRequest>& requestHolder) override {
785+
auto result = requestHolder->MutableSendMessageBatch();
786+
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->Getqueue_url())->second);
787+
for (auto& requestEntry : GetProtoRequest()->Getentries()) {
788+
auto entry = requestHolder->MutableSendMessageBatch()->AddEntries();
789+
entry->SetId(requestEntry.Getid());
790+
for (auto& srcAttribute: requestEntry.Getmessage_attributes()) {
791+
auto dstAttribute = entry->MutableMessageAttributes()->Add();
792+
dstAttribute->SetName(srcAttribute.first);
793+
dstAttribute->SetStringValue(srcAttribute.second.Getstring_value());
794+
dstAttribute->SetBinaryValue(srcAttribute.second.Getbinary_value());
795+
dstAttribute->SetDataType(srcAttribute.second.Getdata_type());
796+
}
797+
entry->SetMessageDeduplicationId(requestEntry.Getmessage_deduplication_id());
798+
entry->SetMessageGroupId(requestEntry.Getmessage_group_id());
799+
entry->SetMessageBody(requestEntry.Getmessage_body());
800+
}
801+
return result;
802+
}
803+
};
738804
}
739805

740806
namespace NKikimr::NGRpcService {
@@ -759,5 +825,6 @@ DECLARE_RPC(PurgeQueue);
759825
DECLARE_RPC(DeleteQueue);
760826
DECLARE_RPC(ChangeMessageVisibility);
761827
DECLARE_RPC(SetQueueAttributes);
828+
DECLARE_RPC(SendMessageBatch);
762829

763830
}

ydb/services/ymq/ymq_proxy.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ using TEvYmqPurgeQueueRequest = TGrpcRequestOperationCall<Ydb::Ymq::V1::PurgeQue
2424
using TEvYmqDeleteQueueRequest = TGrpcRequestOperationCall<Ydb::Ymq::V1::DeleteQueueRequest, Ydb::Ymq::V1::DeleteQueueResponse>;
2525
using TEvYmqChangeMessageVisibilityRequest = TGrpcRequestOperationCall<Ydb::Ymq::V1::ChangeMessageVisibilityRequest, Ydb::Ymq::V1::ChangeMessageVisibilityResponse>;
2626
using TEvYmqSetQueueAttributesRequest = TGrpcRequestOperationCall<Ydb::Ymq::V1::SetQueueAttributesRequest, Ydb::Ymq::V1::SetQueueAttributesResponse>;
27+
using TEvYmqSendMessageBatchRequest = TGrpcRequestOperationCall<Ydb::Ymq::V1::SendMessageBatchRequest, Ydb::Ymq::V1::SendMessageBatchResponse>;
2728

2829
}
2930
}

0 commit comments

Comments
 (0)