Skip to content

YQ improved s3 read / write partitions validation #20081

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 13 additions & 3 deletions ydb/core/external_sources/object_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,19 @@ struct TObjectStorageExternalSource : public IExternalSource {
} else if (key.StartsWith("projection.") || key == "storage.location.template") {
objectStorage.mutable_projection()->insert({key, value});
} else if (lowerKey == "partitioned_by") {
auto json = NSc::TValue::FromJsonThrow(value);
for (const auto& column: json.GetArray()) {
*objectStorage.add_partitioned_by() = column;
try {
const auto json = NSc::TValue::FromJsonThrow(value);
if (!json.IsArray()) {
throw TExternalSourceException() << "partitioned_by must be an array of column names";
}
for (const auto& column: json.GetArray()) {
if (!column.IsString()) {
throw TExternalSourceException() << "partitioned_by must be an array of strings";
}
*objectStorage.add_partitioned_by() = column;
}
} catch (const std::exception& e) {
throw TExternalSourceException() << "Failed to parse partitioned_by: " << e.what();
}
} else if (IsIn({"file_pattern"sv, "data.interval.unit"sv, "data.datetime.format_name"sv, "data.datetime.format"sv, "data.timestamp.format_name"sv, "data.timestamp.format"sv, "data.date.format"sv, "csv_delimiter"sv}, lowerKey)) {
objectStorage.mutable_format_setting()->insert({lowerKey, value});
Expand Down
20 changes: 20 additions & 0 deletions ydb/core/external_sources/object_storage_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,26 @@ Y_UNIT_TEST_SUITE(ObjectStorageTest) {
UNIT_ASSERT_EXCEPTION_CONTAINS(source->Pack(schema, general), NExternalSource::TExternalSourceException, "Location '*' contains wildcards");
}
}

Y_UNIT_TEST(FailedPartitionedByValidation) {
const auto source = NExternalSource::CreateObjectStorageExternalSource({}, nullptr, 1000, nullptr, false, false);
NKikimrExternalSources::TSchema schema;
{
NKikimrExternalSources::TGeneral general;
general.mutable_attributes()->emplace("partitioned_by", "{\"year\": \"2025\"}");
UNIT_ASSERT_EXCEPTION_CONTAINS(source->Pack(schema, general), NExternalSource::TExternalSourceException, "partitioned_by must be an array of column names");
}
{
NKikimrExternalSources::TGeneral general;
general.mutable_attributes()->emplace("partitioned_by", "[{\"year\": \"2025\"}]");
UNIT_ASSERT_EXCEPTION_CONTAINS(source->Pack(schema, general), NExternalSource::TExternalSourceException, "partitioned_by must be an array of strings");
}
{
NKikimrExternalSources::TGeneral general;
general.mutable_attributes()->emplace("partitioned_by", "[{");
UNIT_ASSERT_EXCEPTION_CONTAINS(source->Pack(schema, general), NExternalSource::TExternalSourceException, "Failed to parse partitioned_by:");
}
}
}

} // NKikimr
76 changes: 76 additions & 0 deletions ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2687,6 +2687,82 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
UNIT_ASSERT(parser.TryNextRow());
UNIT_ASSERT(!parser.ColumnParser(0).GetOptionalString());
}

Y_UNIT_TEST(TestRawFormatInsertValidation) {
const TString bucket = "test_raw_format_insert_validation_bucket";
CreateBucket(bucket);

auto kikimr = NTestUtils::MakeKikimrRunner();

auto tc = kikimr->GetTableClient();
auto session = tc.CreateSession().GetValueSync().GetSession();
{
const TString query = fmt::format(R"(
CREATE EXTERNAL DATA SOURCE `test_bucket` WITH (
SOURCE_TYPE="ObjectStorage",
LOCATION="{location}",
AUTH_METHOD="NONE"
);)",
"location"_a = GetBucketLocation(bucket)
);
const auto result = session.ExecuteSchemeQuery(query).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
}

auto db = kikimr->GetQueryClient();
const TString query = R"(
INSERT INTO test_bucket.`/result/` WITH (
FORMAT = "raw",
SCHEMA (
data String??
)
)
(data)
VALUES
("some_string")
)";
const auto result = db.ExecuteQuery(query, TTxControl::NoTx()).GetValueSync();
const auto& issues = result.GetIssues().ToString();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::GENERIC_ERROR, issues);
UNIT_ASSERT_STRING_CONTAINS(issues, "Only column with primitive type allowed for raw format");
}

Y_UNIT_TEST(TestPartitionedByInsertValidation) {
const TString bucket = "test_partitioned_by_insert_validation_bucket";
CreateBucket(bucket);

auto kikimr = NTestUtils::MakeKikimrRunner();

auto tc = kikimr->GetTableClient();
auto session = tc.CreateSession().GetValueSync().GetSession();
{
const TString query = fmt::format(R"(
CREATE EXTERNAL DATA SOURCE `test_bucket` WITH (
SOURCE_TYPE="ObjectStorage",
LOCATION="{location}",
AUTH_METHOD="NONE"
);)",
"location"_a = GetBucketLocation(bucket)
);
const auto result = session.ExecuteSchemeQuery(query).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
}

auto db = kikimr->GetQueryClient();
const TString query = R"(
INSERT INTO test_bucket.`/result/` WITH (
FORMAT = "csv_with_names",
PARTITIONED_BY = (data)
)
(data)
VALUES
("some_string")
)";
const auto result = db.ExecuteQuery(query, TTxControl::NoTx()).GetValueSync();
const auto& issues = result.GetIssues().ToString();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::GENERIC_ERROR, issues);
UNIT_ASSERT_STRING_CONTAINS(issues, "Write schema contains no columns except partitioning columns.");
}
}

} // namespace NKikimr::NKqp
10 changes: 7 additions & 3 deletions ydb/library/yql/providers/s3/common/util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -218,9 +218,13 @@ bool ValidateS3WriteSchema(TPositionHandle pos, std::string_view format, const T
return false;
}

const TDataExprType* rowType;
bool isOptional;
return EnsureDataOrOptionalOfData(pos, schemaStructRowType->GetItems().front()->GetItemType(), isOptional, rowType, ctx);
const auto* rowType = schemaStructRowType->GetItems().front()->GetItemType();
if (rowType->GetKind() != ETypeAnnotationKind::Data) {
ctx.AddError(TIssue(ctx.GetPosition(pos), TStringBuilder() << "Only column with primitive type allowed for raw format (you have field with type " << *rowType << ")"));
return false;
}

return true;
}

if (format == "json_list"sv) {
Expand Down
Loading
Loading