Skip to content

Commit 0ef4835

Browse files
authored
SQS: handle empty QueueUrl correctly (#11652)
1 parent 1e55f85 commit 0ef4835

File tree

7 files changed

+68
-44
lines changed

7 files changed

+68
-44
lines changed

ydb/core/http_proxy/http_req.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -469,11 +469,11 @@ namespace NKikimr::NHttpProxy {
469469
auto queueUrl = QueueUrlExtractor(Request);
470470
if (!queueUrl.empty()) {
471471
auto cloudIdAndResourceId = NKikimr::NYmq::CloudIdAndResourceIdFromQueueUrl(queueUrl);
472-
if(cloudIdAndResourceId.Empty()) {
472+
if (cloudIdAndResourceId.first.empty()) {
473473
return ReplyWithError(ctx, NYdb::EStatus::BAD_REQUEST, "Invalid queue url");
474474
}
475-
CloudId = cloudIdAndResourceId.Get()->first;
476-
ResourceId = cloudIdAndResourceId.Get()->second;
475+
CloudId = cloudIdAndResourceId.first;
476+
ResourceId = cloudIdAndResourceId.second;
477477
}
478478
} catch (const NKikimr::NSQS::TSQSException& e) {
479479
NYds::EErrorCodes issueCode = NYds::EErrorCodes::OK;

ydb/core/http_proxy/ut/datastreams_fixture.h

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,8 @@ T GetByPath(const NJson::TJsonValue& msg, TStringBuf path) {
6868
}
6969
}
7070

71-
7271
class THttpProxyTestMock : public NUnitTest::TBaseFixture {
72+
friend class THttpProxyTestMockForSQS;
7373
public:
7474
THttpProxyTestMock() = default;
7575
~THttpProxyTestMock() = default;
@@ -82,12 +82,12 @@ class THttpProxyTestMock : public NUnitTest::TBaseFixture {
8282
InitAll();
8383
}
8484

85-
void InitAll() {
85+
void InitAll(bool yandexCloudMode = true) {
8686
AccessServicePort = PortManager.GetPort(8443);
8787
AccessServiceEndpoint = "127.0.0.1:" + ToString(AccessServicePort);
88-
InitKikimr();
88+
InitKikimr(yandexCloudMode);
8989
InitAccessServiceService();
90-
InitHttpServer();
90+
InitHttpServer(yandexCloudMode);
9191
}
9292

9393
static TString FormAuthorizationStr(const TString& region) {
@@ -366,7 +366,9 @@ class THttpProxyTestMock : public NUnitTest::TBaseFixture {
366366

367367
NJson::TJsonMap SendMessage(NJson::TJsonMap request, ui32 expectedHttpCode = 200) {
368368
auto json = SendJsonRequest("SendMessage", request, expectedHttpCode);
369-
UNIT_ASSERT(!GetByPath<TString>(json, "MD5OfMessageBody").empty());
369+
if (expectedHttpCode == 200) {
370+
UNIT_ASSERT(!GetByPath<TString>(json, "MD5OfMessageBody").empty());
371+
}
370372
return json;
371373
}
372374

@@ -468,7 +470,7 @@ class THttpProxyTestMock : public NUnitTest::TBaseFixture {
468470
return resultSet;
469471
}
470472

471-
void InitKikimr() {
473+
void InitKikimr(bool yandexCloudMode) {
472474
AuthFactory = std::make_shared<TIamAuthFactory>();
473475
NKikimrConfig::TAppConfig appConfig;
474476
appConfig.MutablePQConfig()->SetTopicsAreFirstClassCitizen(true);
@@ -479,7 +481,7 @@ class THttpProxyTestMock : public NUnitTest::TBaseFixture {
479481
appConfig.MutablePQConfig()->MutableBillingMeteringConfig()->SetEnabled(true);
480482

481483
appConfig.MutableSqsConfig()->SetEnableSqs(true);
482-
appConfig.MutableSqsConfig()->SetYandexCloudMode(true);
484+
appConfig.MutableSqsConfig()->SetYandexCloudMode(yandexCloudMode);
483485
appConfig.MutableSqsConfig()->SetEnableDeadLetterQueues(true);
484486

485487
auto limit = appConfig.MutablePQConfig()->AddValidRetentionLimits();
@@ -795,7 +797,7 @@ class THttpProxyTestMock : public NUnitTest::TBaseFixture {
795797
AccessServiceServer = builder.BuildAndStart();
796798
}
797799

798-
void InitHttpServer() {
800+
void InitHttpServer(bool yandexCloudMode = true) {
799801
NKikimrConfig::TServerlessProxyConfig config;
800802
config.MutableHttpConfig()->AddYandexCloudServiceRegion("ru-central1");
801803
config.MutableHttpConfig()->AddYandexCloudServiceRegion("ru-central-1");
@@ -805,7 +807,7 @@ class THttpProxyTestMock : public NUnitTest::TBaseFixture {
805807
config.MutableHttpConfig()->SetAccessServiceEndpoint(TStringBuilder() << "127.0.0.1:" << AccessServicePort);
806808
config.SetTestMode(true);
807809
config.MutableHttpConfig()->SetPort(HttpServicePort);
808-
config.MutableHttpConfig()->SetYandexCloudMode(true);
810+
config.MutableHttpConfig()->SetYandexCloudMode(yandexCloudMode);
809811
config.MutableHttpConfig()->SetYmqEnabled(true);
810812

811813
std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory = NYdb::CreateOAuthCredentialsProviderFactory("proxy_sa@builtin");
@@ -916,3 +918,9 @@ class THttpProxyTestMock : public NUnitTest::TBaseFixture {
916918
ui16 MonPort = 0;
917919
ui16 KikimrGrpcPort = 0;
918920
};
921+
922+
class THttpProxyTestMockForSQS : public THttpProxyTestMock {
923+
void SetUp(NUnitTest::TTestContext&) override {
924+
InitAll(false);
925+
}
926+
};

ydb/core/http_proxy/ut/ymq_ut.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,14 @@ Y_UNIT_TEST_SUITE(TestYmqHttpProxy) {
181181
});
182182
}
183183

184+
Y_UNIT_TEST_F(TestSendMessageEmptyQueueUrl, THttpProxyTestMockForSQS) {
185+
// We had a bug that crashed the server if QueueUrl was empty in a request.
186+
SendMessage({
187+
{"QueueUrl", ""},
188+
{"MessageBody", "MessageBody-0"}
189+
}, 400);
190+
}
191+
184192
Y_UNIT_TEST_F(TestSendMessageFifoQueue, THttpProxyTestMock) {
185193
auto json = CreateQueue({
186194
{"QueueName", "ExampleQueueName.fifo"},

ydb/services/ymq/utils.cpp

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
#include "utils.h"
2+
3+
#include <util/string/split.h>
4+
#include <ydb/core/ymq/base/utils.h>
5+
6+
7+
namespace NKikimr::NYmq {
8+
std::pair<TString, TString> CloudIdAndResourceIdFromQueueUrl(const TString& queueUrl) {
9+
auto protocolSeparator = queueUrl.find("://");
10+
if (protocolSeparator == TString::npos) {
11+
return {"", ""};
12+
}
13+
14+
auto restOfUrl = queueUrl.substr(protocolSeparator + 3);
15+
auto parts = StringSplitter(restOfUrl).Split('/').ToList<TString>();
16+
if (parts.size() < 3) {
17+
return {"", ""};
18+
}
19+
20+
bool isPrivateRequest = NKikimr::NSQS::IsPrivateRequest(restOfUrl);
21+
TString queueName = NKikimr::NSQS::ExtractQueueNameFromPath(restOfUrl, isPrivateRequest);
22+
TString accountName = NKikimr::NSQS::ExtractAccountNameFromPath(restOfUrl, isPrivateRequest);
23+
return {accountName, queueName};
24+
}
25+
}
26+

ydb/services/ymq/utils.h

Lines changed: 1 addition & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,7 @@
11
#pragma once
22

33
#include <util/generic/string.h>
4-
#include <util/generic/maybe.h>
5-
#include <util/string/split.h>
6-
#include <ydb/core/ymq/base/utils.h>
74

85
namespace NKikimr::NYmq {
9-
inline static TMaybe<std::pair<TString, TString>> CloudIdAndResourceIdFromQueueUrl(const TString& queueUrl) {
10-
auto protocolSeparator = queueUrl.find("://");
11-
if (protocolSeparator == TString::npos) {
12-
return Nothing();
13-
}
14-
15-
auto restOfUrl = queueUrl.substr(protocolSeparator + 3);
16-
auto parts = StringSplitter(restOfUrl).Split('/').ToList<TString>();
17-
if (parts.size() < 3) {
18-
return Nothing();
19-
}
20-
21-
bool isPrivateRequest = NKikimr::NSQS::IsPrivateRequest(restOfUrl);
22-
TString queueName = NKikimr::NSQS::ExtractQueueNameFromPath(restOfUrl, isPrivateRequest);
23-
TString accountName = NKikimr::NSQS::ExtractAccountNameFromPath(restOfUrl, isPrivateRequest);
24-
return std::pair<TString, TString>(std::move(accountName), std::move(queueName));
25-
}
6+
std::pair<TString, TString> CloudIdAndResourceIdFromQueueUrl(const TString& queueUrl);
267
}

ydb/services/ymq/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ LIBRARY()
33
SRCS(
44
ymq_proxy.cpp
55
grpc_service.cpp
6+
utils.cpp
67
)
78

89
PEERDIR(

ydb/services/ymq/ymq_proxy.cpp

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -293,7 +293,7 @@ namespace NKikimr::NYmq::V1 {
293293
COPY_FIELD_IF_PRESENT(message_deduplication_id, MessageDeduplicationId);
294294
COPY_FIELD_IF_PRESENT(message_group_id, MessageGroupId);
295295

296-
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->Getqueue_url())->second);
296+
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->Getqueue_url()).second);
297297

298298
result->SetMessageBody(GetProtoRequest()->Getmessage_body());
299299

@@ -379,7 +379,7 @@ namespace NKikimr::NYmq::V1 {
379379
NKikimr::NSQS::TReceiveMessageRequest* GetRequest(TSqsRequest& requestHolder) override {
380380
auto result = requestHolder.MutableReceiveMessage();
381381

382-
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url())->second);
382+
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url()).second);
383383

384384
COPY_FIELD_IF_PRESENT(max_number_of_messages, MaxNumberOfMessages);
385385
COPY_FIELD_IF_PRESENT(receive_request_attempt_id, ReceiveRequestAttemptId);
@@ -510,7 +510,7 @@ namespace NKikimr::NYmq::V1 {
510510
private:
511511
NKikimr::NSQS::TGetQueueAttributesRequest* GetRequest(TSqsRequest& requestHolder) override {
512512
auto result = requestHolder.MutableGetQueueAttributes();
513-
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url())->second);
513+
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url()).second);
514514
for (const auto& attributeName : GetProtoRequest()->Getattribute_names()) {
515515
result->MutableNames()->Add()->assign(attributeName);
516516
}
@@ -588,7 +588,7 @@ namespace NKikimr::NYmq::V1 {
588588
private:
589589
NKikimr::NSQS::TDeleteMessageRequest* GetRequest(TSqsRequest& requestHolder) override {
590590
auto result = requestHolder.MutableDeleteMessage();
591-
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url())->second);
591+
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url()).second);
592592
result->SetReceiptHandle(GetProtoRequest()->receipt_handle());
593593
return result;
594594
}
@@ -621,7 +621,7 @@ namespace NKikimr::NYmq::V1 {
621621
private:
622622
NKikimr::NSQS::TPurgeQueueRequest* GetRequest(TSqsRequest& requestHolder) override {
623623
auto result = requestHolder.MutablePurgeQueue();
624-
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url())->second);
624+
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url()).second);
625625
return result;
626626
}
627627
};
@@ -653,7 +653,7 @@ namespace NKikimr::NYmq::V1 {
653653
private:
654654
NKikimr::NSQS::TDeleteQueueRequest* GetRequest(TSqsRequest& requestHolder) override {
655655
auto result = requestHolder.MutableDeleteQueue();
656-
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url())->second);
656+
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url()).second);
657657
return result;
658658
}
659659
};
@@ -685,7 +685,7 @@ namespace NKikimr::NYmq::V1 {
685685
private:
686686
NKikimr::NSQS::TChangeMessageVisibilityRequest* GetRequest(TSqsRequest& requestHolder) override {
687687
auto result = requestHolder.MutableChangeMessageVisibility();
688-
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url())->second);
688+
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url()).second);
689689
result->SetReceiptHandle(GetProtoRequest()->Getreceipt_handle());
690690
result->SetVisibilityTimeout(GetProtoRequest()->Getvisibility_timeout());
691691
return result;
@@ -727,7 +727,7 @@ namespace NKikimr::NYmq::V1 {
727727
private:
728728
NKikimr::NSQS::TSetQueueAttributesRequest* GetRequest(TSqsRequest& requestHolder) override {
729729
auto result = requestHolder.MutableSetQueueAttributes();
730-
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url())->second);
730+
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url()).second);
731731
for (auto& [name, value]: GetProtoRequest()->Getattributes()) {
732732
AddAttribute(requestHolder, name, value);
733733
}
@@ -765,7 +765,7 @@ namespace NKikimr::NYmq::V1 {
765765
private:
766766
NKikimr::NSQS::TListDeadLetterSourceQueuesRequest* GetRequest(TSqsRequest& requestHolder) override {
767767
auto result = requestHolder.MutableListDeadLetterSourceQueues();
768-
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url())->second);
768+
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url()).second);
769769
return result;
770770
}
771771
};
@@ -816,7 +816,7 @@ namespace NKikimr::NYmq::V1 {
816816
NKikimr::NSQS::TSendMessageBatchRequest* GetRequest(TSqsRequest& requestHolder) override {
817817
auto result = requestHolder.MutableSendMessageBatch();
818818

819-
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->Getqueue_url())->second);
819+
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->Getqueue_url()).second);
820820

821821
for (auto& requestEntry : GetProtoRequest()->Getentries()) {
822822
auto entry = requestHolder.MutableSendMessageBatch()->MutableEntries()->Add();
@@ -883,7 +883,7 @@ namespace NKikimr::NYmq::V1 {
883883
private:
884884
NKikimr::NSQS::TDeleteMessageBatchRequest* GetRequest(TSqsRequest& requestHolder) override {
885885
auto result = requestHolder.MutableDeleteMessageBatch();
886-
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->Getqueue_url())->second);
886+
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->Getqueue_url()).second);
887887
for (auto& requestEntry : GetProtoRequest()->Getentries()) {
888888
auto entry = requestHolder.MutableDeleteMessageBatch()->AddEntries();
889889
entry->SetId(requestEntry.Getid());
@@ -934,7 +934,7 @@ namespace NKikimr::NYmq::V1 {
934934
private:
935935
NKikimr::NSQS::TChangeMessageVisibilityBatchRequest* GetRequest(TSqsRequest& requestHolder) override {
936936
auto result = requestHolder.MutableChangeMessageVisibilityBatch();
937-
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->Getqueue_url())->second);
937+
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->Getqueue_url()).second);
938938
for (auto& requestEntry : GetProtoRequest()->Getentries()) {
939939
auto entry = requestHolder.MutableChangeMessageVisibilityBatch()->MutableEntries()->Add();
940940
entry->SetId(requestEntry.Getid());

0 commit comments

Comments
 (0)