Skip to content

Commit 246ab1d

Browse files
Add ChangeMessageVisibilityBatch request to SQS Json API progress
1 parent 8e9eab8 commit 246ab1d

File tree

7 files changed

+61
-0
lines changed

7 files changed

+61
-0
lines changed

ydb/core/grpc_services/service_ymq.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,5 +24,6 @@ void DoYmqChangeMessageVisibilityRequest(std::unique_ptr<IRequestOpCtx> p, const
2424
void DoYmqSetQueueAttributesRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
2525
void DoYmqSendMessageBatchRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
2626
void DoYmqDeleteMessageBatchRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
27+
void DoYmqChangeMessageVisibilityBatchRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
2728
}
2829
}

ydb/core/http_proxy/http_req.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -534,6 +534,8 @@ namespace NKikimr::NHttpProxy {
534534
action = NSQS::EAction::SendMessageBatch;
535535
}else if (Method == "DeleteMessageBatch") {
536536
action = NSQS::EAction::DeleteMessageBatch;
537+
} else if (Method == "ChangeMessageVisibilityBatch") {
538+
action = NSQS::EAction::ChangeMessageVisibilityBatch;
537539
}
538540

539541
requestHolder->SetRequestId(HttpContext.RequestId);
@@ -1072,6 +1074,7 @@ namespace NKikimr::NHttpProxy {
10721074
DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN(SetQueueAttributes);
10731075
DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN(SendMessageBatch);
10741076
DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN(DeleteMessageBatch);
1077+
DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN(ChangeMessageVisibilityBatch);
10751078
#undef DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN
10761079
}
10771080

ydb/core/http_proxy/ut/http_proxy_ut.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1995,4 +1995,5 @@ Y_UNIT_TEST_SUITE(TestHttpProxy) {
19951995
UNIT_ASSERT_VALUES_EQUAL(json["Messages"].GetArray().size(), 0);
19961996

19971997
}
1998+
19981999
} // Y_UNIT_TEST_SUITE(TestHttpProxy)

ydb/public/api/grpc/draft/ydb_ymq_v1.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,4 +23,5 @@ service YmqService {
2323
rpc SetQueueAttributes(SetQueueAttributesRequest) returns (SetQueueAttributesResponse);
2424
rpc SendMessageBatch(SendMessageBatchRequest) returns (SendMessageBatchResponse);
2525
rpc DeleteMessageBatch(DeleteMessageBatchRequest) returns (DeleteMessageBatchResponse);
26+
rpc ChangeMessageVisibilityBatch(ChangeMessageVisibilityBatchRequest) returns (ChangeMessageVisibilityBatchResponse);
2627
}

ydb/services/ymq/grpc_service.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ void TGRpcYmqService::SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger)
4545
ADD_REQUEST(SetQueueAttributes, DoYmqSetQueueAttributesRequest, nullptr, Off)
4646
ADD_REQUEST(SendMessageBatch, DoYmqSendMessageBatchRequest, nullptr, Off)
4747
ADD_REQUEST(DeleteMessageBatch, DoYmqDeleteMessageBatchRequest, nullptr, Off)
48+
ADD_REQUEST(ChangeMessageVisibilityBatch, DoYmqChangeMessageVisibilityBatchRequest, nullptr, Off)
4849

4950
#undef ADD_REQUEST
5051
}

ydb/services/ymq/ymq_proxy.cpp

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -854,6 +854,58 @@ namespace NKikimr::NYmq::V1 {
854854
return result;
855855
}
856856
};
857+
858+
class TChangeMessageVisibilityBatchReplyCallback : public TReplyCallback<
859+
NKikimr::NSQS::TChangeMessageVisibilityBatchResponse,
860+
Ydb::Ymq::V1::ChangeMessageVisibilityBatchResult> {
861+
public:
862+
using TReplyCallback::TReplyCallback;
863+
864+
private:
865+
const NKikimr::NSQS::TChangeMessageVisibilityBatchResponse& GetResponse(const NKikimrClient::TSqsResponse& resp) override {
866+
return resp.GetChangeMessageVisibilityBatch();
867+
}
868+
869+
Ydb::Ymq::V1::ChangeMessageVisibilityBatchResult GetResult(const NKikimrClient::TSqsResponse& response) override {
870+
Ydb::Ymq::V1::ChangeMessageVisibilityBatchResult result;
871+
for (auto& entry : response.GetChangeMessageVisibilityBatch().GetEntries()) {
872+
if (entry.GetError().HasErrorCode()) {
873+
auto currentFailed = result.Addfailed();
874+
currentFailed->Setcode(entry.GetError().GetErrorCode());
875+
currentFailed->Setid(entry.GetId());
876+
currentFailed->Setmessage(entry.GetError().GetMessage());
877+
878+
ui32 httpStatus = NSQS::TErrorClass::GetHttpStatus(entry.GetError().GetErrorCode()).GetOrElse(400);
879+
currentFailed->Setsender_fault(400 <= httpStatus && httpStatus < 500);
880+
} else {
881+
auto currentSuccessful = result.Addsuccessful();
882+
currentSuccessful->Setid(entry.GetId());
883+
}
884+
}
885+
return result;
886+
}
887+
};
888+
889+
class TChangeMessageVisibilityBatchActor : public TRpcRequestActor<
890+
TEvYmqChangeMessageVisibilityBatchRequest,
891+
NKikimr::NSQS::TChangeMessageVisibilityBatchRequest,
892+
TChangeMessageVisibilityBatchReplyCallback> {
893+
public:
894+
using TRpcRequestActor::TRpcRequestActor;
895+
896+
private:
897+
NKikimr::NSQS::TChangeMessageVisibilityBatchRequest* GetRequest(THolder<TSqsRequest>& requestHolder) override {
898+
auto result = requestHolder->MutableChangeMessageVisibilityBatch();
899+
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->Getqueue_url())->second);
900+
for (auto& requestEntry : GetProtoRequest()->Getentries()) {
901+
auto entry = requestHolder->MutableChangeMessageVisibilityBatch()->MutableEntries()->Add();
902+
entry->SetId(requestEntry.Getid());
903+
entry->SetVisibilityTimeout(requestEntry.Getvisibility_timeout());
904+
entry->SetReceiptHandle(requestEntry.Getreceipt_handle());
905+
}
906+
return result;
907+
}
908+
};
857909
}
858910

859911
namespace NKikimr::NGRpcService {
@@ -880,5 +932,6 @@ DECLARE_RPC(ChangeMessageVisibility);
880932
DECLARE_RPC(SetQueueAttributes);
881933
DECLARE_RPC(SendMessageBatch);
882934
DECLARE_RPC(DeleteMessageBatch);
935+
DECLARE_RPC(ChangeMessageVisibilityBatch);
883936

884937
}

ydb/services/ymq/ymq_proxy.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ using TEvYmqChangeMessageVisibilityRequest = TGrpcRequestOperationCall<Ydb::Ymq:
2626
using TEvYmqSetQueueAttributesRequest = TGrpcRequestOperationCall<Ydb::Ymq::V1::SetQueueAttributesRequest, Ydb::Ymq::V1::SetQueueAttributesResponse>;
2727
using TEvYmqSendMessageBatchRequest = TGrpcRequestOperationCall<Ydb::Ymq::V1::SendMessageBatchRequest, Ydb::Ymq::V1::SendMessageBatchResponse>;
2828
using TEvYmqDeleteMessageBatchRequest = TGrpcRequestOperationCall<Ydb::Ymq::V1::DeleteMessageBatchRequest, Ydb::Ymq::V1::DeleteMessageBatchResponse>;
29+
using TEvYmqChangeMessageVisibilityBatchRequest = TGrpcRequestOperationCall<Ydb::Ymq::V1::ChangeMessageVisibilityBatchRequest, Ydb::Ymq::V1::ChangeMessageVisibilityBatchResponse>;
2930

3031
}
3132
}

0 commit comments

Comments
 (0)