Skip to content

YQ-3570 added s3 wildcards validations #8017

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 20 additions & 7 deletions ydb/core/external_sources/object_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ struct TObjectStorageExternalSource : public IExternalSource {
}
}

if (auto issues = Validate(schema, objectStorage, PathsLimit)) {
if (auto issues = Validate(schema, objectStorage, PathsLimit, general.location())) {
ythrow TExternalSourceException() << issues.ToString();
}

Expand Down Expand Up @@ -136,11 +136,18 @@ struct TObjectStorageExternalSource : public IExternalSource {
}

template<typename TScheme, typename TObjectStorage>
static NYql::TIssues Validate(const TScheme& schema, const TObjectStorage& objectStorage, size_t pathsLimit) {
static NYql::TIssues Validate(const TScheme& schema, const TObjectStorage& objectStorage, size_t pathsLimit, const TString& location) {
NYql::TIssues issues;
issues.AddIssues(ValidateFormatSetting(objectStorage.format(), objectStorage.format_setting()));
if (TString errorString = NYql::NS3::ValidateWildcards(location)) {
issues.AddIssue(MakeErrorIssue(Ydb::StatusIds::BAD_REQUEST, TStringBuilder() << "Location '" << location << "' contains invalid wildcard: " << errorString));
}
const bool hasPartitioning = objectStorage.projection_size() || objectStorage.partitioned_by_size();
issues.AddIssues(ValidateFormatSetting(objectStorage.format(), objectStorage.format_setting(), location, hasPartitioning));
issues.AddIssues(ValidateRawFormat(objectStorage.format(), schema, objectStorage.partitioned_by()));
if (objectStorage.projection_size() || objectStorage.partitioned_by_size()) {
if (hasPartitioning) {
if (NYql::NS3::HasWildcards(location)) {
issues.AddIssue(MakeErrorIssue(Ydb::StatusIds::BAD_REQUEST, TStringBuilder() << "Location '" << location << "' contains wildcards"));
}
try {
TVector<TString> partitionedBy{objectStorage.partitioned_by().begin(), objectStorage.partitioned_by().end()};
issues.AddIssues(ValidateProjectionColumns(schema, partitionedBy));
Expand All @@ -160,11 +167,17 @@ struct TObjectStorageExternalSource : public IExternalSource {
return issues;
}

static NYql::TIssues ValidateFormatSetting(const TString& format, const google::protobuf::Map<TString, TString>& formatSetting) {
static NYql::TIssues ValidateFormatSetting(const TString& format, const google::protobuf::Map<TString, TString>& formatSetting, const TString& location, bool hasPartitioning) {
NYql::TIssues issues;
issues.AddIssues(ValidateDateFormatSetting(formatSetting));
for (const auto& [key, value]: formatSetting) {
if (key == "file_pattern"sv) {
if (TString errorString = NYql::NS3::ValidateWildcards(value)) {
issues.AddIssue(MakeErrorIssue(Ydb::StatusIds::BAD_REQUEST, TStringBuilder() << "File pattern '" << value << "' contains invalid wildcard: " << errorString));
}
if (value && !hasPartitioning && !location.EndsWith("/")) {
issues.AddIssue(MakeErrorIssue(Ydb::StatusIds::BAD_REQUEST, "Path pattern cannot be used with file_pattern"));
}
continue;
}

Expand Down Expand Up @@ -631,8 +644,8 @@ IExternalSource::TPtr CreateObjectStorageExternalSource(const std::vector<TRegEx
return MakeIntrusive<TObjectStorageExternalSource>(hostnamePatterns, actorSystem, pathsLimit, std::move(credentialsFactory), enableInfer, allowLocalFiles);
}

NYql::TIssues Validate(const FederatedQuery::Schema& schema, const FederatedQuery::ObjectStorageBinding::Subset& objectStorage, size_t pathsLimit) {
return TObjectStorageExternalSource::Validate(schema, objectStorage, pathsLimit);
NYql::TIssues Validate(const FederatedQuery::Schema& schema, const FederatedQuery::ObjectStorageBinding::Subset& objectStorage, size_t pathsLimit, const TString& location) {
return TObjectStorageExternalSource::Validate(schema, objectStorage, pathsLimit, location);
}

NYql::TIssues ValidateDateFormatSetting(const google::protobuf::Map<TString, TString>& formatSetting, bool matchAllSettings) {
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/external_sources/object_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ IExternalSource::TPtr CreateObjectStorageExternalSource(const std::vector<TRegEx
bool enableInfer,
bool allowLocalFiles);

NYql::TIssues Validate(const FederatedQuery::Schema& schema, const FederatedQuery::ObjectStorageBinding::Subset& objectStorage, size_t pathsLimit);
NYql::TIssues Validate(const FederatedQuery::Schema& schema, const FederatedQuery::ObjectStorageBinding::Subset& objectStorage, size_t pathsLimit, const TString& location);

NYql::TIssues ValidateDateFormatSetting(const google::protobuf::Map<TString, TString>& formatSetting, bool matchAllSettings = false);

Expand Down
26 changes: 26 additions & 0 deletions ydb/core/external_sources/object_storage_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,32 @@ Y_UNIT_TEST_SUITE(ObjectStorageTest) {
general.mutable_attributes()->insert({"projection.h", "b"});
UNIT_ASSERT_EXCEPTION_CONTAINS(source->Pack(schema, general), NExternalSource::TExternalSourceException, "Partition by must always be specified");
}

Y_UNIT_TEST(WildcardsValidation) {
auto source = NExternalSource::CreateObjectStorageExternalSource({}, nullptr, 1000, nullptr, false, false);
NKikimrExternalSources::TSchema schema;

{ // location
NKikimrExternalSources::TGeneral general;
general.set_location("{");
UNIT_ASSERT_EXCEPTION_CONTAINS(source->Pack(schema, general), NExternalSource::TExternalSourceException, "Location '{' contains invalid wildcard:");
}

{ // file pattern
NKikimrExternalSources::TGeneral general;
general.mutable_attributes()->insert({"file_pattern", "{"});
UNIT_ASSERT_EXCEPTION_CONTAINS(source->Pack(schema, general), NExternalSource::TExternalSourceException, "File pattern '{' contains invalid wildcard:");
general.set_location("/test_file");
UNIT_ASSERT_EXCEPTION_CONTAINS(source->Pack(schema, general), NExternalSource::TExternalSourceException, "Path pattern cannot be used with file_pattern");
}

{ // partitioned by
NKikimrExternalSources::TGeneral general;
general.set_location("*");
general.mutable_attributes()->insert({"partitioned_by", "[year]"});
UNIT_ASSERT_EXCEPTION_CONTAINS(source->Pack(schema, general), NExternalSource::TExternalSourceException, "Location '*' contains wildcards");
}
}
}

} // NKikimr
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ NYql::TIssues ValidateBinding(const T& ev, size_t maxSize, const TSet<FederatedQ
case FederatedQuery::BindingSetting::kObjectStorage:
const FederatedQuery::ObjectStorageBinding objectStorage = setting.object_storage();
for (const auto& subset: objectStorage.subset()) {
issues.AddIssues(NKikimr::NExternalSource::Validate(subset.schema(), subset, pathsLimit));
issues.AddIssues(NKikimr::NExternalSource::Validate(subset.schema(), subset, pathsLimit, subset.path_pattern()));
}
break;
}
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/gateway/utils/scheme_helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ void FillCreateExternalTableColumnDesc(NKikimrSchemeOp::TExternalTableDescriptio
columnDesc.SetNotNull(columnIt->second.NotNull);
}
NKikimrExternalSources::TGeneral general;
general.set_location(settings.Location);
auto& attributes = *general.mutable_attributes();
for (const auto& [key, value]: settings.SourceTypeParameters) {
attributes.insert({key, value});
Expand Down
58 changes: 56 additions & 2 deletions ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
const TString externalDataSourceName = "/Root/external_data_source";
const TString externalTableName = "/Root/test_binding_resolve";
const TString bucket = "test_bucket1";
const TString object = TStringBuilder() << "test_" << GetSymbolsString(' ', '~', "{}") << "_object";
const TString object = TStringBuilder() << "test_" << GetSymbolsString(' ', '~', "*?{}") << "_object";

CreateBucketWithObject(bucket, object, TEST_CONTENT);

Expand Down Expand Up @@ -2143,7 +2143,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {

Y_UNIT_TEST(TestReadEmptyFileWithCsvFormat) {
const TString externalDataSourceName = "/Root/external_data_source";
const TString bucket = "test_bucket1";
const TString bucket = "test_bucket12";

CreateBucketWithObject(bucket, "test_object", "");

Expand Down Expand Up @@ -2181,6 +2181,60 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver());
UNIT_ASSERT_EQUAL_C(readyOp.Metadata().ExecStatus, EExecStatus::Completed, readyOp.Status().GetIssues().ToString());
}

Y_UNIT_TEST(TestWildcardValidation) {
const TString bucket = "test_bucket13";

CreateBucket(bucket);

auto kikimr = NTestUtils::MakeKikimrRunner();

auto tc = kikimr->GetTableClient();
auto session = tc.CreateSession().GetValueSync().GetSession();
const TString query = fmt::format(R"(
CREATE EXTERNAL DATA SOURCE `/Root/external_data_source` WITH (
SOURCE_TYPE="ObjectStorage",
LOCATION="{location}",
AUTH_METHOD="NONE"
);)",
"location"_a = GetBucketLocation(bucket)
);
auto result = session.ExecuteSchemeQuery(query).GetValueSync();
UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());

auto db = kikimr->GetQueryClient();

{ // path validation
const TString sql = R"(
SELECT * FROM `/Root/external_data_source`.`/{` WITH (
SCHEMA = (data String),
FORMAT = "csv_with_names"
))";

auto scriptExecutionOperation = db.ExecuteScript(sql).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString());

NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver());
UNIT_ASSERT_EQUAL_C(readyOp.Metadata().ExecStatus, EExecStatus::Failed, readyOp.Status().GetIssues().ToString());
UNIT_ASSERT_STRING_CONTAINS(readyOp.Status().GetIssues().ToString(), "Path '/{' contains invalid wildcard:");
}

{ // file pattern validation
const TString sql = R"(
SELECT * FROM `/Root/external_data_source`.`/` WITH (
SCHEMA = (data String),
FORMAT = "csv_with_names",
FILE_PATTERN = "{"
))";

auto scriptExecutionOperation = db.ExecuteScript(sql).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString());

NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver());
UNIT_ASSERT_EQUAL_C(readyOp.Metadata().ExecStatus, EExecStatus::Failed, readyOp.Status().GetIssues().ToString());
UNIT_ASSERT_STRING_CONTAINS(readyOp.Status().GetIssues().ToString(), "File pattern '{' contains invalid wildcard:");
}
}
}

} // namespace NKikimr::NKqp
22 changes: 22 additions & 0 deletions ydb/core/kqp/ut/federated_query/s3/kqp_federated_scheme_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,28 @@ Y_UNIT_TEST_SUITE(KqpFederatedSchemeTest) {
};
TestInvalidDropForExternalTableWithAuth(queryClientExecutor, "generic_query");
}

Y_UNIT_TEST(ExternalTableDdlLocationValidation) {
auto kikimr = NTestUtils::MakeKikimrRunner();
auto db = kikimr->GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();
auto query = TStringBuilder() << R"(
CREATE EXTERNAL DATA SOURCE `/Root/ExternalDataSource` WITH (
SOURCE_TYPE="ObjectStorage",
LOCATION="my-bucket",
AUTH_METHOD="NONE"
);
CREATE EXTERNAL TABLE `/Root/ExternalTable` (
Key Uint64,
Value String
) WITH (
DATA_SOURCE="/Root/ExternalDataSource",
LOCATION="{"
);)";
auto result = session.ExecuteSchemeQuery(query).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SCHEME_ERROR);
UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), "Location '{' contains invalid wildcard:");
}
}

} // namespace NKikimr::NKqp
8 changes: 4 additions & 4 deletions ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5518,7 +5518,7 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
month Int64 NOT NULL
) WITH (
DATA_SOURCE=")" << externalDataSourceName << R"(",
LOCATION="/folder1/*",
LOCATION="/folder1/",
FORMAT="json_as_string",
`projection.enabled`="true",
`projection.year.type`="integer",
Expand All @@ -5543,7 +5543,7 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
UNIT_ASSERT(externalTable.ExternalTableInfo);
UNIT_ASSERT_VALUES_EQUAL(externalTable.ExternalTableInfo->Description.ColumnsSize(), 4);
UNIT_ASSERT_VALUES_EQUAL(externalTable.ExternalTableInfo->Description.GetDataSourcePath(), externalDataSourceName);
UNIT_ASSERT_VALUES_EQUAL(externalTable.ExternalTableInfo->Description.GetLocation(), "/folder1/*");
UNIT_ASSERT_VALUES_EQUAL(externalTable.ExternalTableInfo->Description.GetLocation(), "/folder1/");
}

Y_UNIT_TEST(CreateExternalTableWithUpperCaseSettings) {
Expand All @@ -5566,7 +5566,7 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
Month Int64 NOT NULL
) WITH (
DATA_SOURCE=")" << externalDataSourceName << R"(",
LOCATION="/folder1/*",
LOCATION="/folder1/",
FORMAT="json_as_string",
`projection.enabled`="true",
`projection.Year.type`="integer",
Expand All @@ -5591,7 +5591,7 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
UNIT_ASSERT(externalTable.ExternalTableInfo);
UNIT_ASSERT_VALUES_EQUAL(externalTable.ExternalTableInfo->Description.ColumnsSize(), 4);
UNIT_ASSERT_VALUES_EQUAL(externalTable.ExternalTableInfo->Description.GetDataSourcePath(), externalDataSourceName);
UNIT_ASSERT_VALUES_EQUAL(externalTable.ExternalTableInfo->Description.GetLocation(), "/folder1/*");
UNIT_ASSERT_VALUES_EQUAL(externalTable.ExternalTableInfo->Description.GetLocation(), "/folder1/");
}

Y_UNIT_TEST(DoubleCreateExternalTable) {
Expand Down
1 change: 1 addition & 0 deletions ydb/core/protos/external_sources.proto
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ message TSchema {

message TGeneral {
map<string, string> attributes = 1 [(Ydb.size).le = 100];
optional string location = 2;
}

message TObjectStorage {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ std::pair<TPathFilter, TEarlyStopChecker> MakeFilterRegexp(const TString& regex,
} else {
re = std::make_shared<RE2>(re2::StringPiece(regex), RE2::Options());
}
Y_ENSURE(re->ok());

const size_t numGroups = re->NumberOfCapturingGroups();
YQL_CLOG(DEBUG, ProviderS3)
Expand Down
33 changes: 29 additions & 4 deletions ydb/library/yql/providers/s3/object_listers/yql_s3_path.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,21 @@ TString RegexFromWildcards(const std::string_view& pattern) {
for (const char& c : escaped) {
switch (c) {
case '{':
result << "(?:";
group = true;
if (group) {
result << "\\{";
} else {
result << "(?:";
group = true;
}
slash = false;
break;
case '}':
result << ')';
group = false;
if (group) {
result << ')';
group = false;
} else {
result << "\\}";
}
slash = false;
break;
case ',':
Expand Down Expand Up @@ -89,7 +97,24 @@ TString RegexFromWildcards(const std::string_view& pattern) {
break;
}
}
Y_ENSURE(!group, "Found unterminated group");
Y_ENSURE(!slash, "Expected symbol after slash");
return result;
}

TString ValidateWildcards(const std::string_view& pattern) {
std::optional<size_t> groupStart;
for (size_t i = 0; i < pattern.size(); ++i) {
if (pattern[i] == '{' && !groupStart) {
groupStart = i;
} else if (pattern[i] == '}') {
groupStart = std::nullopt;
}
}
if (groupStart) {
return TStringBuilder() << "found unterminated group start at position " << *groupStart;
}
return {};
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,6 @@ TString EscapeRegex(const TString& str);
TString EscapeRegex(const std::string_view& str);

TString RegexFromWildcards(const std::string_view& pattern);
TString ValidateWildcards(const std::string_view& pattern);

}
21 changes: 21 additions & 0 deletions ydb/library/yql/providers/s3/object_listers/yql_s3_path_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,27 @@ Y_UNIT_TEST_SUITE(TPathTests) {
UNIT_ASSERT_VALUES_EQUAL(NormalizePath("/a/b/c/"), "a/b/c/");
UNIT_ASSERT_VALUES_EQUAL(NormalizePath("///a/b/c///"), "a/b/c/");
}

void TestRegexFromWildcardsSuccess(const TString& wildcards, const TString& expectedRegex) {
TString errorString = ValidateWildcards(wildcards);
UNIT_ASSERT_C(errorString.empty(), errorString);
UNIT_ASSERT_VALUES_EQUAL(RegexFromWildcards(wildcards), expectedRegex);
}

void TestRegexFromWildcardsFail(const TString& wildcards, const TString& expectedException, const TString& expectedError) {
UNIT_ASSERT_STRING_CONTAINS(ValidateWildcards(wildcards), expectedError);
UNIT_ASSERT_EXCEPTION_CONTAINS(RegexFromWildcards(wildcards), yexception, expectedException);
}

Y_UNIT_TEST(TestRegexFromWildcards) {
TestRegexFromWildcardsSuccess("first,test\\_{alt1,alt2}_text", "first\\,test\\\\_(?:alt1|alt2)_text");
TestRegexFromWildcardsSuccess("hello.*world?str", "hello\\..*world.str");
TestRegexFromWildcardsSuccess("many_{},{alt1,al?t2,al*t3},{alt4}_alts", "many_(?:)\\,(?:alt1|al.t2|al.*t3)\\,(?:alt4)_alts");
TestRegexFromWildcardsSuccess("hello}{}}world", "hello\\}(?:)\\}world");
TestRegexFromWildcardsSuccess("hello{{{}world", "hello(?:\\{\\{)world");

TestRegexFromWildcardsFail("hello{}}{world", "Found unterminated group", "found unterminated group start at position 8");
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -732,6 +732,10 @@ class TS3IODiscoveryTransformer : public TGraphTransformerBase {
if (!FindFilePattern(settings, ctx, filePattern)) {
return false;
}
if (TString errorString = NS3::ValidateWildcards(filePattern)) {
ctx.AddError(TIssue(ctx.GetPosition(read.Pos()), TStringBuilder() << "File pattern '" << filePattern << "' contains invalid wildcard: " << errorString));
return false;
}
const TString effectiveFilePattern = filePattern ? filePattern : "*";

TVector<TString> paths;
Expand Down Expand Up @@ -763,6 +767,11 @@ class TS3IODiscoveryTransformer : public TGraphTransformerBase {
}

for (const auto& path : paths) {
if (TString errorString = NS3::ValidateWildcards(path)) {
ctx.AddError(TIssue(ctx.GetPosition(read.Pos()), TStringBuilder() << "Path '" << path << "' contains invalid wildcard: " << errorString));
return false;
}

// each path in CONCAT() can generate multiple list requests for explicit partitioning
TVector<TListRequest> reqs;

Expand Down
Loading