Skip to content

Commit e128490

Browse files
inference json support (#8102)
1 parent fe50fe3 commit e128490

File tree

8 files changed

+136
-49
lines changed

8 files changed

+136
-49
lines changed

ydb/core/external_sources/object_storage/inference/arrow_fetcher.cpp

Lines changed: 58 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
#include <arrow/buffer_builder.h>
66
#include <arrow/csv/chunker.h>
77
#include <arrow/csv/options.h>
8+
#include <arrow/json/chunker.h>
9+
#include <arrow/json/options.h>
810
#include <arrow/io/memory.h>
911
#include <arrow/util/endian.h>
1012

@@ -58,7 +60,9 @@ class TArrowFileFetcher : public NActors::TActorBootstrapped<TArrowFileFetcher>
5860

5961
switch (Format_) {
6062
case EFileFormat::CsvWithNames:
61-
case EFileFormat::TsvWithNames: {
63+
case EFileFormat::TsvWithNames:
64+
case EFileFormat::JsonEachRow:
65+
case EFileFormat::JsonList: {
6266
RequestPartialFile(std::move(localRequest), ctx, 0, 10_MB);
6367
break;
6468
}
@@ -114,6 +118,12 @@ class TArrowFileFetcher : public NActors::TActorBootstrapped<TArrowFileFetcher>
114118
ctx.Send(request.Requester, new TEvArrowFile(std::move(file), request.Path));
115119
break;
116120
}
121+
case EFileFormat::JsonEachRow:
122+
case EFileFormat::JsonList: {
123+
file = CleanupJsonFile(data, request, arrow::json::ParseOptions::Defaults(), ctx);
124+
ctx.Send(request.Requester, new TEvArrowFile(std::move(file), request.Path));
125+
break;
126+
}
117127
case EFileFormat::Undefined:
118128
default:
119129
Y_ABORT("Invalid format should be unreachable");
@@ -220,23 +230,7 @@ class TArrowFileFetcher : public NActors::TActorBootstrapped<TArrowFileFetcher>
220230
std::shared_ptr<arrow::io::RandomAccessFile> CleanupCsvFile(const TString& data, const TRequest& request, const arrow::csv::ParseOptions& options, const NActors::TActorContext& ctx) {
221231
auto chunker = arrow::csv::MakeChunker(options);
222232
std::shared_ptr<arrow::Buffer> whole, partial;
223-
auto arrowData = std::make_shared<arrow::Buffer>(nullptr, 0);
224-
{
225-
arrow::BufferBuilder builder;
226-
auto buildRes = builder.Append(data.data(), data.size());
227-
if (buildRes.ok()) {
228-
buildRes = builder.Finish(&arrowData);
229-
}
230-
if (!buildRes.ok()) {
231-
auto error = MakeError(
232-
request.Path,
233-
NFq::TIssuesIds::INTERNAL_ERROR,
234-
TStringBuilder{} << "couldn't consume buffer from S3Fetcher: " << buildRes.ToString()
235-
);
236-
SendError(ctx, error);
237-
return nullptr;
238-
}
239-
}
233+
auto arrowData = BuildBufferFromData(data, request, ctx);
240234
auto status = chunker->Process(arrowData, &whole, &partial);
241235

242236
if (!status.ok()) {
@@ -277,7 +271,50 @@ class TArrowFileFetcher : public NActors::TActorBootstrapped<TArrowFileFetcher>
277271
}
278272

279273
std::shared_ptr<arrow::io::RandomAccessFile> BuildParquetFileFromMetadata(const TString& data, const TRequest& request, const NActors::TActorContext& ctx) {
280-
auto arrowData = std::make_shared<arrow::Buffer>(nullptr, 0);
274+
auto arrowData = BuildBufferFromData(data, request, ctx);
275+
return std::make_shared<arrow::io::BufferReader>(std::move(arrowData));
276+
}
277+
278+
// Prepares a fetched piece of a JSON object for schema inference.
//
// Copies `data` into an Arrow buffer and runs the Arrow JSON chunker over it,
// keeping only the `whole` part the chunker reports; the trailing `partial`
// remainder is dropped. For EFileFormat::JsonList one chunker row is skipped
// first via ProcessSkip (presumably the line holding the opening '[' of the
// list -- TODO confirm against the arrow::json::Chunker contract).
//
// Returns a reader over the retained bytes, or nullptr when any step failed.
// Chunker failures are reported through SendError before returning nullptr.
std::shared_ptr<arrow::io::RandomAccessFile> CleanupJsonFile(const TString& data, const TRequest& request, const arrow::json::ParseOptions& options, const NActors::TActorContext& ctx) {
    auto arrowData = BuildBufferFromData(data, request, ctx);
    if (!arrowData) {
        // BuildBufferFromData returns nullptr on failure (it appears to report
        // the error itself); a null buffer must not reach the chunker, which
        // would dereference it.
        return nullptr;
    }

    // Single place that reports a chunker failure and yields the "no file" result.
    const auto failWith = [&](const arrow::Status& chunkerStatus) -> std::shared_ptr<arrow::io::RandomAccessFile> {
        auto error = MakeError(
            request.Path,
            NFq::TIssuesIds::INTERNAL_ERROR,
            TStringBuilder{} << "couldn't run arrow json chunker for " << request.Path << ": " << chunkerStatus.ToString()
        );
        SendError(ctx, error);
        return nullptr;
    };

    auto chunker = arrow::json::MakeChunker(options);

    if (Format_ == EFileFormat::JsonList) {
        auto empty = std::make_shared<arrow::Buffer>(nullptr, 0);
        std::shared_ptr<arrow::Buffer> rest;
        int64_t count = 1; // number of rows ProcessSkip is asked to skip
        auto skipStatus = chunker->ProcessSkip(empty, arrowData, false, &count, &rest);
        if (!skipStatus.ok()) {
            return failWith(skipStatus);
        }

        // Continue with whatever follows the skipped row.
        arrowData = std::move(rest);
    }

    std::shared_ptr<arrow::Buffer> whole, partial;
    auto status = chunker->Process(arrowData, &whole, &partial);
    if (!status.ok()) {
        return failWith(status);
    }

    return std::make_shared<arrow::io::BufferReader>(std::move(whole));
}
315+
316+
std::shared_ptr<arrow::Buffer> BuildBufferFromData(const TString& data, const TRequest& request, const NActors::TActorContext& ctx) {
317+
auto dataBuffer = std::make_shared<arrow::Buffer>(nullptr, 0);
281318
arrow::BufferBuilder builder;
282319
auto buildRes = builder.Append(data.data(), data.size());
283320
if (!buildRes.ok()) {
@@ -290,7 +327,7 @@ class TArrowFileFetcher : public NActors::TActorBootstrapped<TArrowFileFetcher>
290327
return nullptr;
291328
}
292329

293-
buildRes = builder.Finish(&arrowData);
330+
buildRes = builder.Finish(&dataBuffer);
294331
if (!buildRes.ok()) {
295332
auto error = MakeError(
296333
request.Path,
@@ -301,7 +338,7 @@ class TArrowFileFetcher : public NActors::TActorBootstrapped<TArrowFileFetcher>
301338
return nullptr;
302339
}
303340

304-
return std::make_shared<arrow::io::BufferReader>(std::move(arrowData));
341+
return dataBuffer;
305342
}
306343

307344
// Utility

ydb/core/external_sources/object_storage/inference/arrow_inferencinator.cpp

Lines changed: 59 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
#include <arrow/table.h>
44
#include <arrow/csv/options.h>
55
#include <arrow/csv/reader.h>
6+
#include <arrow/json/options.h>
7+
#include <arrow/json/reader.h>
68
#include <parquet/arrow/reader.h>
79

810
#include <ydb/core/external_sources/object_storage/events.h>
@@ -182,6 +184,10 @@ struct CsvConfig : public FormatConfig {
182184
arrow::csv::ConvertOptions ConvOpts = arrow::csv::ConvertOptions::Defaults();
183185
};
184186

187+
// Format configuration for JSON inputs.
// Carries the Arrow JSON parse options, initialised to Arrow's defaults.
struct JsonConfig : public FormatConfig {
188+
arrow::json::ParseOptions ParseOpts = arrow::json::ParseOptions::Defaults();
189+
};
190+
185191
using TsvConfig = CsvConfig;
186192

187193
namespace {
@@ -190,23 +196,30 @@ using ArrowField = std::shared_ptr<arrow::Field>;
190196
using ArrowFields = std::vector<ArrowField>;
191197

192198
std::variant<ArrowFields, TString> InferCsvTypes(std::shared_ptr<arrow::io::RandomAccessFile> file, const CsvConfig& config) {
199+
int64_t fileSize;
200+
if (auto sizeStatus = file->GetSize().Value(&fileSize); !sizeStatus.ok()) {
201+
return TStringBuilder{} << "coudn't get file size: " << sizeStatus.ToString();
202+
}
203+
193204
std::shared_ptr<arrow::csv::TableReader> reader;
194-
auto fileSize = static_cast<int32_t>(file->GetSize().ValueOr(1 << 20));
195-
fileSize = std::min(fileSize, 1 << 20);
196205
auto readerStatus = arrow::csv::TableReader::Make(
197-
arrow::io::default_io_context(), std::move(file), arrow::csv::ReadOptions{.use_threads = false, .block_size = fileSize}, config.ParseOpts, config.ConvOpts
206+
arrow::io::default_io_context(),
207+
std::move(file),
208+
arrow::csv::ReadOptions{.use_threads = false, .block_size = static_cast<int32_t>(fileSize)},
209+
config.ParseOpts,
210+
config.ConvOpts
198211
)
199212
.Value(&reader);
200213

201214
if (!readerStatus.ok()) {
202-
return TString{TStringBuilder{} << "couldn't parse csv/tsv file, check format and compression params: " << readerStatus.ToString()};
215+
return TString{TStringBuilder{} << "couldn't open csv/tsv file, check format and compression params: " << readerStatus.ToString()};
203216
}
204217

205218
std::shared_ptr<arrow::Table> table;
206219
auto tableRes = reader->Read().Value(&table);
207220

208221
if (!tableRes.ok()) {
209-
return TStringBuilder{} << "couldn't parse csv/tsv file, check format and compression params: " << readerStatus.ToString();
222+
return TStringBuilder{} << "couldn't parse csv/tsv file, check format and compression params: " << tableRes.ToString();
210223
}
211224

212225
return table->fields();
@@ -217,24 +230,52 @@ std::variant<ArrowFields, TString> InferParquetTypes(std::shared_ptr<arrow::io::
217230
builder.properties(parquet::ArrowReaderProperties(false));
218231
auto openStatus = builder.Open(std::move(file));
219232
if (!openStatus.ok()) {
220-
return TStringBuilder{} << "couldn't parse parquet file, check format params: " << openStatus.ToString();
233+
return TStringBuilder{} << "couldn't open parquet file, check format params: " << openStatus.ToString();
221234
}
222235

223236
std::unique_ptr<parquet::arrow::FileReader> reader;
224237
auto readerStatus = builder.Build(&reader);
225238
if (!readerStatus.ok()) {
226-
return TStringBuilder{} << "couldn't parse parquet file, check format params: " << openStatus.ToString();
239+
return TStringBuilder{} << "couldn't read parquet file, check format params: " << readerStatus.ToString();
227240
}
228241

229242
std::shared_ptr<arrow::Schema> schema;
230243
auto schemaRes = reader->GetSchema(&schema);
231244
if (!schemaRes.ok()) {
232-
return TStringBuilder{} << "couldn't parse parquet file, check format params: " << openStatus.ToString();
245+
return TStringBuilder{} << "couldn't parse parquet file, check format params: " << schemaRes.ToString();
233246
}
234247

235248
return schema->fields();
236249
}
237250

251+
// Infers the column fields of a JSON file with Arrow's JSON table reader.
//
// `file`   - input to read; its size is used as the reader block size so the
//            content is handed to the reader as one block where possible.
// `config` - supplies the Arrow JSON parse options.
//
// Returns the fields of the parsed table on success, or a TString describing
// the failure (size query, reader creation, or parsing).
std::variant<ArrowFields, TString> InferJsonTypes(std::shared_ptr<arrow::io::RandomAccessFile> file, const JsonConfig& config) {
    int64_t fileSize;
    if (auto sizeStatus = file->GetSize().Value(&fileSize); !sizeStatus.ok()) {
        return TStringBuilder{} << "couldn't get file size: " << sizeStatus.ToString();
    }

    // ReadOptions::block_size is a 32-bit value: clamp before narrowing so a
    // file larger than INT32_MAX cannot turn into a garbage or negative size.
    constexpr int64_t maxBlockSize = 0x7fffffff;
    const auto blockSize = static_cast<int32_t>(fileSize < maxBlockSize ? fileSize : maxBlockSize);

    std::shared_ptr<arrow::json::TableReader> reader;
    auto readerStatus = arrow::json::TableReader::Make(
        arrow::default_memory_pool(),
        std::move(file),
        arrow::json::ReadOptions{.use_threads = false, .block_size = blockSize},
        config.ParseOpts
    ).Value(&reader);

    if (!readerStatus.ok()) {
        return TString{TStringBuilder{} << "couldn't open json file, check format and compression params: " << readerStatus.ToString()};
    }

    std::shared_ptr<arrow::Table> table;
    auto tableRes = reader->Read().Value(&table);

    if (!tableRes.ok()) {
        return TString{TStringBuilder{} << "couldn't parse json file, check format and compression params: " << tableRes.ToString()};
    }

    return table->fields();
}
278+
238279
std::variant<ArrowFields, TString> InferType(EFileFormat format, std::shared_ptr<arrow::io::RandomAccessFile> file, const FormatConfig& config) {
239280
switch (format) {
240281
case EFileFormat::CsvWithNames:
@@ -243,6 +284,9 @@ std::variant<ArrowFields, TString> InferType(EFileFormat format, std::shared_ptr
243284
return InferCsvTypes(std::move(file), static_cast<const TsvConfig&>(config));
244285
case EFileFormat::Parquet:
245286
return InferParquetTypes(std::move(file));
287+
case EFileFormat::JsonEachRow:
288+
case EFileFormat::JsonList:
289+
return InferJsonTypes(std::move(file), static_cast<const JsonConfig&>(config));
246290
case EFileFormat::Undefined:
247291
default:
248292
return std::variant<ArrowFields, TString>{std::in_place_type_t<TString>{}, TStringBuilder{} << "unexpected format: " << ConvertFileFormat(format)};
@@ -259,12 +303,19 @@ std::unique_ptr<TsvConfig> MakeTsvConfig(const THashMap<TString, TString>& param
259303
return config;
260304
}
261305

306+
// Creates the format config for JSON inputs.
// The parameter map is left unnamed and is not read: the returned JsonConfig
// is default-constructed.
std::unique_ptr<JsonConfig> MakeJsonConfig(const THashMap<TString, TString>&) {
307+
return std::make_unique<JsonConfig>();
308+
}
309+
262310
std::unique_ptr<FormatConfig> MakeFormatConfig(EFileFormat format, const THashMap<TString, TString>& params) {
263311
switch (format) {
264312
case EFileFormat::CsvWithNames:
265313
return MakeCsvConfig(params);
266314
case EFileFormat::TsvWithNames:
267315
return MakeTsvConfig(params);
316+
case EFileFormat::JsonEachRow:
317+
case EFileFormat::JsonList:
318+
return MakeJsonConfig(params);
268319
case EFileFormat::Undefined:
269320
default:
270321
return nullptr;

ydb/core/external_sources/object_storage/inference/arrow_inferencinator.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ enum class EFileFormat {
99
CsvWithNames,
1010
TsvWithNames,
1111
JsonEachRow,
12+
JsonList,
1213
Parquet,
1314
};
1415

@@ -22,6 +23,9 @@ constexpr EFileFormat ConvertFileFormat(TStringBuf format) {
2223
if (format == "json_each_row") {
2324
return EFileFormat::JsonEachRow;
2425
}
26+
if (format == "json_list") {
27+
return EFileFormat::JsonList;
28+
}
2529
if (format == "parquet") {
2630
return EFileFormat::Parquet;
2731
}
@@ -37,6 +41,8 @@ constexpr TStringBuf ConvertFileFormat(EFileFormat format) {
3741
return "tsv_with_names";
3842
case EFileFormat::JsonEachRow:
3943
return "json_each_row";
44+
case EFileFormat::JsonList:
45+
return "json_list";
4046
case EFileFormat::Parquet:
4147
return "parquet";
4248
case EFileFormat::Undefined:

ydb/tests/fq/s3/test_format_data/test.json

Lines changed: 0 additions & 17 deletions
This file was deleted.
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
{ "Fruit": "Banana", "Price": 3, "Weight": 100 }
2+
{ "Fruit": "Apple", "Price": 2, "Weight": 22 }
3+
{ "Fruit": "Pear", "Price": 15, "Weight": 33 }
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
[
2+
{ "Fruit": "Banana", "Price": 3, "Weight": 100 },
3+
{ "Fruit": "Apple", "Price": 2, "Weight": 22 },
4+
{ "Fruit": "Pear", "Price": 15, "Weight": 33 }
5+
]

ydb/tests/fq/s3/test_formats.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,8 +86,8 @@ def validate_pg_result(self, result_set):
8686
[
8787
("test.csv", "csv_with_names"),
8888
("test.tsv", "tsv_with_names"),
89-
("test.json", "json_each_row"),
90-
("test.json", "json_list"),
89+
("test_each_row.json", "json_each_row"),
90+
("test_list.json", "json_list"),
9191
("test.parquet", "parquet"),
9292
],
9393
)
@@ -130,6 +130,8 @@ def test_format(self, kikimr, s3, client, filename, type_format, yq_version, uni
130130
[
131131
("test.csv", "csv_with_names"),
132132
("test.tsv", "tsv_with_names"),
133+
("test_each_row.json", "json_each_row"),
134+
("test_list.json", "json_list"),
133135
("test.parquet", "parquet"),
134136
],
135137
)

ydb/tests/fq/s3/test_s3_0.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -339,7 +339,7 @@ def test_inference_file_error(self, kikimr, s3, client, unique_prefix):
339339

340340
query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
341341
client.wait_query_status(query_id, fq.QueryMeta.FAILED)
342-
assert "couldn\\'t parse csv/tsv file, check format and compression params:" in str(
342+
assert "couldn\\'t open csv/tsv file, check format and compression params:" in str(
343343
client.describe_query(query_id).result
344344
)
345345

0 commit comments

Comments
 (0)