Skip to content

Commit 31b8d00

Browse files
authored
SQS: Handle empty QueueUrl correctly (stable-24-3) (#11635)
1 parent 0a5089e commit 31b8d00

File tree

7 files changed

+68
-43
lines changed

7 files changed

+68
-43
lines changed

ydb/core/http_proxy/http_req.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -469,11 +469,11 @@ namespace NKikimr::NHttpProxy {
469469
auto queueUrl = QueueUrlExtractor(Request);
470470
if (!queueUrl.empty()) {
471471
auto cloudIdAndResourceId = NKikimr::NYmq::CloudIdAndResourceIdFromQueueUrl(queueUrl);
472-
if(cloudIdAndResourceId.Empty()) {
472+
if (cloudIdAndResourceId.first.empty()) {
473473
return ReplyWithError(ctx, NYdb::EStatus::BAD_REQUEST, "Invalid queue url");
474474
}
475-
CloudId = cloudIdAndResourceId.Get()->first;
476-
ResourceId = cloudIdAndResourceId.Get()->second;
475+
CloudId = cloudIdAndResourceId.first;
476+
ResourceId = cloudIdAndResourceId.second;
477477
}
478478
} catch (const NKikimr::NSQS::TSQSException& e) {
479479
NYds::EErrorCodes issueCode = NYds::EErrorCodes::OK;

ydb/core/http_proxy/ut/datastreams_fixture.h

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,8 @@ T GetByPath(const NJson::TJsonValue& msg, TStringBuf path) {
6767
}
6868
}
6969

70-
7170
class THttpProxyTestMock : public NUnitTest::TBaseFixture {
71+
friend class THttpProxyTestMockForSQS;
7272
public:
7373
THttpProxyTestMock() = default;
7474
~THttpProxyTestMock() = default;
@@ -81,12 +81,12 @@ class THttpProxyTestMock : public NUnitTest::TBaseFixture {
8181
InitAll();
8282
}
8383

84-
void InitAll() {
84+
void InitAll(bool yandexCloudMode = true) {
8585
AccessServicePort = PortManager.GetPort(8443);
8686
AccessServiceEndpoint = "127.0.0.1:" + ToString(AccessServicePort);
87-
InitKikimr();
87+
InitKikimr(yandexCloudMode);
8888
InitAccessServiceService();
89-
InitHttpServer();
89+
InitHttpServer(yandexCloudMode);
9090
}
9191

9292
static TString FormAuthorizationStr(const TString& region) {
@@ -365,7 +365,7 @@ class THttpProxyTestMock : public NUnitTest::TBaseFixture {
365365
return resultSet;
366366
}
367367

368-
void InitKikimr() {
368+
void InitKikimr(bool yandexCloudMode) {
369369
AuthFactory = std::make_shared<TIamAuthFactory>();
370370
NKikimrConfig::TAppConfig appConfig;
371371
appConfig.MutablePQConfig()->SetTopicsAreFirstClassCitizen(true);
@@ -376,7 +376,7 @@ class THttpProxyTestMock : public NUnitTest::TBaseFixture {
376376
appConfig.MutablePQConfig()->MutableBillingMeteringConfig()->SetEnabled(true);
377377

378378
appConfig.MutableSqsConfig()->SetEnableSqs(true);
379-
appConfig.MutableSqsConfig()->SetYandexCloudMode(true);
379+
appConfig.MutableSqsConfig()->SetYandexCloudMode(yandexCloudMode);
380380
appConfig.MutableSqsConfig()->SetEnableDeadLetterQueues(true);
381381

382382
auto limit = appConfig.MutablePQConfig()->AddValidRetentionLimits();
@@ -639,7 +639,7 @@ class THttpProxyTestMock : public NUnitTest::TBaseFixture {
639639
AccessServiceServer = builder.BuildAndStart();
640640
}
641641

642-
void InitHttpServer() {
642+
void InitHttpServer(bool yandexCloudMode) {
643643
NKikimrConfig::TServerlessProxyConfig config;
644644
config.MutableHttpConfig()->AddYandexCloudServiceRegion("ru-central1");
645645
config.MutableHttpConfig()->AddYandexCloudServiceRegion("ru-central-1");
@@ -649,7 +649,7 @@ class THttpProxyTestMock : public NUnitTest::TBaseFixture {
649649
config.MutableHttpConfig()->SetAccessServiceEndpoint(TStringBuilder() << "127.0.0.1:" << AccessServicePort);
650650
config.SetTestMode(true);
651651
config.MutableHttpConfig()->SetPort(HttpServicePort);
652-
config.MutableHttpConfig()->SetYandexCloudMode(true);
652+
config.MutableHttpConfig()->SetYandexCloudMode(yandexCloudMode);
653653
config.MutableHttpConfig()->SetYmqEnabled(true);
654654

655655
std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory = NYdb::CreateOAuthCredentialsProviderFactory("proxy_sa@builtin");
@@ -760,3 +760,9 @@ class THttpProxyTestMock : public NUnitTest::TBaseFixture {
760760
ui16 MonPort = 0;
761761
ui16 KikimrGrpcPort = 0;
762762
};
763+
764+
class THttpProxyTestMockForSQS : public THttpProxyTestMock {
765+
void SetUp(NUnitTest::TTestContext&) override {
766+
InitAll(false);
767+
}
768+
};

ydb/core/http_proxy/ut/http_proxy_ut.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1663,6 +1663,18 @@ Y_UNIT_TEST_SUITE(TestHttpProxy) {
16631663
UNIT_ASSERT(!GetByPath<TString>(json, "MessageId").empty());
16641664
}
16651665

1666+
Y_UNIT_TEST_F(TestSendMessageEmptyQueueUrl, THttpProxyTestMockForSQS) {
1667+
NJson::TJsonValue sendMessageReq;
1668+
sendMessageReq["QueueUrl"] = "";
1669+
auto body = "MessageBody-0";
1670+
sendMessageReq["MessageBody"] = body;
1671+
sendMessageReq["MessageDeduplicationId"] = "MessageDeduplicationId-0";
1672+
sendMessageReq["MessageGroupId"] = "MessageGroupId-0";
1673+
1674+
auto res = SendHttpRequest("/Root", "AmazonSQS.SendMessage", std::move(sendMessageReq), FormAuthorizationStr("ru-central1"));
1675+
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 400);
1676+
}
1677+
16661678
Y_UNIT_TEST_F(TestReceiveMessage, THttpProxyTestMock) {
16671679
auto createQueueReq = CreateSqsCreateQueueRequest();
16681680
auto res = SendHttpRequest("/Root", "AmazonSQS.CreateQueue", createQueueReq, FormAuthorizationStr("ru-central1"));

ydb/services/ymq/utils.cpp

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
#include "utils.h"
2+
3+
#include <util/string/split.h>
4+
#include <ydb/core/ymq/base/utils.h>
5+
6+
7+
namespace NKikimr::NYmq {
8+
std::pair<TString, TString> CloudIdAndResourceIdFromQueueUrl(const TString& queueUrl) {
9+
auto protocolSeparator = queueUrl.find("://");
10+
if (protocolSeparator == TString::npos) {
11+
return {"", ""};
12+
}
13+
14+
auto restOfUrl = queueUrl.substr(protocolSeparator + 3);
15+
auto parts = StringSplitter(restOfUrl).Split('/').ToList<TString>();
16+
if (parts.size() < 3) {
17+
return {"", ""};
18+
}
19+
20+
bool isPrivateRequest = NKikimr::NSQS::IsPrivateRequest(restOfUrl);
21+
TString queueName = NKikimr::NSQS::ExtractQueueNameFromPath(restOfUrl, isPrivateRequest);
22+
TString accountName = NKikimr::NSQS::ExtractAccountNameFromPath(restOfUrl, isPrivateRequest);
23+
return {accountName, queueName};
24+
}
25+
}

ydb/services/ymq/utils.h

Lines changed: 1 addition & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,7 @@
11
#pragma once
22

33
#include <util/generic/string.h>
4-
#include <util/generic/maybe.h>
5-
#include <util/string/split.h>
6-
#include <ydb/core/ymq/base/utils.h>
74

85
namespace NKikimr::NYmq {
9-
inline static TMaybe<std::pair<TString, TString>> CloudIdAndResourceIdFromQueueUrl(const TString& queueUrl) {
10-
auto protocolSeparator = queueUrl.find("://");
11-
if (protocolSeparator == TString::npos) {
12-
return Nothing();
13-
}
14-
15-
auto restOfUrl = queueUrl.substr(protocolSeparator + 3);
16-
auto parts = StringSplitter(restOfUrl).Split('/').ToList<TString>();
17-
if (parts.size() < 3) {
18-
return Nothing();
19-
}
20-
21-
bool isPrivateRequest = NKikimr::NSQS::IsPrivateRequest(restOfUrl);
22-
TString queueName = NKikimr::NSQS::ExtractQueueNameFromPath(restOfUrl, isPrivateRequest);
23-
TString accountName = NKikimr::NSQS::ExtractAccountNameFromPath(restOfUrl, isPrivateRequest);
24-
return std::pair<TString, TString>(std::move(accountName), std::move(queueName));
25-
}
6+
std::pair<TString, TString> CloudIdAndResourceIdFromQueueUrl(const TString& queueUrl);
267
}

ydb/services/ymq/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ LIBRARY()
33
SRCS(
44
ymq_proxy.cpp
55
grpc_service.cpp
6+
utils.cpp
67
)
78

89
PEERDIR(

ydb/services/ymq/ymq_proxy.cpp

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -290,7 +290,7 @@ namespace NKikimr::NYmq::V1 {
290290
COPY_FIELD_IF_PRESENT(message_deduplication_id, MessageDeduplicationId);
291291
COPY_FIELD_IF_PRESENT(message_group_id, MessageGroupId);
292292

293-
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->Getqueue_url())->second);
293+
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->Getqueue_url()).second);
294294

295295
result->SetMessageBody(GetProtoRequest()->Getmessage_body());
296296

@@ -376,7 +376,7 @@ namespace NKikimr::NYmq::V1 {
376376
NKikimr::NSQS::TReceiveMessageRequest* GetRequest(TSqsRequest& requestHolder) override {
377377
auto result = requestHolder.MutableReceiveMessage();
378378

379-
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url())->second);
379+
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url()).second);
380380

381381
COPY_FIELD_IF_PRESENT(max_number_of_messages, MaxNumberOfMessages);
382382
COPY_FIELD_IF_PRESENT(receive_request_attempt_id, ReceiveRequestAttemptId);
@@ -496,7 +496,7 @@ namespace NKikimr::NYmq::V1 {
496496
private:
497497
NKikimr::NSQS::TGetQueueAttributesRequest* GetRequest(TSqsRequest& requestHolder) override {
498498
auto result = requestHolder.MutableGetQueueAttributes();
499-
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url())->second);
499+
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url()).second);
500500
for (const auto& attributeName : GetProtoRequest()->Getattribute_names()) {
501501
result->MutableNames()->Add()->assign(attributeName);
502502
}
@@ -574,7 +574,7 @@ namespace NKikimr::NYmq::V1 {
574574
private:
575575
NKikimr::NSQS::TDeleteMessageRequest* GetRequest(TSqsRequest& requestHolder) override {
576576
auto result = requestHolder.MutableDeleteMessage();
577-
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url())->second);
577+
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url()).second);
578578
result->SetReceiptHandle(GetProtoRequest()->receipt_handle());
579579
return result;
580580
}
@@ -607,7 +607,7 @@ namespace NKikimr::NYmq::V1 {
607607
private:
608608
NKikimr::NSQS::TPurgeQueueRequest* GetRequest(TSqsRequest& requestHolder) override {
609609
auto result = requestHolder.MutablePurgeQueue();
610-
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url())->second);
610+
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url()).second);
611611
return result;
612612
}
613613
};
@@ -639,7 +639,7 @@ namespace NKikimr::NYmq::V1 {
639639
private:
640640
NKikimr::NSQS::TDeleteQueueRequest* GetRequest(TSqsRequest& requestHolder) override {
641641
auto result = requestHolder.MutableDeleteQueue();
642-
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url())->second);
642+
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url()).second);
643643
return result;
644644
}
645645
};
@@ -671,7 +671,7 @@ namespace NKikimr::NYmq::V1 {
671671
private:
672672
NKikimr::NSQS::TChangeMessageVisibilityRequest* GetRequest(TSqsRequest& requestHolder) override {
673673
auto result = requestHolder.MutableChangeMessageVisibility();
674-
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url())->second);
674+
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url()).second);
675675
result->SetReceiptHandle(GetProtoRequest()->Getreceipt_handle());
676676
result->SetVisibilityTimeout(GetProtoRequest()->Getvisibility_timeout());
677677
return result;
@@ -713,7 +713,7 @@ namespace NKikimr::NYmq::V1 {
713713
private:
714714
NKikimr::NSQS::TSetQueueAttributesRequest* GetRequest(TSqsRequest& requestHolder) override {
715715
auto result = requestHolder.MutableSetQueueAttributes();
716-
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url())->second);
716+
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url()).second);
717717
for (auto& [name, value]: GetProtoRequest()->Getattributes()) {
718718
AddAttribute(requestHolder, name, value);
719719
}
@@ -751,7 +751,7 @@ namespace NKikimr::NYmq::V1 {
751751
private:
752752
NKikimr::NSQS::TListDeadLetterSourceQueuesRequest* GetRequest(TSqsRequest& requestHolder) override {
753753
auto result = requestHolder.MutableListDeadLetterSourceQueues();
754-
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url())->second);
754+
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url()).second);
755755
return result;
756756
}
757757
};
@@ -803,7 +803,7 @@ namespace NKikimr::NYmq::V1 {
803803
NKikimr::NSQS::TSendMessageBatchRequest* GetRequest(TSqsRequest& requestHolder) override {
804804
auto result = requestHolder.MutableSendMessageBatch();
805805

806-
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->Getqueue_url())->second);
806+
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->Getqueue_url()).second);
807807

808808
for (auto& requestEntry : GetProtoRequest()->Getentries()) {
809809
auto entry = requestHolder.MutableSendMessageBatch()->MutableEntries()->Add();
@@ -870,7 +870,7 @@ namespace NKikimr::NYmq::V1 {
870870
private:
871871
NKikimr::NSQS::TDeleteMessageBatchRequest* GetRequest(TSqsRequest& requestHolder) override {
872872
auto result = requestHolder.MutableDeleteMessageBatch();
873-
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->Getqueue_url())->second);
873+
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->Getqueue_url()).second);
874874
for (auto& requestEntry : GetProtoRequest()->Getentries()) {
875875
auto entry = requestHolder.MutableDeleteMessageBatch()->AddEntries();
876876
entry->SetId(requestEntry.Getid());
@@ -921,7 +921,7 @@ namespace NKikimr::NYmq::V1 {
921921
private:
922922
NKikimr::NSQS::TChangeMessageVisibilityBatchRequest* GetRequest(TSqsRequest& requestHolder) override {
923923
auto result = requestHolder.MutableChangeMessageVisibilityBatch();
924-
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->Getqueue_url())->second);
924+
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->Getqueue_url()).second);
925925
for (auto& requestEntry : GetProtoRequest()->Getentries()) {
926926
auto entry = requestHolder.MutableChangeMessageVisibilityBatch()->MutableEntries()->Add();
927927
entry->SetId(requestEntry.Getid());

0 commit comments

Comments
 (0)