Skip to content

Commit 4787227

Browse files
GrigoriyPA authored and rekby committed
YQ-3570 added s3 wildcards validations (ydb-platform#8017)
1 parent f9ae353 commit 4787227

File tree

14 files changed

+193
-19
lines changed

14 files changed

+193
-19
lines changed

ydb/core/external_sources/object_storage.cpp

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ struct TObjectStorageExternalSource : public IExternalSource {
6767
}
6868
}
6969

70-
if (auto issues = Validate(schema, objectStorage, PathsLimit)) {
70+
if (auto issues = Validate(schema, objectStorage, PathsLimit, general.location())) {
7171
ythrow TExternalSourceException() << issues.ToString();
7272
}
7373

@@ -136,11 +136,18 @@ struct TObjectStorageExternalSource : public IExternalSource {
136136
}
137137

138138
template<typename TScheme, typename TObjectStorage>
139-
static NYql::TIssues Validate(const TScheme& schema, const TObjectStorage& objectStorage, size_t pathsLimit) {
139+
static NYql::TIssues Validate(const TScheme& schema, const TObjectStorage& objectStorage, size_t pathsLimit, const TString& location) {
140140
NYql::TIssues issues;
141-
issues.AddIssues(ValidateFormatSetting(objectStorage.format(), objectStorage.format_setting()));
141+
if (TString errorString = NYql::NS3::ValidateWildcards(location)) {
142+
issues.AddIssue(MakeErrorIssue(Ydb::StatusIds::BAD_REQUEST, TStringBuilder() << "Location '" << location << "' contains invalid wildcard: " << errorString));
143+
}
144+
const bool hasPartitioning = objectStorage.projection_size() || objectStorage.partitioned_by_size();
145+
issues.AddIssues(ValidateFormatSetting(objectStorage.format(), objectStorage.format_setting(), location, hasPartitioning));
142146
issues.AddIssues(ValidateRawFormat(objectStorage.format(), schema, objectStorage.partitioned_by()));
143-
if (objectStorage.projection_size() || objectStorage.partitioned_by_size()) {
147+
if (hasPartitioning) {
148+
if (NYql::NS3::HasWildcards(location)) {
149+
issues.AddIssue(MakeErrorIssue(Ydb::StatusIds::BAD_REQUEST, TStringBuilder() << "Location '" << location << "' contains wildcards"));
150+
}
144151
try {
145152
TVector<TString> partitionedBy{objectStorage.partitioned_by().begin(), objectStorage.partitioned_by().end()};
146153
issues.AddIssues(ValidateProjectionColumns(schema, partitionedBy));
@@ -160,11 +167,17 @@ struct TObjectStorageExternalSource : public IExternalSource {
160167
return issues;
161168
}
162169

163-
static NYql::TIssues ValidateFormatSetting(const TString& format, const google::protobuf::Map<TString, TString>& formatSetting) {
170+
static NYql::TIssues ValidateFormatSetting(const TString& format, const google::protobuf::Map<TString, TString>& formatSetting, const TString& location, bool hasPartitioning) {
164171
NYql::TIssues issues;
165172
issues.AddIssues(ValidateDateFormatSetting(formatSetting));
166173
for (const auto& [key, value]: formatSetting) {
167174
if (key == "file_pattern"sv) {
175+
if (TString errorString = NYql::NS3::ValidateWildcards(value)) {
176+
issues.AddIssue(MakeErrorIssue(Ydb::StatusIds::BAD_REQUEST, TStringBuilder() << "File pattern '" << value << "' contains invalid wildcard: " << errorString));
177+
}
178+
if (value && !hasPartitioning && !location.EndsWith("/")) {
179+
issues.AddIssue(MakeErrorIssue(Ydb::StatusIds::BAD_REQUEST, "Path pattern cannot be used with file_pattern"));
180+
}
168181
continue;
169182
}
170183

@@ -631,8 +644,8 @@ IExternalSource::TPtr CreateObjectStorageExternalSource(const std::vector<TRegEx
631644
return MakeIntrusive<TObjectStorageExternalSource>(hostnamePatterns, actorSystem, pathsLimit, std::move(credentialsFactory), enableInfer, allowLocalFiles);
632645
}
633646

634-
NYql::TIssues Validate(const FederatedQuery::Schema& schema, const FederatedQuery::ObjectStorageBinding::Subset& objectStorage, size_t pathsLimit) {
635-
return TObjectStorageExternalSource::Validate(schema, objectStorage, pathsLimit);
647+
NYql::TIssues Validate(const FederatedQuery::Schema& schema, const FederatedQuery::ObjectStorageBinding::Subset& objectStorage, size_t pathsLimit, const TString& location) {
648+
return TObjectStorageExternalSource::Validate(schema, objectStorage, pathsLimit, location);
636649
}
637650

638651
NYql::TIssues ValidateDateFormatSetting(const google::protobuf::Map<TString, TString>& formatSetting, bool matchAllSettings) {

ydb/core/external_sources/object_storage.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ IExternalSource::TPtr CreateObjectStorageExternalSource(const std::vector<TRegEx
1616
bool enableInfer,
1717
bool allowLocalFiles);
1818

19-
NYql::TIssues Validate(const FederatedQuery::Schema& schema, const FederatedQuery::ObjectStorageBinding::Subset& objectStorage, size_t pathsLimit);
19+
NYql::TIssues Validate(const FederatedQuery::Schema& schema, const FederatedQuery::ObjectStorageBinding::Subset& objectStorage, size_t pathsLimit, const TString& location);
2020

2121
NYql::TIssues ValidateDateFormatSetting(const google::protobuf::Map<TString, TString>& formatSetting, bool matchAllSettings = false);
2222

ydb/core/external_sources/object_storage_ut.cpp

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,32 @@ Y_UNIT_TEST_SUITE(ObjectStorageTest) {
2929
general.mutable_attributes()->insert({"projection.h", "b"});
3030
UNIT_ASSERT_EXCEPTION_CONTAINS(source->Pack(schema, general), NExternalSource::TExternalSourceException, "Partition by must always be specified");
3131
}
32+
33+
Y_UNIT_TEST(WildcardsValidation) {
34+
auto source = NExternalSource::CreateObjectStorageExternalSource({}, nullptr, 1000, nullptr, false, false);
35+
NKikimrExternalSources::TSchema schema;
36+
37+
{ // location
38+
NKikimrExternalSources::TGeneral general;
39+
general.set_location("{");
40+
UNIT_ASSERT_EXCEPTION_CONTAINS(source->Pack(schema, general), NExternalSource::TExternalSourceException, "Location '{' contains invalid wildcard:");
41+
}
42+
43+
{ // file pattern
44+
NKikimrExternalSources::TGeneral general;
45+
general.mutable_attributes()->insert({"file_pattern", "{"});
46+
UNIT_ASSERT_EXCEPTION_CONTAINS(source->Pack(schema, general), NExternalSource::TExternalSourceException, "File pattern '{' contains invalid wildcard:");
47+
general.set_location("/test_file");
48+
UNIT_ASSERT_EXCEPTION_CONTAINS(source->Pack(schema, general), NExternalSource::TExternalSourceException, "Path pattern cannot be used with file_pattern");
49+
}
50+
51+
{ // partitioned by
52+
NKikimrExternalSources::TGeneral general;
53+
general.set_location("*");
54+
general.mutable_attributes()->insert({"partitioned_by", "[year]"});
55+
UNIT_ASSERT_EXCEPTION_CONTAINS(source->Pack(schema, general), NExternalSource::TExternalSourceException, "Location '*' contains wildcards");
56+
}
57+
}
3258
}
3359

3460
} // NKikimr

ydb/core/fq/libs/control_plane_storage/request_validators.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ NYql::TIssues ValidateBinding(const T& ev, size_t maxSize, const TSet<FederatedQ
121121
case FederatedQuery::BindingSetting::kObjectStorage:
122122
const FederatedQuery::ObjectStorageBinding objectStorage = setting.object_storage();
123123
for (const auto& subset: objectStorage.subset()) {
124-
issues.AddIssues(NKikimr::NExternalSource::Validate(subset.schema(), subset, pathsLimit));
124+
issues.AddIssues(NKikimr::NExternalSource::Validate(subset.schema(), subset, pathsLimit, subset.path_pattern()));
125125
}
126126
break;
127127
}

ydb/core/kqp/gateway/utils/scheme_helpers.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ void FillCreateExternalTableColumnDesc(NKikimrSchemeOp::TExternalTableDescriptio
8888
columnDesc.SetNotNull(columnIt->second.NotNull);
8989
}
9090
NKikimrExternalSources::TGeneral general;
91+
general.set_location(settings.Location);
9192
auto& attributes = *general.mutable_attributes();
9293
for (const auto& [key, value]: settings.SourceTypeParameters) {
9394
attributes.insert({key, value});

ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp

Lines changed: 56 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
3535
const TString externalDataSourceName = "/Root/external_data_source";
3636
const TString externalTableName = "/Root/test_binding_resolve";
3737
const TString bucket = "test_bucket1";
38-
const TString object = TStringBuilder() << "test_" << GetSymbolsString(' ', '~', "{}") << "_object";
38+
const TString object = TStringBuilder() << "test_" << GetSymbolsString(' ', '~', "*?{}") << "_object";
3939

4040
CreateBucketWithObject(bucket, object, TEST_CONTENT);
4141

@@ -2143,7 +2143,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
21432143

21442144
Y_UNIT_TEST(TestReadEmptyFileWithCsvFormat) {
21452145
const TString externalDataSourceName = "/Root/external_data_source";
2146-
const TString bucket = "test_bucket1";
2146+
const TString bucket = "test_bucket12";
21472147

21482148
CreateBucketWithObject(bucket, "test_object", "");
21492149

@@ -2181,6 +2181,60 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
21812181
NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver());
21822182
UNIT_ASSERT_EQUAL_C(readyOp.Metadata().ExecStatus, EExecStatus::Completed, readyOp.Status().GetIssues().ToString());
21832183
}
2184+
2185+
Y_UNIT_TEST(TestWildcardValidation) {
2186+
const TString bucket = "test_bucket13";
2187+
2188+
CreateBucket(bucket);
2189+
2190+
auto kikimr = NTestUtils::MakeKikimrRunner();
2191+
2192+
auto tc = kikimr->GetTableClient();
2193+
auto session = tc.CreateSession().GetValueSync().GetSession();
2194+
const TString query = fmt::format(R"(
2195+
CREATE EXTERNAL DATA SOURCE `/Root/external_data_source` WITH (
2196+
SOURCE_TYPE="ObjectStorage",
2197+
LOCATION="{location}",
2198+
AUTH_METHOD="NONE"
2199+
);)",
2200+
"location"_a = GetBucketLocation(bucket)
2201+
);
2202+
auto result = session.ExecuteSchemeQuery(query).GetValueSync();
2203+
UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
2204+
2205+
auto db = kikimr->GetQueryClient();
2206+
2207+
{ // path validation
2208+
const TString sql = R"(
2209+
SELECT * FROM `/Root/external_data_source`.`/{` WITH (
2210+
SCHEMA = (data String),
2211+
FORMAT = "csv_with_names"
2212+
))";
2213+
2214+
auto scriptExecutionOperation = db.ExecuteScript(sql).ExtractValueSync();
2215+
UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString());
2216+
2217+
NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver());
2218+
UNIT_ASSERT_EQUAL_C(readyOp.Metadata().ExecStatus, EExecStatus::Failed, readyOp.Status().GetIssues().ToString());
2219+
UNIT_ASSERT_STRING_CONTAINS(readyOp.Status().GetIssues().ToString(), "Path '/{' contains invalid wildcard:");
2220+
}
2221+
2222+
{ // file pattern validation
2223+
const TString sql = R"(
2224+
SELECT * FROM `/Root/external_data_source`.`/` WITH (
2225+
SCHEMA = (data String),
2226+
FORMAT = "csv_with_names",
2227+
FILE_PATTERN = "{"
2228+
))";
2229+
2230+
auto scriptExecutionOperation = db.ExecuteScript(sql).ExtractValueSync();
2231+
UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString());
2232+
2233+
NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver());
2234+
UNIT_ASSERT_EQUAL_C(readyOp.Metadata().ExecStatus, EExecStatus::Failed, readyOp.Status().GetIssues().ToString());
2235+
UNIT_ASSERT_STRING_CONTAINS(readyOp.Status().GetIssues().ToString(), "File pattern '{' contains invalid wildcard:");
2236+
}
2237+
}
21842238
}
21852239

21862240
} // namespace NKikimr::NKqp

ydb/core/kqp/ut/federated_query/s3/kqp_federated_scheme_ut.cpp

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,28 @@ Y_UNIT_TEST_SUITE(KqpFederatedSchemeTest) {
215215
};
216216
TestInvalidDropForExternalTableWithAuth(queryClientExecutor, "generic_query");
217217
}
218+
219+
Y_UNIT_TEST(ExternalTableDdlLocationValidation) {
220+
auto kikimr = NTestUtils::MakeKikimrRunner();
221+
auto db = kikimr->GetTableClient();
222+
auto session = db.CreateSession().GetValueSync().GetSession();
223+
auto query = TStringBuilder() << R"(
224+
CREATE EXTERNAL DATA SOURCE `/Root/ExternalDataSource` WITH (
225+
SOURCE_TYPE="ObjectStorage",
226+
LOCATION="my-bucket",
227+
AUTH_METHOD="NONE"
228+
);
229+
CREATE EXTERNAL TABLE `/Root/ExternalTable` (
230+
Key Uint64,
231+
Value String
232+
) WITH (
233+
DATA_SOURCE="/Root/ExternalDataSource",
234+
LOCATION="{"
235+
);)";
236+
auto result = session.ExecuteSchemeQuery(query).GetValueSync();
237+
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SCHEME_ERROR);
238+
UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), "Location '{' contains invalid wildcard:");
239+
}
218240
}
219241

220242
} // namespace NKikimr::NKqp

ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5518,7 +5518,7 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
55185518
month Int64 NOT NULL
55195519
) WITH (
55205520
DATA_SOURCE=")" << externalDataSourceName << R"(",
5521-
LOCATION="/folder1/*",
5521+
LOCATION="/folder1/",
55225522
FORMAT="json_as_string",
55235523
`projection.enabled`="true",
55245524
`projection.year.type`="integer",
@@ -5543,7 +5543,7 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
55435543
UNIT_ASSERT(externalTable.ExternalTableInfo);
55445544
UNIT_ASSERT_VALUES_EQUAL(externalTable.ExternalTableInfo->Description.ColumnsSize(), 4);
55455545
UNIT_ASSERT_VALUES_EQUAL(externalTable.ExternalTableInfo->Description.GetDataSourcePath(), externalDataSourceName);
5546-
UNIT_ASSERT_VALUES_EQUAL(externalTable.ExternalTableInfo->Description.GetLocation(), "/folder1/*");
5546+
UNIT_ASSERT_VALUES_EQUAL(externalTable.ExternalTableInfo->Description.GetLocation(), "/folder1/");
55475547
}
55485548

55495549
Y_UNIT_TEST(CreateExternalTableWithUpperCaseSettings) {
@@ -5566,7 +5566,7 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
55665566
Month Int64 NOT NULL
55675567
) WITH (
55685568
DATA_SOURCE=")" << externalDataSourceName << R"(",
5569-
LOCATION="/folder1/*",
5569+
LOCATION="/folder1/",
55705570
FORMAT="json_as_string",
55715571
`projection.enabled`="true",
55725572
`projection.Year.type`="integer",
@@ -5591,7 +5591,7 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
55915591
UNIT_ASSERT(externalTable.ExternalTableInfo);
55925592
UNIT_ASSERT_VALUES_EQUAL(externalTable.ExternalTableInfo->Description.ColumnsSize(), 4);
55935593
UNIT_ASSERT_VALUES_EQUAL(externalTable.ExternalTableInfo->Description.GetDataSourcePath(), externalDataSourceName);
5594-
UNIT_ASSERT_VALUES_EQUAL(externalTable.ExternalTableInfo->Description.GetLocation(), "/folder1/*");
5594+
UNIT_ASSERT_VALUES_EQUAL(externalTable.ExternalTableInfo->Description.GetLocation(), "/folder1/");
55955595
}
55965596

55975597
Y_UNIT_TEST(DoubleCreateExternalTable) {

ydb/core/protos/external_sources.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ message TSchema {
1111

1212
message TGeneral {
1313
map<string, string> attributes = 1 [(Ydb.size).le = 100];
14+
optional string location = 2;
1415
}
1516

1617
message TObjectStorage {

ydb/library/yql/providers/s3/object_listers/yql_s3_list.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ std::pair<TPathFilter, TEarlyStopChecker> MakeFilterRegexp(const TString& regex,
4747
} else {
4848
re = std::make_shared<RE2>(re2::StringPiece(regex), RE2::Options());
4949
}
50+
Y_ENSURE(re->ok());
5051

5152
const size_t numGroups = re->NumberOfCapturingGroups();
5253
YQL_CLOG(DEBUG, ProviderS3)

0 commit comments

Comments
 (0)