Skip to content

Commit 1651861

Browse files
committed
Improved s3 read / write partitions validation
1 parent e250218 commit 1651861

File tree

6 files changed

+185
-71
lines changed

6 files changed

+185
-71
lines changed

ydb/core/external_sources/object_storage.cpp

Lines changed: 13 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -63,9 +63,19 @@ struct TObjectStorageExternalSource : public IExternalSource {
6363
} else if (key.StartsWith("projection.") || key == "storage.location.template") {
6464
objectStorage.mutable_projection()->insert({key, value});
6565
} else if (lowerKey == "partitioned_by") {
66-
auto json = NSc::TValue::FromJsonThrow(value);
67-
for (const auto& column: json.GetArray()) {
68-
*objectStorage.add_partitioned_by() = column;
66+
try {
67+
const auto json = NSc::TValue::FromJsonThrow(value);
68+
if (!json.IsArray()) {
69+
throw TExternalSourceException() << "partitioned_by must be an array of column names";
70+
}
71+
for (const auto& column: json.GetArray()) {
72+
if (!column.IsString()) {
73+
throw TExternalSourceException() << "partitioned_by must be an array of strings";
74+
}
75+
*objectStorage.add_partitioned_by() = column;
76+
}
77+
} catch (const std::exception& e) {
78+
throw TExternalSourceException() << "Failed to parse partitioned_by: " << e.what();
6979
}
7080
} else if (IsIn({"file_pattern"sv, "data.interval.unit"sv, "data.datetime.format_name"sv, "data.datetime.format"sv, "data.timestamp.format_name"sv, "data.timestamp.format"sv, "data.date.format"sv, "csv_delimiter"sv}, lowerKey)) {
7181
objectStorage.mutable_format_setting()->insert({lowerKey, value});

ydb/core/external_sources/object_storage_ut.cpp

Lines changed: 20 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -89,6 +89,26 @@ Y_UNIT_TEST_SUITE(ObjectStorageTest) {
8989
UNIT_ASSERT_EXCEPTION_CONTAINS(source->Pack(schema, general), NExternalSource::TExternalSourceException, "Location '*' contains wildcards");
9090
}
9191
}
92+
93+
Y_UNIT_TEST(FailedPartitionedByValidation) {
94+
const auto source = NExternalSource::CreateObjectStorageExternalSource({}, nullptr, 1000, nullptr, false, false);
95+
NKikimrExternalSources::TSchema schema;
96+
{
97+
NKikimrExternalSources::TGeneral general;
98+
general.mutable_attributes()->emplace("partitioned_by", "{\"year\": \"2025\"}");
99+
UNIT_ASSERT_EXCEPTION_CONTAINS(source->Pack(schema, general), NExternalSource::TExternalSourceException, "partitioned_by must be an array of column names");
100+
}
101+
{
102+
NKikimrExternalSources::TGeneral general;
103+
general.mutable_attributes()->emplace("partitioned_by", "[{\"year\": \"2025\"}]");
104+
UNIT_ASSERT_EXCEPTION_CONTAINS(source->Pack(schema, general), NExternalSource::TExternalSourceException, "partitioned_by must be an array of strings");
105+
}
106+
{
107+
NKikimrExternalSources::TGeneral general;
108+
general.mutable_attributes()->emplace("partitioned_by", "[{");
109+
UNIT_ASSERT_EXCEPTION_CONTAINS(source->Pack(schema, general), NExternalSource::TExternalSourceException, "Failed to parse partitioned_by:");
110+
}
111+
}
92112
}
93113

94114
} // NKikimr

ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp

Lines changed: 76 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -2620,6 +2620,82 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
26202620
auto kikimr = NTestUtils::MakeKikimrRunner(config);
26212621
ReadLargeParquetFiles(kikimr, "test_local_read_large_file_bucket");
26222622
}
2623+
2624+
Y_UNIT_TEST(TestRawFormatInsertValidation) {
2625+
const TString bucket = "test_raw_format_insert_validation_bucket";
2626+
CreateBucket(bucket);
2627+
2628+
auto kikimr = NTestUtils::MakeKikimrRunner();
2629+
2630+
auto tc = kikimr->GetTableClient();
2631+
auto session = tc.CreateSession().GetValueSync().GetSession();
2632+
{
2633+
const TString query = fmt::format(R"(
2634+
CREATE EXTERNAL DATA SOURCE `test_bucket` WITH (
2635+
SOURCE_TYPE="ObjectStorage",
2636+
LOCATION="{location}",
2637+
AUTH_METHOD="NONE"
2638+
);)",
2639+
"location"_a = GetBucketLocation(bucket)
2640+
);
2641+
const auto result = session.ExecuteSchemeQuery(query).GetValueSync();
2642+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
2643+
}
2644+
2645+
auto db = kikimr->GetQueryClient();
2646+
const TString query = R"(
2647+
INSERT INTO test_bucket.`/result/` WITH (
2648+
FORMAT = "raw",
2649+
SCHEMA (
2650+
data String??
2651+
)
2652+
)
2653+
(data)
2654+
VALUES
2655+
("some_string")
2656+
)";
2657+
const auto result = db.ExecuteQuery(query, TTxControl::NoTx()).GetValueSync();
2658+
const auto& issues = result.GetIssues().ToString();
2659+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::GENERIC_ERROR, issues);
2660+
UNIT_ASSERT_STRING_CONTAINS(issues, "Only column with primitive type allowed for raw format");
2661+
}
2662+
2663+
Y_UNIT_TEST(TestPartitionedByInsertValidation) {
2664+
const TString bucket = "test_partitioned_by_insert_validation_bucket";
2665+
CreateBucket(bucket);
2666+
2667+
auto kikimr = NTestUtils::MakeKikimrRunner();
2668+
2669+
auto tc = kikimr->GetTableClient();
2670+
auto session = tc.CreateSession().GetValueSync().GetSession();
2671+
{
2672+
const TString query = fmt::format(R"(
2673+
CREATE EXTERNAL DATA SOURCE `test_bucket` WITH (
2674+
SOURCE_TYPE="ObjectStorage",
2675+
LOCATION="{location}",
2676+
AUTH_METHOD="NONE"
2677+
);)",
2678+
"location"_a = GetBucketLocation(bucket)
2679+
);
2680+
const auto result = session.ExecuteSchemeQuery(query).GetValueSync();
2681+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
2682+
}
2683+
2684+
auto db = kikimr->GetQueryClient();
2685+
const TString query = R"(
2686+
INSERT INTO test_bucket.`/result/` WITH (
2687+
FORMAT = "csv_with_names",
2688+
PARTITIONED_BY = (data)
2689+
)
2690+
(data)
2691+
VALUES
2692+
("some_string")
2693+
)";
2694+
const auto result = db.ExecuteQuery(query, TTxControl::NoTx()).GetValueSync();
2695+
const auto& issues = result.GetIssues().ToString();
2696+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::GENERIC_ERROR, issues);
2697+
UNIT_ASSERT_STRING_CONTAINS(issues, "Write schema contains no columns except partitioning columns.");
2698+
}
26232699
}
26242700

26252701
} // namespace NKikimr::NKqp

ydb/library/yql/providers/s3/common/util.cpp

Lines changed: 7 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -218,9 +218,13 @@ bool ValidateS3WriteSchema(TPositionHandle pos, std::string_view format, const T
218218
return false;
219219
}
220220

221-
const TDataExprType* rowType;
222-
bool isOptional;
223-
return EnsureDataOrOptionalOfData(pos, schemaStructRowType->GetItems().front()->GetItemType(), isOptional, rowType, ctx);
221+
const auto* rowType = schemaStructRowType->GetItems().front()->GetItemType();
222+
if (rowType->GetKind() != ETypeAnnotationKind::Data) {
223+
ctx.AddError(TIssue(ctx.GetPosition(pos), TStringBuilder() << "Only column with primitive type allowed for raw format (you have field with type " << *rowType << ")"));
224+
return false;
225+
}
226+
227+
return true;
224228
}
225229

226230
if (format == "json_list"sv) {

0 commit comments

Comments
 (0)