Skip to content

Commit adfd84c

Browse files
fix for YQ-3286 (#8965)
1 parent d4e8297 commit adfd84c

File tree

2 files changed

+49
-12
lines changed

2 files changed

+49
-12
lines changed

ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,12 @@ struct TS3ReadError : public yexception {
150150
using yexception::yexception;
151151
};
152152
153+
// Turns a failed arrow::Status into a thrown parquet::ParquetException whose
// message is status.ToString(); returns normally when status.ok() is true.
// NOTE(review): the call sites changed in this commit previously used
// THROW_ARROW_NOT_OK. Presumably the parquet exception type is what lets the
// failure surface as BAD_REQUEST (as the accompanying test expects) — confirm
// against the code that catches it.
void ThrowParquetNotOk(arrow::Status status) {
154+
if (!status.ok()) {
155+
throw parquet::ParquetException(status.ToString());
156+
}
157+
}
158+
153159
using namespace NKikimr::NMiniKQL;
154160
155161
ui64 SubtractSaturating(ui64 lhs, ui64 rhs) {
@@ -641,8 +647,8 @@ class TS3ReadCoroImpl : public TActorCoroImpl {
641647
642648
// init the 1st reader, get meta/rg count
643649
readers.resize(1);
644-
THROW_ARROW_NOT_OK(builder.Open(std::make_shared<THttpRandomAccessFile>(this, RetryStuff->SizeLimit)));
645-
THROW_ARROW_NOT_OK(builder.Build(&readers[0]));
650+
ThrowParquetNotOk(builder.Open(std::make_shared<THttpRandomAccessFile>(this, RetryStuff->SizeLimit)));
651+
ThrowParquetNotOk(builder.Build(&readers[0]));
646652
auto fileMetadata = readers[0]->parquet_reader()->metadata();
647653
648654
bool hasPredicate = ReadSpec->Predicate.payload_case() != NYql::NConnector::NApi::TPredicate::PayloadCase::PAYLOAD_NOT_SET;
@@ -651,7 +657,7 @@ class TS3ReadCoroImpl : public TActorCoroImpl {
651657
652658
if (numGroups) {
653659
std::shared_ptr<arrow::Schema> schema;
654-
THROW_ARROW_NOT_OK(readers[0]->GetSchema(&schema));
660+
ThrowParquetNotOk(readers[0]->GetSchema(&schema));
655661
std::vector<int> columnIndices;
656662
std::vector<TColumnConverter> columnConverters;
657663
@@ -688,17 +694,17 @@ class TS3ReadCoroImpl : public TActorCoroImpl {
688694
// init other readers if any
689695
readers.resize(readerCount);
690696
for (ui64 i = 1; i < readerCount; i++) {
691-
THROW_ARROW_NOT_OK(builder.Open(std::make_shared<THttpRandomAccessFile>(this, RetryStuff->SizeLimit),
697+
ThrowParquetNotOk(builder.Open(std::make_shared<THttpRandomAccessFile>(this, RetryStuff->SizeLimit),
692698
parquet::default_reader_properties(),
693699
fileMetadata));
694-
THROW_ARROW_NOT_OK(builder.Build(&readers[i]));
700+
ThrowParquetNotOk(builder.Build(&readers[i]));
695701
}
696702
}
697703
698704
for (ui64 i = 0; i < readerCount; i++) {
699705
if (!columnIndices.empty()) {
700706
CurrentRowGroupIndex = i;
701-
THROW_ARROW_NOT_OK(readers[i]->WillNeedRowGroups({ hasPredicate ? static_cast<int>(matchedRowGroups[i]) : static_cast<int>(i) }, columnIndices));
707+
ThrowParquetNotOk(readers[i]->WillNeedRowGroups({ hasPredicate ? static_cast<int>(matchedRowGroups[i]) : static_cast<int>(i) }, columnIndices));
702708
SourceContext->IncChunkCount();
703709
}
704710
RowGroupReaderIndex[i] = i;
@@ -740,7 +746,7 @@ class TS3ReadCoroImpl : public TActorCoroImpl {
740746
std::shared_ptr<arrow::Table> table;
741747
742748
LOG_CORO_D("Decode RowGroup " << readyGroupIndex << " of " << numGroups << " from reader " << readyReaderIndex);
743-
THROW_ARROW_NOT_OK(readers[readyReaderIndex]->DecodeRowGroups({ hasPredicate ? static_cast<int>(matchedRowGroups[readyGroupIndex]) : static_cast<int>(readyGroupIndex) }, columnIndices, &table));
749+
ThrowParquetNotOk(readers[readyReaderIndex]->DecodeRowGroups({ hasPredicate ? static_cast<int>(matchedRowGroups[readyGroupIndex]) : static_cast<int>(readyGroupIndex) }, columnIndices, &table));
744750
readyGroupCount++;
745751
746752
auto downloadedBytes = ReadInflightSize[readyGroupIndex];
@@ -776,7 +782,7 @@ class TS3ReadCoroImpl : public TActorCoroImpl {
776782
if (nextGroup < numGroups) {
777783
if (!columnIndices.empty()) {
778784
CurrentRowGroupIndex = nextGroup;
779-
THROW_ARROW_NOT_OK(readers[readyReaderIndex]->WillNeedRowGroups({ hasPredicate ? static_cast<int>(nextGroup) : static_cast<int>(nextGroup) }, columnIndices));
785+
ThrowParquetNotOk(readers[readyReaderIndex]->WillNeedRowGroups({ hasPredicate ? static_cast<int>(nextGroup) : static_cast<int>(nextGroup) }, columnIndices));
780786
SourceContext->IncChunkCount();
781787
}
782788
RowGroupReaderIndex[nextGroup] = readyReaderIndex;
@@ -810,11 +816,11 @@ class TS3ReadCoroImpl : public TActorCoroImpl {
810816
properties.set_cache_options(arrow::io::CacheOptions::LazyDefaults());
811817
properties.set_pre_buffer(true);
812818
builder.properties(properties);
813-
THROW_ARROW_NOT_OK(builder.Open(arrowFile));
814-
THROW_ARROW_NOT_OK(builder.Build(&fileReader));
819+
ThrowParquetNotOk(builder.Open(arrowFile));
820+
ThrowParquetNotOk(builder.Build(&fileReader));
815821
816822
std::shared_ptr<arrow::Schema> schema;
817-
THROW_ARROW_NOT_OK(fileReader->GetSchema(&schema));
823+
ThrowParquetNotOk(fileReader->GetSchema(&schema));
818824
std::vector<int> columnIndices;
819825
std::vector<TColumnConverter> columnConverters;
820826
@@ -833,7 +839,7 @@ class TS3ReadCoroImpl : public TActorCoroImpl {
833839
834840
std::shared_ptr<arrow::Table> table;
835841
ui64 ingressBytes = IngressBytes;
836-
THROW_ARROW_NOT_OK(fileReader->ReadRowGroup(group, columnIndices, &table));
842+
ThrowParquetNotOk(fileReader->ReadRowGroup(group, columnIndices, &table));
837843
ui64 downloadedBytes = IngressBytes - ingressBytes;
838844
auto reader = std::make_unique<arrow::TableBatchReader>(*table);
839845

ydb/tests/fq/s3/test_s3_0.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -762,6 +762,37 @@ def test_bad_format(self, kikimr, s3, client, runtime_listing, unique_prefix):
762762
query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
763763
client.wait_query_status(query_id, fq.QueryMeta.FAILED)
764764

765+
@yq_v2
766+
@pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
767+
def test_bad_request_on_invalid_parquet(self, kikimr, s3, client, unique_prefix):
# Uploads a plain-text object, queries it with format=parquet, and checks that
# the query ends in FAILED with a BAD_REQUEST / parquet-footer error message.
768+
resource = boto3.resource(
769+
"s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key"
770+
)
771+
772+
# Create the "bbucket" bucket and delete any objects already in it.
bucket = resource.Bucket("bbucket")
773+
bucket.create(ACL='public-read')
774+
bucket.objects.all().delete()
775+
776+
s3_client = boto3.client(
777+
"s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key"
778+
)
779+
780+
# The object body is ordinary text, not parquet data.
s3_client.put_object(Body='not a parquet file', Bucket='bbucket', Key='file.txt', ContentType='text/plain')
781+
782+
kikimr.control_plane.wait_bootstrap(1)
783+
storage_connection_name = unique_prefix + "badbucket"
784+
client.create_storage_connection(storage_connection_name, "bbucket")
785+
786+
# Read the text object through the storage connection while declaring it as parquet.
sql = f'''
787+
select * from `{storage_connection_name}`.`file.txt` with (format=parquet, schema (data string));
788+
'''
789+
790+
query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
791+
client.wait_query_status(query_id, fq.QueryMeta.FAILED)
792+
793+
# Both the status-code text and the parquet footer message must appear in the
# stringified describe_query result.
error_message = str(client.describe_query(query_id).result)
794+
assert ("Query failed with code BAD_REQUEST" in error_message) and ("Parquet magic bytes not found in footer." in error_message)
795+
765796
@yq_v1
766797
@pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
767798
@pytest.mark.parametrize("mvp_external_ydb_endpoint", [{"endpoint": os.getenv("YDB_ENDPOINT")}], indirect=True)

0 commit comments

Comments
 (0)