Skip to content

Commit 5a7b7d0

Browse files
authored
Merge pull request #16 from red-data-tools/impl-schema-catalog
Resolving the Arrow schema from AWS Glue
2 parents 0b435f9 + 101ebae commit 5a7b7d0

File tree

9 files changed

+289
-24
lines changed

9 files changed

+289
-24
lines changed

README.md

Lines changed: 66 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# fluent-plugin-s3-arrow
22

3-
Extends the [fluent-plugin-s3](https://github.com/fluent/fluent-plugin-s3) compression algorithm to enable red-arrow compression.
3+
Extends the [fluent-plugin-s3](https://github.com/fluent/fluent-plugin-s3) compression algorithm to enable [red-arrow](https://github.com/apache/arrow/tree/master/ruby/red-arrow) compression.
44

55
## Installation
66

@@ -33,7 +33,7 @@ $ bundle
3333

3434
## Configuration
3535

36-
Example of fluent-plugin-s3 configuration.
36+
Example of fluent-plugin-s3-arrow configuration.
3737

3838
```
3939
<match pattern>
@@ -47,14 +47,74 @@ Example of fluent-plugin-s3 configuration.
4747
4848
store_as arrow
4949
<arrow>
50-
schema [
51-
{"name": "test_string", "type": "string"},
52-
{"name": "test_uint64", "type": "uint64"}
53-
]
50+
format parquet
51+
compression gzip
52+
53+
schema_from static
54+
<static>
55+
schema [
56+
{"name": "test_string", "type": "string"},
57+
{"name": "test_uint64", "type": "uint64"}
58+
]
59+
</static>
5460
</arrow>
5561
</match>
5662
```
5763

64+
### format and compression
65+
66+
This plugin supports multiple columnar formats and compressions by using red-arrow. Valid settings are below.
67+
68+
| format | compression |
69+
| ---- | ---- |
70+
| arrow | gzip, zstd |
71+
| feather | zstd |
72+
| parquet | gzip, snappy, zstd |
73+
74+
### schema
75+
76+
Schema of columnar formats.
77+
#### schema_from static
78+
79+
Set the schema statically.
80+
81+
```
82+
schema_from static
83+
<static>
84+
schema [
85+
{"name": "test_string", "type": "string"},
86+
{"name": "test_uint64", "type": "uint64"}
87+
]
88+
</static>
89+
```
90+
91+
##### schema (required)
92+
93+
An array containing the names and types of the fields.
94+
#### schema_from glue
95+
96+
Retrieve the schema from the AWS Glue Data Catalog.
97+
98+
```
99+
schema_from glue
100+
<glue>
101+
catalog test_catalog
102+
database test_db
103+
table test_table
104+
</glue>
105+
```
106+
107+
##### catalog
108+
109+
The name of the data catalog for which to retrieve the definition. The default value is the same as the [AWS API CatalogId](https://docs.aws.amazon.com/glue/latest/webapi/API_GetTable.html).
110+
111+
##### database
112+
113+
The name of the database for which to retrieve the definition. The default value is `default`.
114+
##### table (required)
115+
116+
The name of the table for which to retrieve the definition.
117+
58118
## License
59119

60120
Apache License, Version 2.0

benchmark/prelude.rb

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,14 @@
1818
<arrow>
1919
format parquet
2020
compression gzip
21-
schema [
22-
{"name": "test_string", "type": "string"},
23-
{"name": "test_uint64", "type": "uint64"},
24-
{"name": "test_boolean", "type": "boolean"}
25-
]
21+
schema_from static
22+
<static>
23+
schema [
24+
{"name": "test_string", "type": "string"},
25+
{"name": "test_uint64", "type": "uint64"},
26+
{"name": "test_boolean", "type": "boolean"}
27+
]
28+
</static>
2629
</arrow>
2730
]
2831

fluent-plugin-s3-arrow.gemspec

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,10 @@ Gem::Specification.new do |spec|
2424
spec.add_development_dependency "faker"
2525
spec.add_development_dependency "rake", "~> 12.0"
2626
spec.add_development_dependency "test-unit", "~> 3.0"
27+
spec.add_development_dependency "test-unit-rr", "~> 1.0"
2728
spec.add_runtime_dependency "fluentd", [">= 0.14.10", "< 2"]
2829
spec.add_runtime_dependency "fluent-plugin-s3", ">= 1.0"
2930
spec.add_runtime_dependency "red-arrow", ">= 2.0"
3031
spec.add_runtime_dependency "red-parquet", ">= 2.0"
32+
spec.add_runtime_dependency "aws-sdk-glue", ">= 1.20.0"
3133
end

lib/fluent-plugin-s3-arrow/schemas.rb

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
require "arrow"
2+
require "fluent-plugin-s3-arrow/schemas/aws_glue"
3+
4+
module FluentPluginS3Arrow
5+
module Schemas
6+
end
7+
end
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
require 'aws-sdk-glue'
2+
3+
module FluentPluginS3Arrow
4+
module Schemas
5+
class AWSGlue
6+
class Error < RuntimeError; end
7+
class ConvertError < Error; end
8+
class Field < Struct.new(:name, :type); end
9+
10+
def initialize(table_name, **options)
11+
@table_name = table_name
12+
@database_name = options.delete(:database_name) || "default"
13+
@catalog_id = options.delete(:catalog_id)
14+
@client = Aws::Glue::Client.new(options)
15+
end
16+
17+
def to_arrow
18+
glue_schema = fetch_glue_schema
19+
convert_to_arrow_schema(glue_schema)
20+
end
21+
22+
private
23+
def fetch_glue_schema
24+
glue_table = @client.get_table({
25+
catalog_id: @catalog_id,
26+
database_name: @database_name,
27+
name: @table_name
28+
})
29+
glue_table.table.storage_descriptor.columns
30+
end
31+
32+
def convert_to_arrow_schema(glue_schema)
33+
arrow_schema_description = glue_schema.map do |glue_field|
34+
convert_to_arrow_field_description(glue_field)
35+
end
36+
Arrow::Schema.new(arrow_schema_description)
37+
end
38+
39+
def convert_to_arrow_field_description(glue_field)
40+
arrow_field = {name: glue_field.name}
41+
case glue_field.type
42+
when "boolean", "float", "double"
43+
arrow_field[:type] = glue_field.type
44+
when "tinyint"
45+
arrow_field[:type] = "int8"
46+
when "smallint"
47+
arrow_field[:type] = "int16"
48+
when "int"
49+
arrow_field[:type] = "int32"
50+
when "bigint"
51+
arrow_field[:type] = "int64"
52+
when /\Achar/,/\Avarchar/,"string"
53+
arrow_field[:type] = "string"
54+
when "binary"
55+
arrow_field[:type] = "binary"
56+
when "date"
57+
arrow_field[:type] = "date32"
58+
when /\Aarray/
59+
arrow_field[:type] = "list"
60+
arrow_field[:field] = parse_array(glue_field.type)
61+
when /\Astruct/
62+
arrow_field[:type] = "struct"
63+
arrow_field[:fields] = parse_struct(glue_field.type)
64+
else
65+
# TODO: Need support for MAP, DECIMAL, TIMESTAMP type.
66+
raise ConvertError, "Input type is not supported: #{glue_field.type}"
67+
end
68+
arrow_field
69+
end
70+
71+
def parse_array(str)
72+
matched = str.match(/\Aarray<(.*)>\z/)
73+
raise ConvertError, "Parse error on array type: #{str}" if matched.nil?
74+
convert_to_arrow_field_description(Field.new("", matched[1]))
75+
end
76+
77+
def parse_struct(str)
78+
fields = []
79+
matched = str.match(/\Astruct<(.*)>\z/)
80+
raise ConvertError, "Parse error on struct type: #{str}" if matched.nil?
81+
each_struct_fields(matched[1]) do |name, type|
82+
fields << convert_to_arrow_field_description(Field.new(name, type))
83+
end
84+
fields
85+
end
86+
87+
def each_struct_fields(str)
88+
start, nest = 0, 0
89+
name = ""
90+
str.each_char.with_index do |c, i|
91+
case c
92+
when ':'
93+
if nest == 0
94+
name = str[start...i]
95+
start = i + 1
96+
end
97+
when '<'
98+
nest += 1
99+
when '>'
100+
nest -= 1
101+
when ','
102+
if nest == 0
103+
type = str[start...i]
104+
yield(name, type)
105+
start = i + 1
106+
end
107+
end
108+
end
109+
type = str[start..]
110+
yield(name, type)
111+
end
112+
113+
end
114+
end
115+
end

lib/fluent/plugin/s3_compressor_arrow.rb

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
require 'arrow'
22
require 'parquet'
3+
require 'fluent-plugin-s3-arrow/schemas'
34

45
module Fluent::Plugin
56
class S3Output
@@ -12,11 +13,21 @@ class ArrowCompressor < Compressor
1213
}
1314

1415
config_section :arrow, multi: false do
15-
config_param :schema, :array
1616
config_param :format, :enum, list: [:arrow, :feather, :parquet], default: :arrow
1717
SUPPORTED_COMPRESSION = [:gzip, :snappy, :zstd]
1818
config_param :compression, :enum, list: SUPPORTED_COMPRESSION, default: nil
1919
config_param :chunk_size, :integer, default: nil
20+
config_param :schema_from, :enum, list: [:static, :glue], default: :static
21+
22+
config_section :static, multi: false do
23+
config_param :schema, :array, default: nil
24+
end
25+
26+
config_section :glue, multi: false do
27+
config_param :catalog, :string, default: nil
28+
config_param :database, :string, default: "default"
29+
config_param :table, :string, default: nil
30+
end
2031
end
2132

2233
def configure(conf)
@@ -26,9 +37,8 @@ def configure(conf)
2637
raise Fluent::ConfigError, "#{@arrow.format} unsupported with #{@arrow.format}"
2738
end
2839

29-
@schema = Arrow::Schema.new(@arrow.schema)
3040
@options = Arrow::JSONReadOptions.new
31-
@options.schema = @schema
41+
@options.schema = resolve_schema
3242
@options.unexpected_field_behavior = :ignore
3343
end
3444

@@ -51,6 +61,21 @@ def compress(chunk, tmp)
5161
compression: @arrow.compression,
5262
)
5363
end
64+
65+
private
66+
67+
def resolve_schema
68+
case @arrow.schema_from
69+
when :static
70+
Arrow::Schema.new(@arrow.static.schema)
71+
when :glue
72+
glue_schema = FluentPluginS3Arrow::Schemas::AWSGlue.new(@arrow.glue.table, {
73+
catalog_id: @arrow.glue.catalog,
74+
database_name: @arrow.glue.database,
75+
})
76+
glue_schema.to_arrow
77+
end
78+
end
5479
end
5580
end
5681
end
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
require "helper"
2+
require "fluent-plugin-s3-arrow/schemas"
3+
4+
class AWSGlueTest < Test::Unit::TestCase
5+
def setup
6+
stub(Aws::Glue::Client).new
7+
@schema = FluentPluginS3Arrow::Schemas::AWSGlue.new('test')
8+
end
9+
10+
11+
def test_to_arrow
12+
stub(@schema).fetch_glue_schema{
13+
[
14+
Aws::Glue::Types::Column.new({name: "a", type: "boolean"}),
15+
Aws::Glue::Types::Column.new({name: "b", type: "tinyint"}),
16+
Aws::Glue::Types::Column.new({name: "c", type: "smallint"}),
17+
Aws::Glue::Types::Column.new({name: "d", type: "int"}),
18+
Aws::Glue::Types::Column.new({name: "e", type: "bigint"}),
19+
Aws::Glue::Types::Column.new({name: "f", type: "float"}),
20+
Aws::Glue::Types::Column.new({name: "g", type: "double"}),
21+
Aws::Glue::Types::Column.new({name: "h", type: "char(1)"}),
22+
Aws::Glue::Types::Column.new({name: "i", type: "varchar(1)"}),
23+
Aws::Glue::Types::Column.new({name: "j", type: "string"}),
24+
Aws::Glue::Types::Column.new({name: "k", type: "binary"}),
25+
Aws::Glue::Types::Column.new({name: "l", type: "date"}),
26+
Aws::Glue::Types::Column.new({name: "m", type: "array<array<string>>"}),
27+
Aws::Glue::Types::Column.new({name: "n", type: "struct<p1:string,p2:struct<c1:string,c2:string>,p3:string>"})
28+
]
29+
}
30+
actual = @schema.to_arrow
31+
expect = Arrow::Schema.new([
32+
{name: "a", type: "boolean"},
33+
{name: "b", type: "int8"},
34+
{name: "c", type: "int16"},
35+
{name: "d", type: "int32"},
36+
{name: "e", type: "int64"},
37+
{name: "f", type: "float"},
38+
{name: "g", type: "double"},
39+
{name: "h", type: "string"},
40+
{name: "i", type: "string"},
41+
{name: "j", type: "string"},
42+
{name: "k", type: "binary"},
43+
{name: "l", type: "date32"},
44+
{name: "m", type: "list", field: {name: "", type: "list", field: {name: "", type: "string"}}},
45+
{name: "n", type: "struct", fields: [{name: "p1", type: "string"},{name: "p2", type: "struct", fields: [{name: "c1", type: "string"},{name: "c2", type: "string"}]},{name: "p3", type: "string"}]}
46+
])
47+
48+
assert_equal expect, actual
49+
50+
end
51+
end

test/helper.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
$LOAD_PATH.unshift(File.expand_path("../../", __FILE__))
22
require "test-unit"
3+
require "test/unit/rr"
34
require "fluent/test"
45
require "fluent/test/driver/output"
56
require "fluent/test/helpers"

0 commit comments

Comments
 (0)