Skip to content

Commit 60658d0

Browse files
committed
Improved s3 read / write partitions validation
1 parent 851d661 commit 60658d0

File tree

6 files changed

+185
-71
lines changed

6 files changed

+185
-71
lines changed

ydb/core/external_sources/object_storage.cpp

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,9 +63,19 @@ struct TObjectStorageExternalSource : public IExternalSource {
6363
} else if (key.StartsWith("projection.") || key == "storage.location.template") {
6464
objectStorage.mutable_projection()->insert({key, value});
6565
} else if (lowerKey == "partitioned_by") {
66-
auto json = NSc::TValue::FromJsonThrow(value);
67-
for (const auto& column: json.GetArray()) {
68-
*objectStorage.add_partitioned_by() = column;
66+
try {
67+
const auto json = NSc::TValue::FromJsonThrow(value);
68+
if (!json.IsArray()) {
69+
throw TExternalSourceException() << "partitioned_by must be an array of column names";
70+
}
71+
for (const auto& column: json.GetArray()) {
72+
if (!column.IsString()) {
73+
throw TExternalSourceException() << "partitioned_by must be an array of strings";
74+
}
75+
*objectStorage.add_partitioned_by() = column;
76+
}
77+
} catch (const std::exception& e) {
78+
throw TExternalSourceException() << "Failed to parse partitioned_by: " << e.what();
6979
}
7080
} else if (IsIn({"file_pattern"sv, "data.interval.unit"sv, "data.datetime.format_name"sv, "data.datetime.format"sv, "data.timestamp.format_name"sv, "data.timestamp.format"sv, "data.date.format"sv, "csv_delimiter"sv}, lowerKey)) {
7181
objectStorage.mutable_format_setting()->insert({lowerKey, value});

ydb/core/external_sources/object_storage_ut.cpp

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,26 @@ Y_UNIT_TEST_SUITE(ObjectStorageTest) {
8989
UNIT_ASSERT_EXCEPTION_CONTAINS(source->Pack(schema, general), NExternalSource::TExternalSourceException, "Location '*' contains wildcards");
9090
}
9191
}
92+
93+
Y_UNIT_TEST(FailedPartitionedByValidation) {
94+
const auto source = NExternalSource::CreateObjectStorageExternalSource({}, nullptr, 1000, nullptr, false, false);
95+
NKikimrExternalSources::TSchema schema;
96+
{
97+
NKikimrExternalSources::TGeneral general;
98+
general.mutable_attributes()->emplace("partitioned_by", "{\"year\": \"2025\"}");
99+
UNIT_ASSERT_EXCEPTION_CONTAINS(source->Pack(schema, general), NExternalSource::TExternalSourceException, "partitioned_by must be an array of column names");
100+
}
101+
{
102+
NKikimrExternalSources::TGeneral general;
103+
general.mutable_attributes()->emplace("partitioned_by", "[{\"year\": \"2025\"}]");
104+
UNIT_ASSERT_EXCEPTION_CONTAINS(source->Pack(schema, general), NExternalSource::TExternalSourceException, "partitioned_by must be an array of strings");
105+
}
106+
{
107+
NKikimrExternalSources::TGeneral general;
108+
general.mutable_attributes()->emplace("partitioned_by", "[{");
109+
UNIT_ASSERT_EXCEPTION_CONTAINS(source->Pack(schema, general), NExternalSource::TExternalSourceException, "Failed to parse partitioned_by:");
110+
}
111+
}
92112
}
93113

94114
} // NKikimr

ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2687,6 +2687,82 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
26872687
UNIT_ASSERT(parser.TryNextRow());
26882688
UNIT_ASSERT(!parser.ColumnParser(0).GetOptionalString());
26892689
}
2690+
2691+
Y_UNIT_TEST(TestRawFormatInsertValidation) {
2692+
const TString bucket = "test_raw_format_insert_validation_bucket";
2693+
CreateBucket(bucket);
2694+
2695+
auto kikimr = NTestUtils::MakeKikimrRunner();
2696+
2697+
auto tc = kikimr->GetTableClient();
2698+
auto session = tc.CreateSession().GetValueSync().GetSession();
2699+
{
2700+
const TString query = fmt::format(R"(
2701+
CREATE EXTERNAL DATA SOURCE `test_bucket` WITH (
2702+
SOURCE_TYPE="ObjectStorage",
2703+
LOCATION="{location}",
2704+
AUTH_METHOD="NONE"
2705+
);)",
2706+
"location"_a = GetBucketLocation(bucket)
2707+
);
2708+
const auto result = session.ExecuteSchemeQuery(query).GetValueSync();
2709+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
2710+
}
2711+
2712+
auto db = kikimr->GetQueryClient();
2713+
const TString query = R"(
2714+
INSERT INTO test_bucket.`/result/` WITH (
2715+
FORMAT = "raw",
2716+
SCHEMA (
2717+
data String??
2718+
)
2719+
)
2720+
(data)
2721+
VALUES
2722+
("some_string")
2723+
)";
2724+
const auto result = db.ExecuteQuery(query, TTxControl::NoTx()).GetValueSync();
2725+
const auto& issues = result.GetIssues().ToString();
2726+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::GENERIC_ERROR, issues);
2727+
UNIT_ASSERT_STRING_CONTAINS(issues, "Only column with primitive type allowed for raw format");
2728+
}
2729+
2730+
Y_UNIT_TEST(TestPartitionedByInsertValidation) {
2731+
const TString bucket = "test_partitioned_by_insert_validation_bucket";
2732+
CreateBucket(bucket);
2733+
2734+
auto kikimr = NTestUtils::MakeKikimrRunner();
2735+
2736+
auto tc = kikimr->GetTableClient();
2737+
auto session = tc.CreateSession().GetValueSync().GetSession();
2738+
{
2739+
const TString query = fmt::format(R"(
2740+
CREATE EXTERNAL DATA SOURCE `test_bucket` WITH (
2741+
SOURCE_TYPE="ObjectStorage",
2742+
LOCATION="{location}",
2743+
AUTH_METHOD="NONE"
2744+
);)",
2745+
"location"_a = GetBucketLocation(bucket)
2746+
);
2747+
const auto result = session.ExecuteSchemeQuery(query).GetValueSync();
2748+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
2749+
}
2750+
2751+
auto db = kikimr->GetQueryClient();
2752+
const TString query = R"(
2753+
INSERT INTO test_bucket.`/result/` WITH (
2754+
FORMAT = "csv_with_names",
2755+
PARTITIONED_BY = (data)
2756+
)
2757+
(data)
2758+
VALUES
2759+
("some_string")
2760+
)";
2761+
const auto result = db.ExecuteQuery(query, TTxControl::NoTx()).GetValueSync();
2762+
const auto& issues = result.GetIssues().ToString();
2763+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::GENERIC_ERROR, issues);
2764+
UNIT_ASSERT_STRING_CONTAINS(issues, "Write schema contains no columns except partitioning columns.");
2765+
}
26902766
}
26912767

26922768
} // namespace NKikimr::NKqp

ydb/library/yql/providers/s3/common/util.cpp

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -218,9 +218,13 @@ bool ValidateS3WriteSchema(TPositionHandle pos, std::string_view format, const T
218218
return false;
219219
}
220220

221-
const TDataExprType* rowType;
222-
bool isOptional;
223-
return EnsureDataOrOptionalOfData(pos, schemaStructRowType->GetItems().front()->GetItemType(), isOptional, rowType, ctx);
221+
const auto* rowType = schemaStructRowType->GetItems().front()->GetItemType();
222+
if (rowType->GetKind() != ETypeAnnotationKind::Data) {
223+
ctx.AddError(TIssue(ctx.GetPosition(pos), TStringBuilder() << "Only column with primitive type allowed for raw format (you have field with type " << *rowType << ")"));
224+
return false;
225+
}
226+
227+
return true;
224228
}
225229

226230
if (format == "json_list"sv) {

0 commit comments

Comments
 (0)