Skip to content

Commit 098973e

Browse files
authored
Merge pull request #10 from red-data-tools/fix-not-using-schema
Fix not using schema
2 parents f17db76 + ab97bb2 commit 098973e

File tree

2 files changed

+10
-4
lines changed

2 files changed

+10
-4
lines changed

lib/fluent/plugin/s3_compressor_arrow.rb

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,14 @@ class ArrowCompressor < Compressor
2222
def configure(conf)
2323
super
2424

25-
@arrow_schema = ::Arrow::Schema.new(@compress.schema)
2625
if INVALID_COMBINATIONS[@compress.arrow_format]&.include? @compress.arrow_compression
2726
raise Fluent::ConfigError, "#{@compress.arrow_format} unsupported with #{@compress.arrow_compression}"
2827
end
28+
29+
@arrow_schema = Arrow::Schema.new(@compress.schema)
30+
@options = Arrow::JSONReadOptions.new
31+
@options.schema = @arrow_schema
32+
@options.unexpected_field_behavior = :ignore
2933
end
3034

3135
def ext
@@ -39,7 +43,7 @@ def content_type
3943
def compress(chunk, tmp)
4044
buffer = Arrow::Buffer.new(chunk.read)
4145
stream = Arrow::BufferInputStream.new(buffer)
42-
table = Arrow::JSONReader.new(stream)
46+
table = Arrow::JSONReader.new(stream, @options)
4347

4448
table.read.save(tmp,
4549
format: @compress.arrow_format,

test/plugin/test_s3_compressor_arrow.rb

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,15 +48,17 @@ def test_compress
4848

4949
chunk = Fluent::Plugin::Buffer::MemoryChunk.new(Object.new)
5050
d1 = {"test_string" => 'record1', "test_uint64" => 1}
51-
d2 = {"test_string" => 'record2', "test_uint64" => 2}
51+
d2 = {"test_string" => 'record2', "test_uint64" => 2, "unexpected_field" => false}
52+
expected_d2 = d2.dup
53+
expected_d2.delete "unexpected_field"
5254
chunk.append([d1.to_json + "\n", d2.to_json + "\n"])
5355

5456
Tempfile.create do |tmp|
5557
c.compress(chunk, tmp)
5658
Arrow::MemoryMappedInputStream.open(tmp.path) do |input|
5759
reader = Arrow::RecordBatchFileReader.new(input)
5860
reader.each do |record_batch|
59-
assert_equal([d1, d2], record_batch.collect(&:to_h))
61+
assert_equal([d1, expected_d2], record_batch.collect(&:to_h))
6062
end
6163
end
6264
end

0 commit comments

Comments (0)