Commit dd10cb7

Merge pull request #35 from trocco-io/merge_origin: Merge origin
2 parents 5b6658b + 11f283e

9 files changed: +216, -6 lines

CHANGELOG.md — 9 additions, 0 deletions

@@ -1,3 +1,12 @@
+## 0.7.5 - 2025-05-13
+* [enhancement] Add range partitioning support (Thanks to kitagry) #174
+
+## 0.7.4 - 2024-12-19
+* [maintenance] Use the primary location unless location is set explicitly (Thanks to joker1007) #172
+
+## 0.7.3 - 2024-08-28
+* [enhancement] Add TIME type conversion to string converter (Thanks to p-eye)
+
 ## 0.7.2 - 2024-07-21
 * [maintenance] Fix GitHub Actions #166
 * [maintenance] Fix gcs_client in order to load data using gcs_bucket parameter (Thanks to kashira202111) #164

README.md — 26 additions, 2 deletions

@@ -112,6 +112,12 @@ Following options are same as [bq command-line tools](https://cloud.google.com/b
 | time_partitioning.type | string | required | nil | The only type supported is DAY, which will generate one partition per day based on data loading time. |
 | time_partitioning.expiration_ms | int | optional | nil | Number of milliseconds for which to keep the storage for a partition. |
 | time_partitioning.field | string | optional | nil | `DATE` or `TIMESTAMP` column used for partitioning |
+| range_partitioning | hash | optional | nil | See [Range Partitioning](#range-partitioning) |
+| range_partitioning.field | string | required | nil | `INT64` column used for partitioning |
+| range_partitioning.range | hash | required | nil | Defines the ranges for range partitioning |
+| range_partitioning.range.start | int | required | nil | The start of range partitioning, inclusive. |
+| range_partitioning.range.end | int | required | nil | The end of range partitioning, exclusive. |
+| range_partitioning.range.interval | int | required | nil | The width of each interval. |
 | clustering | hash | optional | nil | Currently, clustering is supported for partitioned tables, so must be used with `time_partitioning` option. See [clustered tables](https://cloud.google.com/bigquery/docs/clustered-tables) |
 | clustering.fields | array | required | nil | One or more fields on which data should be clustered. The order of the specified columns determines the sort order of the data. |
 | schema_update_options | array | optional | nil | (Experimental) List of `ALLOW_FIELD_ADDITION` or `ALLOW_FIELD_RELAXATION` or both. See [jobs#configuration.load.schemaUpdateOptions](https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load.schemaUpdateOptions). NOTE for the current status: `schema_update_options` does not work for `copy` job, that is, is not effective for most of modes such as `append`, `replace` and `replace_backup`. `delete_in_advance` deletes origin table so does not need to update schema. Only `append_direct` can utilize schema update. |

@@ -332,8 +338,8 @@ Column options are used to aid guessing BigQuery schema, or to define conversion
 - boolean: `BOOLEAN`, `STRING` (default: `BOOLEAN`)
 - long: `BOOLEAN`, `INTEGER`, `FLOAT`, `STRING`, `TIMESTAMP` (default: `INTEGER`)
 - double: `INTEGER`, `FLOAT`, `STRING`, `TIMESTAMP` (default: `FLOAT`)
-- string: `BOOLEAN`, `INTEGER`, `FLOAT`, `STRING`, `TIMESTAMP`, `DATETIME`, `DATE`, `RECORD` (default: `STRING`)
-- timestamp: `INTEGER`, `FLOAT`, `STRING`, `TIMESTAMP`, `DATETIME`, `DATE` (default: `TIMESTAMP`)
+- string: `BOOLEAN`, `INTEGER`, `FLOAT`, `STRING`, `TIME`, `TIMESTAMP`, `DATETIME`, `DATE`, `RECORD` (default: `STRING`)
+- timestamp: `INTEGER`, `FLOAT`, `STRING`, `TIME`, `TIMESTAMP`, `DATETIME`, `DATE` (default: `TIMESTAMP`)
 - json: `STRING`, `RECORD` (default: `STRING`)
 - numeric: `STRING`
 - **mode**: BigQuery mode such as `NULLABLE`, `REQUIRED`, and `REPEATED` (string, default: `NULLABLE`)

@@ -458,6 +464,24 @@ MEMO: [jobs#configuration.load.schemaUpdateOptions](https://cloud.google.com/big
 to update the schema of the destination table as a side effect of the load job, but it is not available for copy job.
 Thus, it was not suitable for embulk-output-bigquery idempotence modes, `append`, `replace`, and `replace_backup`, sigh.

+### Range Partitioning
+
+See also [Creating and Updating Range-Partitioned Tables](https://cloud.google.com/bigquery/docs/creating-partitioned-tables).
+
+To load into a partition, specify the `range_partitioning` option and a `table` parameter with a partition decorator as:
+
+```yaml
+out:
+  type: bigquery
+  table: table_name$1
+  range_partitioning:
+    field: customer_id
+    range:
+      start: 1
+      end: 99999
+      interval: 1
+```
+
 ## Development

 ### Run example:
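A note on the new options' semantics: `range.start` is inclusive, `range.end` is exclusive, and `range.interval` is the width of each partition, mirroring BigQuery's native integer range partitioning. A minimal sketch of which partition a given column value lands in — `range_partition_for` is a hypothetical helper for illustration, not part of the plugin:

```ruby
# Hypothetical helper (not in the plugin): maps a column value to its
# range partition under [start, end) semantics with fixed-width intervals.
def range_partition_for(value, start_at, end_at, interval)
  # BigQuery routes values outside [start, end) to a special partition.
  return :out_of_range if value < start_at || value >= end_at
  lower = start_at + ((value - start_at) / interval) * interval # integer division
  [lower, [lower + interval, end_at].min] # the partition covering [lower, upper)
end

# README example: start 1, end 99999, interval 1
p range_partition_for(42, 1, 99999, 1) #=> [42, 43]
```

With `interval: 1` the README example yields one partition per `customer_id` value; the `$1` decorator in `table` then addresses the partition whose range starts at 1.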

embulk-output-bigquery.gemspec — 1 addition, 1 deletion

@@ -1,6 +1,6 @@
 Gem::Specification.new do |spec|
   spec.name = "embulk-output-bigquery"
-  spec.version = "0.6.9.trocco.0.1.0"
+  spec.version = "0.7.5.trocco.0.0.1"
   spec.authors = ["Satoshi Akama", "Naotoshi Seo"]
   spec.summary = "Google BigQuery output plugin for Embulk"
   spec.description = "Embulk plugin that insert records to Google BigQuery."
New file (filename not shown in this view) — 36 additions, 0 deletions

@@ -0,0 +1,36 @@
+in:
+  type: file
+  path_prefix: example/example.csv
+  parser:
+    type: csv
+    charset: UTF-8
+    newline: CRLF
+    null_string: 'NULL'
+    skip_header_lines: 1
+    comment_line_marker: '#'
+    columns:
+      - {name: date, type: string}
+      - {name: timestamp, type: timestamp, format: "%Y-%m-%d %H:%M:%S.%N", timezone: "+09:00"}
+      - {name: "null", type: string}
+      - {name: long, type: long}
+      - {name: string, type: string}
+      - {name: double, type: double}
+      - {name: boolean, type: boolean}
+out:
+  type: bigquery
+  mode: replace
+  auth_method: service_account
+  json_keyfile: example/your-project-000.json
+  dataset: your_dataset_name
+  table: your_field_partitioned_table_name
+  source_format: NEWLINE_DELIMITED_JSON
+  compression: NONE
+  auto_create_dataset: true
+  auto_create_table: true
+  schema_file: example/schema.json
+  range_partitioning:
+    field: 'long'
+    range:
+      start: 90
+      end: 100
+      interval: 1

lib/embulk/output/bigquery.rb — 43 additions, 1 deletion

@@ -89,6 +89,7 @@ def self.configure(config, schema, task_count)
       'ignore_unknown_values' => config.param('ignore_unknown_values', :bool, :default => false),
       'allow_quoted_newlines' => config.param('allow_quoted_newlines', :bool, :default => false),
       'time_partitioning' => config.param('time_partitioning', :hash, :default => nil),
+      'range_partitioning' => config.param('range_partitioning', :hash, :default => nil),
       'clustering' => config.param('clustering', :hash, :default => nil), # google-api-ruby-client >= v0.21.0
       'schema_update_options' => config.param('schema_update_options', :array, :default => nil),
       'merge_keys' => config.param('merge_keys', :array, :default => []),

@@ -229,14 +230,55 @@ def self.configure(config, schema, task_count)
       task['abort_on_error'] = (task['max_bad_records'] == 0)
     end

+    if task['time_partitioning'] && task['range_partitioning']
+      raise ConfigError.new "`time_partitioning` and `range_partitioning` cannot be used at the same time"
+    end
+
     if task['time_partitioning']
       unless task['time_partitioning']['type']
         raise ConfigError.new "`time_partitioning` must have `type` key"
       end
-    elsif Helper.has_partition_decorator?(task['table'])
+    end
+
+    if Helper.has_partition_decorator?(task['table'])
+      if task['range_partitioning']
+        raise ConfigError.new "Partition decorators(`#{task['table']}`) don't support `range_partition`"
+      end
       task['time_partitioning'] = {'type' => 'DAY'}
     end

+    if task['range_partitioning']
+      unless task['range_partitioning']['field']
+        raise ConfigError.new "`range_partitioning` must have `field` key"
+      end
+      unless task['range_partitioning']['range']
+        raise ConfigError.new "`range_partitioning` must have `range` key"
+      end
+
+      range = task['range_partitioning']['range']
+      unless range['start']
+        raise ConfigError.new "`range_partitioning` must have `range.start` key"
+      end
+      unless range['start'].is_a?(Integer)
+        raise ConfigError.new "`range_partitioning.range.start` must be an integer"
+      end
+      unless range['end']
+        raise ConfigError.new "`range_partitioning` must have `range.end` key"
+      end
+      unless range['end'].is_a?(Integer)
+        raise ConfigError.new "`range_partitioning.range.end` must be an integer"
+      end
+      unless range['interval']
+        raise ConfigError.new "`range_partitioning` must have `range.interval` key"
+      end
+      unless range['interval'].is_a?(Integer)
+        raise ConfigError.new "`range_partitioning.range.interval` must be an integer"
+      end
+      if range['start'] + range['interval'] >= range['end']
+        raise ConfigError.new "`range_partitioning.range.start` + `range_partitioning.range.interval` must be less than `range_partitioning.range.end`"
+      end
+    end
+
     if task['clustering']
       unless task['clustering']['fields']
         raise ConfigError.new "`clustering` must have `fields` key"
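Read together, the new `configure` checks form a small validation contract for `range_partitioning`: `field` and `range` must exist, the three bounds must be present integers, and the range must be wide enough. A condensed sketch, as a standalone method for illustration only (the plugin runs these checks inline and raises `ConfigError` rather than `ArgumentError`):

```ruby
# Condensed restatement of the checks above (illustration only).
def validate_range_partitioning!(rp)
  raise ArgumentError, '`range_partitioning` must have `field` key' unless rp['field']
  raise ArgumentError, '`range_partitioning` must have `range` key' unless rp['range']

  range = rp['range']
  %w[start end interval].each do |key|
    raise ArgumentError, "`range_partitioning` must have `range.#{key}` key" unless range[key]
    raise ArgumentError, "`range_partitioning.range.#{key}` must be an integer" unless range[key].is_a?(Integer)
  end
  if range['start'] + range['interval'] >= range['end']
    raise ArgumentError, '`range.start` + `range.interval` must be less than `range.end`'
  end
end

# The README example passes:
validate_range_partitioning!('field' => 'customer_id',
                             'range' => { 'start' => 1, 'end' => 99999, 'interval' => 1 })
```

Note the strict inequality in the last check: a range describing exactly one partition (`start + interval == end`) is rejected as well.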

lib/embulk/output/bigquery/bigquery_client.rb — 15 additions, 2 deletions

@@ -21,7 +21,7 @@ def initialize(task, schema, fields = nil)
       @destination_project = @task['destination_project']
       @dataset = @task['dataset']
       @location = @task['location']
-      @location_for_log = @location.nil? ? 'us/eu' : @location
+      @location_for_log = @location.nil? ? 'Primary location' : @location

       @task['source_format'] ||= 'CSV'
       @task['max_bad_records'] ||= 0

@@ -300,6 +300,7 @@ def wait_load(kind, response)

       while true
         job_id = _response.job_reference.job_id
+        location = @location || _response.job_reference.location
         elapsed = Time.now - started
         status = _response.status.state
         if status == "DONE"

@@ -319,7 +320,7 @@ def wait_load(kind, response)
           "job_id:[#{job_id}] elapsed_time:#{elapsed.to_f}sec status:[#{status}]"
         }
         sleep wait_interval
-        _response = with_network_retry { client.get_job(@project, job_id, location: @location) }
+        _response = with_network_retry { client.get_job(@project, job_id, location: location) }
       end
     end

@@ -434,6 +435,18 @@ def create_table_if_not_exists(table, dataset: nil, options: nil)
         }
       end

+      options['range_partitioning'] ||= @task['range_partitioning']
+      if options['range_partitioning']
+        body[:range_partitioning] = {
+          field: options['range_partitioning']['field'],
+          range: {
+            start: options['range_partitioning']['range']['start'].to_s,
+            end: options['range_partitioning']['range']['end'].to_s,
+            interval: options['range_partitioning']['range']['interval'].to_s,
+          },
+        }
+      end
+
       options['clustering'] ||= @task['clustering']
       if options['clustering']
         body[:clustering] = {
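Two separate improvements land in this file. In `wait_load`, polling now passes the job's own location to `get_job` when `location` is not configured explicitly, so the status check reaches the correct regional endpoint (this pairs with the 'Primary location' log change above). In `create_table_if_not_exists`, the task's `range_partitioning` is copied into the `tables.insert` request body, with the bounds converted via `.to_s` because the BigQuery REST API transports 64-bit integers as strings. For the README example, the relevant fragment of the body would look roughly like this (a sketch; the rest of the body is assembled elsewhere in the method):

```ruby
# Approximate request body fragment for the README example (illustration only;
# schema, table_reference, etc. are filled in by the surrounding method).
body = {
  range_partitioning: {
    field: 'customer_id',
    range: { start: '1', end: '99999', interval: '1' }, # Int64 values as strings
  },
}
```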

lib/embulk/output/bigquery/value_converter_factory.rb — 13 additions, 0 deletions

@@ -230,6 +230,14 @@ def string_converter
             val # Users must care of BQ timestamp format
           }
         end
+      when 'TIME'
+        # TimeWithZone doesn't affect any change to the time value
+        Proc.new {|val|
+          next nil if val.nil?
+          with_typecast_error(val) do |val|
+            TimeWithZone.set_zone_offset(Time.parse(val), zone_offset).strftime("%H:%M:%S.%6N")
+          end
+        }
       when 'RECORD'
         Proc.new {|val|
           next nil if val.nil?

@@ -284,6 +292,11 @@ def timestamp_converter
           next nil if val.nil?
           val.localtime(zone_offset).strftime("%Y-%m-%d %H:%M:%S.%6N")
         }
+      when 'TIME'
+        Proc.new {|val|
+          next nil if val.nil?
+          val.localtime(zone_offset).strftime("%H:%M:%S.%6N")
+        }
       else
         raise NotSupportedType, "cannot take column type #{type} for timestamp column"
       end
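Both new branches normalize values to `HH:MM:SS` with microsecond precision. The string converter parses the input with `Time.parse` and pins the configured zone offset without shifting the clock time (that is what the `TimeWithZone` comment means), while the timestamp converter shifts the instant into the configured offset first. Stripped of the plugin's typecast-error handling and `TimeWithZone` helper, the two paths behave roughly like this:

```ruby
require 'time'

# String input: parse, keep the clock time as written, format the time of day.
puts Time.parse('3:22 PM').strftime('%H:%M:%S.%6N')
#=> 15:22:00.000000

# Timestamp input: shift into the configured offset, then format.
ts = Time.parse('2016-02-25 15:00:00.500000 +00:00')
puts ts.localtime('+09:00').strftime('%H:%M:%S.%6N')
#=> 00:00:00.500000
```

These match the expectations in the new `test_time` cases below.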

test/test_configure.rb — 38 additions, 0 deletions

@@ -273,6 +273,44 @@ def test_time_partitioning
       assert_equal 'DAY', task['time_partitioning']['type']
     end

+    def test_range_partitioning
+      config = least_config.merge('range_partitioning' => {'field' => 'foo', 'range' => { 'start' => 1, 'end' => 3, 'interval' => 1 }})
+      assert_nothing_raised { Bigquery.configure(config, schema, processor_count) }
+
+      # field is required
+      config = least_config.merge('range_partitioning' => {'range' => { 'start' => 1, 'end' => 2, 'interval' => 1 }})
+      assert_raise { Bigquery.configure(config, schema, processor_count) }
+
+      # range is required
+      config = least_config.merge('range_partitioning' => {'field' => 'foo'})
+      assert_raise { Bigquery.configure(config, schema, processor_count) }
+
+      # range.start is required
+      config = least_config.merge('range_partitioning' => {'field' => 'foo', 'range' => { 'end' => 2, 'interval' => 1 }})
+      assert_raise { Bigquery.configure(config, schema, processor_count) }
+
+      # range.end is required
+      config = least_config.merge('range_partitioning' => {'field' => 'foo', 'range' => { 'start' => 1, 'interval' => 1 }})
+      assert_raise { Bigquery.configure(config, schema, processor_count) }
+
+      # range.interval is required
+      config = least_config.merge('range_partitioning' => {'field' => 'foo', 'range' => { 'start' => 1, 'end' => 2 }})
+      assert_raise { Bigquery.configure(config, schema, processor_count) }
+
+      # range.start + range.interval should be less than range.end
+      config = least_config.merge('range_partitioning' => {'field' => 'foo', 'range' => { 'start' => 1, 'end' => 2, 'interval' => 2 }})
+      assert_raise { Bigquery.configure(config, schema, processor_count) }
+    end
+
+    def test_time_and_range_partitioning_error
+      config = least_config.merge('time_partitioning' => {'type' => 'DAY'}, 'range_partitioning' => {'field' => 'foo', 'range' => { 'start' => 1, 'end' => 2, 'interval' => 1 }})
+      assert_raise { Bigquery.configure(config, schema, processor_count) }
+
+      config = least_config.merge('table' => 'table_name$20160912', 'range_partitioning' => {'field' => 'foo', 'range' => { 'start' => 1, 'end' => 2, 'interval' => 1 }})
+      assert_raise { Bigquery.configure(config, schema, processor_count) }
+    end
+
     def test_clustering
       config = least_config.merge('clustering' => {'fields' => ['field_a']})
       assert_nothing_raised { Bigquery.configure(config, schema, processor_count) }

test/test_value_converter_factory.rb — 35 additions, 0 deletions

@@ -262,6 +262,23 @@ def test_datetime
       assert_equal "2016-02-26 00:00:00", converter.call("2016-02-26 00:00:00")
     end

+    def test_time
+      converter = ValueConverterFactory.new(SCHEMA_TYPE, 'TIME').create_converter
+      assert_equal nil, converter.call(nil)
+      assert_equal "00:03:22.000000", converter.call("00:03:22")
+      assert_equal "15:22:00.000000", converter.call("3:22 PM")
+      assert_equal "03:22:00.000000", converter.call("3:22 AM")
+      assert_equal "00:00:00.000000", converter.call("2016-02-26 00:00:00")
+
+      # TimeWithZone doesn't affect any change to the time value
+      converter = ValueConverterFactory.new(
+        SCHEMA_TYPE, 'TIME', timezone: 'Asia/Tokyo'
+      ).create_converter
+      assert_equal "15:00:01.000000", converter.call("15:00:01")
+
+      assert_raise { converter.call('foo') }
+    end
+
     def test_record
       converter = ValueConverterFactory.new(SCHEMA_TYPE, 'RECORD').create_converter
       assert_equal({'foo'=>'foo'}, converter.call(%Q[{"foo":"foo"}]))

@@ -350,6 +367,24 @@ def test_datetime
       assert_raise { converter.call('foo') }
     end

+    def test_time
+      converter = ValueConverterFactory.new(SCHEMA_TYPE, 'TIME').create_converter
+      assert_equal nil, converter.call(nil)
+      timestamp = Time.parse("2016-02-26 00:00:00.500000 +00:00")
+      expected = "00:00:00.500000"
+      assert_equal expected, converter.call(timestamp)
+
+      converter = ValueConverterFactory.new(
+        SCHEMA_TYPE, 'TIME', timezone: 'Asia/Tokyo'
+      ).create_converter
+      assert_equal nil, converter.call(nil)
+      timestamp = Time.parse("2016-02-25 15:00:00.500000 +00:00")
+      expected = "00:00:00.500000"
+      assert_equal expected, converter.call(timestamp)
+
+      assert_raise { converter.call('foo') }
+    end
+
     def test_record
       assert_raise { ValueConverterFactory.new(SCHEMA_TYPE, 'RECORD').create_converter }
     end
