
Commit 5a68445

Decompression support for schema inference (#7632)
1 parent: 938db49

13 files changed: 159 additions & 8 deletions

ydb/core/external_sources/object_storage.cpp

Lines changed: 1 addition & 1 deletion
@@ -349,7 +349,7 @@ struct TObjectStorageExternalSource : public IExternalSource {
         meta->Attributes.erase("withinfer");
 
         auto fileFormat = NObjectStorage::NInference::ConvertFileFormat(*format);
-        auto arrowFetcherId = ActorSystem->Register(NObjectStorage::NInference::CreateArrowFetchingActor(s3FetcherId, fileFormat));
+        auto arrowFetcherId = ActorSystem->Register(NObjectStorage::NInference::CreateArrowFetchingActor(s3FetcherId, fileFormat, meta->Attributes));
         auto arrowInferencinatorId = ActorSystem->Register(NObjectStorage::NInference::CreateArrowInferencinator(arrowFetcherId, fileFormat, meta->Attributes));
 
         return afterListing.Apply([arrowInferencinatorId, meta, actorSystem = ActorSystem](const NThreading::TFuture<TString>& pathFut) {

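Functionally this is the whole wiring change: the table's attribute map, which still carries the user's `compression` setting at this point, is handed to the fetching actor alongside the file format. A caller-side sketch of the idea (hedged: `attributes` is a stand-in for `meta->Attributes`; the accepted values come from the fetcher below):

    // Sketch, not commit code: the fetcher looks up the "compression" key.
    THashMap<TString, TString> attributes;
    attributes["compression"] = "gzip";  // one of: gzip, zstd, lz4, brotli, bzip2, xz
    auto arrowFetcherId = ActorSystem->Register(
        NObjectStorage::NInference::CreateArrowFetchingActor(s3FetcherId, fileFormat, attributes));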
ydb/core/external_sources/object_storage/inference/arrow_fetcher.cpp

Lines changed: 58 additions & 4 deletions
@@ -13,17 +13,24 @@
 #include <ydb/core/external_sources/object_storage/events.h>
 #include <ydb/library/actors/core/actor_bootstrapped.h>
 #include <ydb/library/actors/core/hfunc.h>
+#include <ydb/library/yql/providers/s3/compressors/factory.h>
+#include <ydb/library/yql/udfs/common/clickhouse/client/src/IO/ReadBufferFromString.h>
 
 namespace NKikimr::NExternalSource::NObjectStorage::NInference {
 
 class TArrowFileFetcher : public NActors::TActorBootstrapped<TArrowFileFetcher> {
     static constexpr uint64_t PrefixSize = 10_MB;
 public:
-    TArrowFileFetcher(NActors::TActorId s3FetcherId, EFileFormat format)
+    TArrowFileFetcher(NActors::TActorId s3FetcherId, EFileFormat format, const THashMap<TString, TString>& params)
         : S3FetcherId_{s3FetcherId}
         , Format_{format}
     {
         Y_ABORT_UNLESS(IsArrowInferredFormat(Format_));
+
+        auto decompression = params.FindPtr("compression");
+        if (decompression) {
+            DecompressionFormat_ = *decompression;
+        }
     }
 
     void Bootstrap() {
@@ -67,6 +74,15 @@ class TArrowFileFetcher : public NActors::TActorBootstrapped<TArrowFileFetcher>
 
         const auto& request = requestIt->second;
 
+        TString data = std::move(response.Data);
+        if (DecompressionFormat_) {
+            auto decompressedData = DecompressFile(data, request, ctx);
+            if (!decompressedData) {
+                return;
+            }
+            data = std::move(*decompressedData);
+        }
+
         std::shared_ptr<arrow::io::RandomAccessFile> file;
         switch (Format_) {
         case EFileFormat::CsvWithNames:
@@ -76,7 +92,7 @@ class TArrowFileFetcher : public NActors::TActorBootstrapped<TArrowFileFetcher>
             if (Format_ == EFileFormat::TsvWithNames) {
                 options.delimiter = '\t';
             }
-            file = CleanupCsvFile(response.Data, request, options, ctx);
+            file = CleanupCsvFile(data, request, options, ctx);
             ctx.Send(request.Requester, new TEvArrowFile(std::move(file), request.Path));
             break;
         }
@@ -135,6 +151,43 @@ class TArrowFileFetcher : public NActors::TActorBootstrapped<TArrowFileFetcher>
 
     // Cutting file
 
+    TMaybe<TString> DecompressFile(const TString& data, const TRequest& request, const NActors::TActorContext& ctx) {
+        try {
+            NDB::ReadBufferFromString dataBuffer(data);
+            auto decompressorBuffer = NYql::MakeDecompressor(dataBuffer, *DecompressionFormat_);
+            if (!decompressorBuffer) {
+                auto error = MakeError(
+                    request.Path,
+                    NFq::TIssuesIds::INTERNAL_ERROR,
+                    TStringBuilder{} << "unknown compression: " << *DecompressionFormat_ << ". Use one of: gzip, zstd, lz4, brotli, bzip2, xz"
+                );
+                SendError(ctx, error);
+                return {};
+            }
+
+            TStringBuilder decompressedData;
+            while (!decompressorBuffer->eof() && decompressedData.size() < 10_MB) {
+                decompressorBuffer->nextIfAtEnd();
+                size_t maxDecompressedChunkSize = std::min(
+                    decompressorBuffer->available(),
+                    10_MB - decompressedData.size()
+                );
+                TString decompressedChunk{maxDecompressedChunkSize, ' '};
+                decompressorBuffer->read(&decompressedChunk.front(), maxDecompressedChunkSize);
+                decompressedData << decompressedChunk;
+            }
+            return std::move(decompressedData);
+        } catch (const yexception& error) {
+            auto errorEv = MakeError(
+                request.Path,
+                NFq::TIssuesIds::INTERNAL_ERROR,
+                TStringBuilder{} << "couldn't decompress file, check compression params: " << error.what()
+            );
+            SendError(ctx, errorEv);
+            return {};
+        }
+    }
+
     std::shared_ptr<arrow::io::RandomAccessFile> CleanupCsvFile(const TString& data, const TRequest& request, const arrow::csv::ParseOptions& options, const NActors::TActorContext& ctx) {
         auto chunker = arrow::csv::MakeChunker(options);
         std::shared_ptr<arrow::Buffer> whole, partial;
@@ -183,10 +236,11 @@ class TArrowFileFetcher : public NActors::TActorBootstrapped<TArrowFileFetcher>
     // Fields
     NActors::TActorId S3FetcherId_;
     EFileFormat Format_;
+    TMaybe<TString> DecompressionFormat_;
    std::unordered_map<TString, TRequest> InflightRequests_; // Path -> Request
 };
 
-NActors::IActor* CreateArrowFetchingActor(NActors::TActorId s3FetcherId, EFileFormat format) {
-    return new TArrowFileFetcher{s3FetcherId, format};
+NActors::IActor* CreateArrowFetchingActor(NActors::TActorId s3FetcherId, EFileFormat format, const THashMap<TString, TString>& params) {
+    return new TArrowFileFetcher{s3FetcherId, format, params};
 }
 } // namespace NKikimr::NExternalSource::NObjectStorage::NInference

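The added DecompressFile inflates the downloaded prefix through NYql::MakeDecompressor and stops once 10_MB of output has accumulated, mirroring the PrefixSize cap above, so schema inference never materializes more than the prefix even from a highly compressed (or malicious) object. A minimal self-contained sketch of the same bounded-output pattern, using plain zlib and gzip only (the function name, chunk size, and error strings are illustrative, not from the commit):

    // Bounded-prefix gzip inflation with plain zlib. Compile with -lz.
    #include <zlib.h>

    #include <algorithm>
    #include <stdexcept>
    #include <string>
    #include <vector>

    // Inflate at most `cap` bytes from gzip-compressed `data`; extra input is
    // simply never inflated, so the caller never holds more than `cap` bytes.
    std::string DecompressGzipPrefix(const std::string& data, size_t cap) {
        z_stream zs{};
        // 16 + MAX_WBITS asks zlib to parse a gzip (not raw deflate) header.
        if (inflateInit2(&zs, 16 + MAX_WBITS) != Z_OK) {
            throw std::runtime_error("inflateInit2 failed");
        }
        zs.next_in = reinterpret_cast<Bytef*>(const_cast<char*>(data.data()));
        zs.avail_in = static_cast<uInt>(data.size());

        std::string out;
        std::vector<char> chunk(64 * 1024);
        int ret = Z_OK;
        while (ret != Z_STREAM_END && out.size() < cap) {
            zs.next_out = reinterpret_cast<Bytef*>(chunk.data());
            zs.avail_out = static_cast<uInt>(chunk.size());
            ret = inflate(&zs, Z_NO_FLUSH);
            if (ret != Z_OK && ret != Z_STREAM_END) {
                inflateEnd(&zs);
                throw std::runtime_error("truncated or corrupt gzip stream");
            }
            const size_t produced = chunk.size() - zs.avail_out;
            out.append(chunk.data(), std::min(produced, cap - out.size()));
        }
        inflateEnd(&zs);
        return out;
    }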
ydb/core/external_sources/object_storage/inference/arrow_fetcher.h

Lines changed: 1 addition & 1 deletion
@@ -5,5 +5,5 @@
 
 namespace NKikimr::NExternalSource::NObjectStorage::NInference {
 
-NActors::IActor* CreateArrowFetchingActor(NActors::TActorId s3FetcherId, EFileFormat format);
+NActors::IActor* CreateArrowFetchingActor(NActors::TActorId s3FetcherId, EFileFormat format, const THashMap<TString, TString>& params);
 } // namespace NKikimr::NExternalSource::NObjectStorage::NInference

ydb/core/external_sources/object_storage/inference/ut/arrow_inference_ut.cpp

Lines changed: 1 addition & 1 deletion
@@ -50,7 +50,7 @@ class ArrowInferenceTest : public testing::Test {
 
     NActors::TActorId RegisterInferencinator(TStringBuf formatStr) {
         auto format = NInference::ConvertFileFormat(formatStr);
-        auto arrowFetcher = ActorSystem.Register(NInference::CreateArrowFetchingActor(S3ActorId, format), 1);
+        auto arrowFetcher = ActorSystem.Register(NInference::CreateArrowFetchingActor(S3ActorId, format, {}), 1);
         return ActorSystem.Register(NInference::CreateArrowInferencinator(arrowFetcher, format, {}), 1);
     }

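The existing unit test passes `{}` for the new parameter, so the fetcher runs with decompression disabled. A hypothetical variant that would exercise the new path through the same helper (no such unit test exists in this commit; the behavior is covered by the Python suite below):

    // Hypothetical: build the fetcher with a compression attribute set.
    THashMap<TString, TString> params;
    params["compression"] = "gzip";
    auto arrowFetcher = ActorSystem.Register(
        NInference::CreateArrowFetchingActor(S3ActorId, format, params), 1);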
ydb/core/external_sources/object_storage/inference/ut/ya.make

Lines changed: 1 addition & 0 deletions
@@ -1,6 +1,7 @@
 GTEST()
 
 PEERDIR(
+    ydb/library/yql/public/udf/service/stub
     ydb/core/external_sources/object_storage/inference
     ydb/core/external_sources/object_storage
     ydb/core/tx/scheme_board

ydb/core/external_sources/object_storage/inference/ya.make

Lines changed: 8 additions & 0 deletions
@@ -1,5 +1,11 @@
 LIBRARY()
 
+ADDINCL(
+    ydb/library/yql/udfs/common/clickhouse/client/base
+    ydb/library/yql/udfs/common/clickhouse/client/base/pcg-random
+    ydb/library/yql/udfs/common/clickhouse/client/src
+)
+
 SRCS(
     arrow_fetcher.cpp
     arrow_inferencinator.cpp
@@ -9,6 +15,8 @@ PEERDIR(
     contrib/libs/apache/arrow
 
     ydb/core/external_sources/object_storage
+
+    ydb/library/yql/providers/s3/compressors
 )
 
 END()
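Build plumbing for the new dependency: the added PEERDIR pulls in the s3 compressors library that provides NYql::MakeDecompressor, and the ADDINCL roots most likely exist because the vendored ClickHouse client headers (such as ReadBufferFromString.h) include each other by short paths resolved against those directories.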
6 binary files changed (shown sizes: 45, 66, and 96 bytes); binary contents not shown.

ydb/tests/fq/s3/test_compressions.py

Lines changed: 89 additions & 1 deletion
@@ -10,7 +10,7 @@
 import ydb.public.api.protos.draft.fq_pb2 as fq
 
 import ydb.tests.fq.s3.s3_helpers as s3_helpers
-from ydb.tests.tools.fq_runner.kikimr_utils import yq_all
+from ydb.tests.tools.fq_runner.kikimr_utils import yq_all, yq_v2
 
 
 class TestS3Compressions:
@@ -33,6 +33,26 @@ def validate_result(self, result_set):
         assert result_set.rows[0].items[0].bytes_value == b"yq"
         assert result_set.rows[0].items[1].int32_value == 0
         assert result_set.rows[0].items[2].bytes_value == b"abc"
+
+    def validate_result_csv(self, result_set):
+        logging.debug(str(result_set))
+        assert len(result_set.columns) == 3
+        assert result_set.columns[0].name == "a"
+        assert result_set.columns[0].type.optional_type.item.type_id == ydb.Type.INT64
+        assert result_set.columns[1].name == "b"
+        assert result_set.columns[1].type.type_id == ydb.Type.UTF8
+        assert result_set.columns[2].name == "c"
+        assert result_set.columns[2].type.optional_type.item.type_id == ydb.Type.DOUBLE
+        assert len(result_set.rows) == 3
+        assert result_set.rows[0].items[0].int64_value == 1
+        assert result_set.rows[0].items[1].text_value == "hello"
+        assert result_set.rows[0].items[2].double_value == 0.5
+        assert result_set.rows[1].items[0].int64_value == 2
+        assert result_set.rows[1].items[1].text_value == "world"
+        assert result_set.rows[1].items[2].double_value == 0.25
+        assert result_set.rows[2].items[0].int64_value == 3
+        assert result_set.rows[2].items[1].text_value == "!"
+        assert result_set.rows[2].items[2].double_value == 0.125
 
     @yq_all
     @pytest.mark.parametrize(
@@ -70,6 +90,38 @@ def test_compression(self, kikimr, s3, client, filename, compression, unique_prefix):
         result_set = data.result.result_set
         self.validate_result(result_set)
 
+    @yq_v2
+    @pytest.mark.parametrize(
+        "filename, compression",
+        [
+            ("test.csv.gz", "gzip"),
+            ("test.csv.lz4", "lz4"),
+            ("test.csv.br", "brotli"),
+            ("test.csv.bz2", "bzip2"),
+            ("test.csv.zst", "zstd"),
+            ("test.csv.xz", "xz"),
+        ],
+    )
+    def test_compression_inference(self, kikimr, s3, client, filename, compression, unique_prefix):
+        self.create_bucket_and_upload_file(filename, s3, kikimr)
+        storage_connection_name = unique_prefix + "fruitbucket"
+        client.create_storage_connection(storage_connection_name, "fbucket")
+
+        sql = '''
+            SELECT *
+            FROM `{}`.`{}`
+            WITH (format=csv_with_names, compression="{}", with_infer="true");
+            '''.format(
+            storage_connection_name, filename, compression
+        )
+
+        query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
+        client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
+
+        data = client.get_result_data(query_id)
+        result_set = data.result.result_set
+        self.validate_result_csv(result_set)
+
     @yq_all
     @pytest.mark.parametrize(
         "filename, compression",
@@ -152,3 +204,39 @@ def test_invalid_compression(self, kikimr, s3, client, unique_prefix):
         assert (
             "Unknown compression: some_compression. Use one of: gzip, zstd, lz4, brotli, bzip2, xz" in describe_string
         )
+
+    @yq_v2
+    @pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
+    def test_invalid_compression_inference(self, kikimr, s3, client, unique_prefix):
+        resource = boto3.resource(
+            "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key"
+        )
+
+        bucket = resource.Bucket("fbucket")
+        bucket.create(ACL='public-read')
+
+        s3_client = boto3.client(
+            "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key"
+        )
+
+        fruits = R'''[{"name" : "banana", "price" : 3, "weight" : 100}]'''
+        s3_client.put_object(Body=fruits, Bucket='fbucket', Key='fruits.json', ContentType='text/plain')
+        kikimr.control_plane.wait_bootstrap(1)
+
+        storage_connection_name = unique_prefix + "fruitbucket"
+        client.create_storage_connection(storage_connection_name, "fbucket")
+
+        sql = fR'''
+            SELECT *
+            FROM `{storage_connection_name}`.`fruits.json`
+            WITH (format=csv_with_names, compression="gzip", with_infer="true");
+            '''
+
+        query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
+        client.wait_query_status(query_id, fq.QueryMeta.FAILED)
+        describe_result = client.describe_query(query_id).result
+        logging.debug("Describe result: {}".format(describe_result))
+        describe_string = "{}".format(describe_result)
+        assert (
+            "couldn\\'t decompress file, check compression params:" in describe_string
+        )
