1
1
require 'time'
2
2
require 'time_with_zone'
3
3
require 'json'
4
+ require 'bigdecimal'
4
5
require_relative 'helper'
5
6
6
7
module Embulk
@@ -14,6 +15,7 @@ class TypeCastError < StandardError; end
14
15
15
16
DEFAULT_TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S.%6N" # BigQuery timestamp format
16
17
DEFAULT_TIMEZONE = "UTC"
18
+ DEFAULT_SCALE = 9
17
19
18
20
# @param [Hash] task
19
21
# @option task [String] default_timestamp_format
@@ -29,13 +31,15 @@ def self.create_converters(task, schema)
29
31
column_name = column [ :name ]
30
32
embulk_type = column [ :type ]
31
33
column_option = column_options_map [ column_name ] || { }
34
+ scale = column_option [ 'scale' ] || DEFAULT_SCALE
32
35
self . new (
33
36
embulk_type , column_option [ 'type' ] ,
34
37
timestamp_format : column_option [ 'timestamp_format' ] ,
35
38
timezone : column_option [ 'timezone' ] ,
36
39
strict : column_option [ 'strict' ] ,
37
40
default_timestamp_format : default_timestamp_format ,
38
41
default_timezone : default_timezone ,
42
+ scale : scale
39
43
) . create_converter
40
44
end
41
45
end
@@ -46,7 +50,8 @@ def initialize(
46
50
embulk_type , type = nil ,
47
51
timestamp_format : nil , timezone : nil , strict : nil ,
48
52
default_timestamp_format : DEFAULT_TIMESTAMP_FORMAT ,
49
- default_timezone : DEFAULT_TIMEZONE
53
+ default_timezone : DEFAULT_TIMEZONE ,
54
+ scale : DEFAULT_SCALE
50
55
)
51
56
@embulk_type = embulk_type
52
57
@type = ( type || Helper . bq_type_from_embulk_type ( embulk_type ) ) . upcase
@@ -55,6 +60,7 @@ def initialize(
55
60
@timezone = timezone || default_timezone
56
61
@zone_offset = TimeWithZone . zone_offset ( @timezone )
57
62
@strict = strict . nil? ? true : strict
63
+ @scale = scale
58
64
end
59
65
60
66
def create_converter
@@ -231,6 +237,13 @@ def string_converter
231
237
JSON . parse ( val )
232
238
end
233
239
}
240
+ when 'NUMERIC'
241
+ Proc . new { |val |
242
+ next nil if val . nil?
243
+ with_typecast_error ( val ) do |val |
244
+ BigDecimal ( val ) . round ( @scale , BigDecimal ::ROUND_CEILING )
245
+ end
246
+ }
234
247
else
235
248
raise NotSupportedType , "cannot take column type #{ type } for string column"
236
249
end
0 commit comments