Skip to content

Commit 8e9eab8

Browse files
Add DeleteMessageBatch request to SQS Json API
1 parent 9f97eb5 commit 8e9eab8

File tree

7 files changed

+151
-1
lines changed

7 files changed

+151
-1
lines changed

ydb/core/grpc_services/service_ymq.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,5 +23,6 @@ void DoYmqDeleteQueueRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityPr
2323
void DoYmqChangeMessageVisibilityRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
2424
void DoYmqSetQueueAttributesRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
2525
void DoYmqSendMessageBatchRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
26+
void DoYmqDeleteMessageBatchRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
2627
}
2728
}

ydb/core/http_proxy/http_req.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -532,6 +532,8 @@ namespace NKikimr::NHttpProxy {
532532
action = NSQS::EAction::SetQueueAttributes;
533533
} else if (Method == "SendMessageBatch") {
534534
action = NSQS::EAction::SendMessageBatch;
535+
}else if (Method == "DeleteMessageBatch") {
536+
action = NSQS::EAction::DeleteMessageBatch;
535537
}
536538

537539
requestHolder->SetRequestId(HttpContext.RequestId);
@@ -1069,6 +1071,7 @@ namespace NKikimr::NHttpProxy {
10691071
DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN(ChangeMessageVisibility);
10701072
DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN(SetQueueAttributes);
10711073
DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN(SendMessageBatch);
1074+
DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN(DeleteMessageBatch);
10721075
#undef DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN
10731076
}
10741077

ydb/core/http_proxy/ut/http_proxy_ut.h

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1906,4 +1906,93 @@ Y_UNIT_TEST_SUITE(TestHttpProxy) {
19061906
UNIT_ASSERT(!GetByPath<TString>(succesful0, "Md5OfMessageBody").empty());
19071907
UNIT_ASSERT(!GetByPath<TString>(succesful0, "MessageId").empty());
19081908
}
1909+
1910+
Y_UNIT_TEST_F(TestDeleteMessageBatch, THttpProxyTestMock) {
1911+
auto createQueueReq = CreateSqsCreateQueueRequest();
1912+
auto res = SendHttpRequest("/Root", "AmazonSQS.CreateQueue", std::move(createQueueReq), FormAuthorizationStr("ru-central1"));
1913+
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);
1914+
NJson::TJsonValue json;
1915+
UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json));
1916+
TString resultQueueUrl = GetByPath<TString>(json, "QueueUrl");
1917+
UNIT_ASSERT(resultQueueUrl.EndsWith("ExampleQueueName"));
1918+
1919+
NJson::TJsonValue message0;
1920+
message0["Id"] = "Id-0";
1921+
message0["MessageBody"] = "MessageBody-0";
1922+
message0["MessageDeduplicationId"] = "MessageDeduplicationId-0";
1923+
1924+
NJson::TJsonValue message1;
1925+
message1["Id"] = "Id-1";
1926+
message1["MessageBody"] = "MessageBody-1";
1927+
message1["MessageDeduplicationId"] = "MessageDeduplicationId-1";
1928+
1929+
NJson::TJsonArray entries = {message0, message1};
1930+
1931+
NJson::TJsonValue sendMessageBatchReq;
1932+
sendMessageBatchReq["QueueUrl"] = resultQueueUrl;
1933+
sendMessageBatchReq["Entries"] = entries;
1934+
1935+
res = SendHttpRequest("/Root", "AmazonSQS.SendMessageBatch", std::move(sendMessageBatchReq), FormAuthorizationStr("ru-central1"));
1936+
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);
1937+
UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json));
1938+
UNIT_ASSERT(json["Successful"].GetArray().size() == 2);
1939+
1940+
TVector<NJson::TJsonValue> messages;
1941+
for (int i = 0; i < 20; ++i) {
1942+
NJson::TJsonValue receiveMessageReq;
1943+
receiveMessageReq["QueueUrl"] = resultQueueUrl;
1944+
res = SendHttpRequest("/Root", "AmazonSQS.ReceiveMessage", std::move(receiveMessageReq), FormAuthorizationStr("ru-central1"));
1945+
if (res.Body != TString("{}")) {
1946+
NJson::ReadJsonTree(res.Body, &json);
1947+
if (json["Messages"].GetArray().size() == 2) {
1948+
messages.push_back(json["Messages"][0]);
1949+
messages.push_back(json["Messages"][1]);
1950+
break;
1951+
}
1952+
if (json["Messages"].GetArray().size() == 1) {
1953+
messages.push_back(json["Messages"][0]);
1954+
if (messages.size() == 2) {
1955+
break;
1956+
}
1957+
}
1958+
}
1959+
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
1960+
}
1961+
1962+
UNIT_ASSERT_VALUES_EQUAL(messages.size(), 2);
1963+
1964+
auto receiptHandle0 = messages[0]["ReceiptHandle"].GetString();
1965+
UNIT_ASSERT(!receiptHandle0.Empty());
1966+
auto receiptHandle1 = messages[1]["ReceiptHandle"].GetString();
1967+
UNIT_ASSERT(!receiptHandle1.Empty());
1968+
1969+
NJson::TJsonValue deleteMessageBatchReq;
1970+
deleteMessageBatchReq["QueueUrl"] = resultQueueUrl;
1971+
1972+
NJson::TJsonValue entry0;
1973+
entry0["Id"] = "Id-0";
1974+
entry0["ReceiptHandle"] = receiptHandle0;
1975+
1976+
NJson::TJsonValue entry1;
1977+
entry1["Id"] = "Id-1";
1978+
entry1["ReceiptHandle"] = receiptHandle1;
1979+
1980+
NJson::TJsonArray deleteEntries = {entry0, entry1};
1981+
deleteMessageBatchReq["Entries"] = deleteEntries;
1982+
1983+
res = SendHttpRequest("/Root", "AmazonSQS.DeleteMessageBatch", std::move(deleteMessageBatchReq), FormAuthorizationStr("ru-central1"));
1984+
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);
1985+
UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json));
1986+
UNIT_ASSERT_VALUES_EQUAL(json["Successful"].GetArray().size(), 2);
1987+
UNIT_ASSERT_VALUES_EQUAL(json["Successful"][0]["Id"], "Id-0");
1988+
UNIT_ASSERT_VALUES_EQUAL(json["Successful"][1]["Id"], "Id-1");
1989+
1990+
NJson::TJsonValue receiveMessageReq;
1991+
receiveMessageReq["QueueUrl"] = resultQueueUrl;
1992+
res = SendHttpRequest("/Root", "AmazonSQS.ReceiveMessage", std::move(receiveMessageReq), FormAuthorizationStr("ru-central1"));
1993+
UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json));
1994+
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);
1995+
UNIT_ASSERT_VALUES_EQUAL(json["Messages"].GetArray().size(), 0);
1996+
1997+
}
19091998
} // Y_UNIT_TEST_SUITE(TestHttpProxy)

ydb/public/api/grpc/draft/ydb_ymq_v1.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,4 +22,5 @@ service YmqService {
2222
rpc ChangeMessageVisibility(ChangeMessageVisibilityRequest) returns (ChangeMessageVisibilityResponse);
2323
rpc SetQueueAttributes(SetQueueAttributesRequest) returns (SetQueueAttributesResponse);
2424
rpc SendMessageBatch(SendMessageBatchRequest) returns (SendMessageBatchResponse);
25+
rpc DeleteMessageBatch(DeleteMessageBatchRequest) returns (DeleteMessageBatchResponse);
2526
}

ydb/services/ymq/grpc_service.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ void TGRpcYmqService::SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger)
4444
ADD_REQUEST(ChangeMessageVisibility, DoYmqChangeMessageVisibilityRequest, nullptr, Off)
4545
ADD_REQUEST(SetQueueAttributes, DoYmqSetQueueAttributesRequest, nullptr, Off)
4646
ADD_REQUEST(SendMessageBatch, DoYmqSendMessageBatchRequest, nullptr, Off)
47+
ADD_REQUEST(DeleteMessageBatch, DoYmqDeleteMessageBatchRequest, nullptr, Off)
4748

4849
#undef ADD_REQUEST
4950
}

ydb/services/ymq/ymq_proxy.cpp

Lines changed: 55 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -785,7 +785,7 @@ namespace NKikimr::NYmq::V1 {
785785
auto result = requestHolder->MutableSendMessageBatch();
786786
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->Getqueue_url())->second);
787787
for (auto& requestEntry : GetProtoRequest()->Getentries()) {
788-
auto entry = requestHolder->MutableSendMessageBatch()->AddEntries();
788+
auto entry = requestHolder->MutableSendMessageBatch()->MutableEntries()->Add();
789789
entry->SetId(requestEntry.Getid());
790790
for (auto& srcAttribute: requestEntry.Getmessage_attributes()) {
791791
auto dstAttribute = entry->MutableMessageAttributes()->Add();
@@ -801,6 +801,59 @@ namespace NKikimr::NYmq::V1 {
801801
return result;
802802
}
803803
};
804+
805+
class TDeleteMessageBatchReplyCallback : public TReplyCallback<
806+
NKikimr::NSQS::TDeleteMessageBatchResponse,
807+
Ydb::Ymq::V1::DeleteMessageBatchResult> {
808+
public:
809+
using TReplyCallback::TReplyCallback;
810+
811+
private:
812+
const NKikimr::NSQS::TDeleteMessageBatchResponse& GetResponse(const NKikimrClient::TSqsResponse& resp) override {
813+
return resp.GetDeleteMessageBatch();
814+
}
815+
816+
Ydb::Ymq::V1::DeleteMessageBatchResult GetResult(const NKikimrClient::TSqsResponse& response) override {
817+
Ydb::Ymq::V1::DeleteMessageBatchResult result;
818+
auto entries = response.GetDeleteMessageBatch().GetEntries();
819+
for (auto i = 0; i < entries.size(); i++) {
820+
auto &entry = response.GetDeleteMessageBatch().GetEntries()[i];
821+
if (entry.GetError().HasErrorCode()) {
822+
auto currentFailed = result.Addfailed();
823+
currentFailed->Setcode(entry.GetError().GetErrorCode());
824+
currentFailed->Setid(entry.GetId());
825+
currentFailed->Setmessage(entry.GetError().GetMessage());
826+
827+
ui32 httpStatus = NSQS::TErrorClass::GetHttpStatus(entry.GetError().GetErrorCode()).GetOrElse(400);
828+
currentFailed->Setsender_fault(400 <= httpStatus && httpStatus < 500);
829+
} else {
830+
auto currentSuccessful = result.Addsuccessful();
831+
currentSuccessful->Setid(entry.GetId());
832+
}
833+
}
834+
return result;
835+
}
836+
};
837+
838+
class TDeleteMessageBatchActor : public TRpcRequestActor<
839+
TEvYmqDeleteMessageBatchRequest,
840+
NKikimr::NSQS::TDeleteMessageBatchRequest,
841+
TDeleteMessageBatchReplyCallback> {
842+
public:
843+
using TRpcRequestActor::TRpcRequestActor;
844+
845+
private:
846+
NKikimr::NSQS::TDeleteMessageBatchRequest* GetRequest(THolder<TSqsRequest>& requestHolder) override {
847+
auto result = requestHolder->MutableDeleteMessageBatch();
848+
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->Getqueue_url())->second);
849+
for (auto& requestEntry : GetProtoRequest()->Getentries()) {
850+
auto entry = requestHolder->MutableDeleteMessageBatch()->AddEntries();
851+
entry->SetId(requestEntry.Getid());
852+
entry->SetReceiptHandle(requestEntry.Getreceipt_handle());
853+
}
854+
return result;
855+
}
856+
};
804857
}
805858

806859
namespace NKikimr::NGRpcService {
@@ -826,5 +879,6 @@ DECLARE_RPC(DeleteQueue);
826879
DECLARE_RPC(ChangeMessageVisibility);
827880
DECLARE_RPC(SetQueueAttributes);
828881
DECLARE_RPC(SendMessageBatch);
882+
DECLARE_RPC(DeleteMessageBatch);
829883

830884
}

ydb/services/ymq/ymq_proxy.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ using TEvYmqDeleteQueueRequest = TGrpcRequestOperationCall<Ydb::Ymq::V1::DeleteQ
2525
using TEvYmqChangeMessageVisibilityRequest = TGrpcRequestOperationCall<Ydb::Ymq::V1::ChangeMessageVisibilityRequest, Ydb::Ymq::V1::ChangeMessageVisibilityResponse>;
2626
using TEvYmqSetQueueAttributesRequest = TGrpcRequestOperationCall<Ydb::Ymq::V1::SetQueueAttributesRequest, Ydb::Ymq::V1::SetQueueAttributesResponse>;
2727
using TEvYmqSendMessageBatchRequest = TGrpcRequestOperationCall<Ydb::Ymq::V1::SendMessageBatchRequest, Ydb::Ymq::V1::SendMessageBatchResponse>;
28+
using TEvYmqDeleteMessageBatchRequest = TGrpcRequestOperationCall<Ydb::Ymq::V1::DeleteMessageBatchRequest, Ydb::Ymq::V1::DeleteMessageBatchResponse>;
2829

2930
}
3031
}

0 commit comments

Comments
 (0)