Commit a044150

feat: Add range partitioning support
1 parent c265236 commit a044150

5 files changed: +80 additions, −2 deletions


README.md

Lines changed: 24 additions & 0 deletions
@@ -110,6 +110,12 @@ Following options are same as [bq command-line tools](https://cloud.google.com/b
 | time_partitioning.type | string | required | nil | The only type supported is DAY, which will generate one partition per day based on data loading time. |
 | time_partitioning.expiration_ms | int | optional | nil | Number of milliseconds for which to keep the storage for a partition. |
 | time_partitioning.field | string | optional | nil | `DATE` or `TIMESTAMP` column used for partitioning |
+| range_partitioning | hash | optional | nil | See [Range Partitioning](#range-partitioning) |
+| range_partitioning.field | string | required | nil | `INT64` column used for partitioning |
+| range_partitioning.range | hash | required | nil | Defines the ranges for range partitioning |
+| range_partitioning.range.start | string | required | nil | The start of range partitioning, inclusive. This field is an INT64 value represented as a string. |
+| range_partitioning.range.end | string | required | nil | The end of range partitioning, exclusive. This field is an INT64 value represented as a string. |
+| range_partitioning.range.interval | string | required | nil | The width of each interval. This field is an INT64 value represented as a string. |
 | clustering | hash | optional | nil | Currently, clustering is supported for partitioned tables, so must be used with `time_partitioning` option. See [clustered tables](https://cloud.google.com/bigquery/docs/clustered-tables) |
 | clustering.fields | array | required | nil | One or more fields on which data should be clustered. The order of the specified columns determines the sort order of the data. |
 | schema_update_options | array | optional | nil | (Experimental) List of `ALLOW_FIELD_ADDITION` or `ALLOW_FIELD_RELAXATION` or both. See [jobs#configuration.load.schemaUpdateOptions](https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load.schemaUpdateOptions). NOTE for the current status: `schema_update_options` does not work for `copy` job, that is, is not effective for most of modes such as `append`, `replace` and `replace_backup`. `delete_in_advance` deletes origin table so does not need to update schema. Only `append_direct` can utilize schema update. |
@@ -448,6 +454,24 @@ MEMO: [jobs#configuration.load.schemaUpdateOptions](https://cloud.google.com/big
 to update the schema of the destination table as a side effect of the load job, but it is not available for copy job.
 Thus, it was not suitable for embulk-output-bigquery idempotence modes, `append`, `replace`, and `replace_backup`, sigh.
 
+### Range Partitioning
+
+See also [Creating and Updating Range-Partitioned Tables](https://cloud.google.com/bigquery/docs/creating-partitioned-tables).
+
+To load into a partition, specify `range_partitioning` and the `table` parameter with a partition decorator as:
+
+```yaml
+out:
+  type: bigquery
+  table: table_name$1
+  range_partitioning:
+    field: customer_id
+    range:
+      start: '1'
+      end: '99999'
+      interval: '1'
+```
+
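For orientation, here is a hypothetical standalone helper (not part of the plugin; the function name is invented for illustration) showing how BigQuery assigns an `INT64` value to a range partition: each partition covers the half-open interval `[start + k*interval, start + (k+1)*interval)`, and the partition decorator (e.g. `table_name$1`) names the partition's lower bound.

```ruby
# Hypothetical helper illustrating BigQuery's range-partition assignment.
# Returns the lower bound of the partition containing `value`, or nil for
# values outside [start, stop), which BigQuery routes to the special
# UNPARTITIONED partition.
def range_partition_lower_bound(value, start, stop, interval)
  return nil if value < start || value >= stop
  start + ((value - start) / interval) * interval
end
```

With the README example above (`start: '1'`, `end: '99999'`, `interval: '1'`), a `customer_id` of 1 lands in the partition addressed by `table_name$1`.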
 ## Development
 
 ### Run example:
Lines changed: 36 additions & 0 deletions

@@ -0,0 +1,36 @@
+in:
+  type: file
+  path_prefix: example/example.csv
+  parser:
+    type: csv
+    charset: UTF-8
+    newline: CRLF
+    null_string: 'NULL'
+    skip_header_lines: 1
+    comment_line_marker: '#'
+    columns:
+      - {name: date, type: string}
+      - {name: timestamp, type: timestamp, format: "%Y-%m-%d %H:%M:%S.%N", timezone: "+09:00"}
+      - {name: "null", type: string}
+      - {name: long, type: long}
+      - {name: string, type: string}
+      - {name: double, type: double}
+      - {name: boolean, type: boolean}
+out:
+  type: bigquery
+  mode: replace
+  auth_method: service_account
+  json_keyfile: example/your-project-000.json
+  dataset: your_dataset_name
+  table: your_field_partitioned_table_name
+  source_format: NEWLINE_DELIMITED_JSON
+  compression: NONE
+  auto_create_dataset: true
+  auto_create_table: true
+  schema_file: example/schema.json
+  range_partitioning:
+    field: 'long'
+    range:
+      start: '90'
+      end: '100'
+      interval: '1'

lib/embulk/output/bigquery.rb

Lines changed: 3 additions & 1 deletion
@@ -89,6 +89,7 @@ def self.configure(config, schema, task_count)
       'ignore_unknown_values' => config.param('ignore_unknown_values', :bool, :default => false),
       'allow_quoted_newlines' => config.param('allow_quoted_newlines', :bool, :default => false),
       'time_partitioning' => config.param('time_partitioning', :hash, :default => nil),
+      'range_partitioning' => config.param('range_partitioning', :hash, :default => nil),
       'clustering' => config.param('clustering', :hash, :default => nil), # google-api-ruby-client >= v0.21.0
       'schema_update_options' => config.param('schema_update_options', :array, :default => nil),

@@ -231,7 +232,8 @@ def self.configure(config, schema, task_count)
         unless task['time_partitioning']['type']
           raise ConfigError.new "`time_partitioning` must have `type` key"
         end
-      elsif Helper.has_partition_decorator?(task['table'])
+      # If the user specifies range_partitioning, it should be used as is
+      elsif Helper.has_partition_decorator?(task['table']) && task['range_partitioning'].nil?
         task['time_partitioning'] = {'type' => 'DAY'}
       end
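To make the intended precedence concrete, here is a small standalone sketch (the function name is hypothetical, not part of the plugin) of the rule the configure branch implements: an explicit `time_partitioning` wins, an explicit `range_partitioning` suppresses the default, and a bare partition decorator on the table name falls back to daily time partitioning.

```ruby
# Hypothetical sketch of the partitioning default applied in configure:
# a partition decorator on the table name implies DAY time partitioning
# only when neither time_partitioning nor range_partitioning is given.
def apply_partitioning_default(task, has_decorator)
  return task if task['time_partitioning'] || !has_decorator
  return task if task['range_partitioning'] # use range partitioning as is
  task.merge('time_partitioning' => {'type' => 'DAY'})
end
```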

lib/embulk/output/bigquery/bigquery_client.rb

Lines changed: 12 additions & 1 deletion
@@ -411,7 +411,7 @@ def create_table_if_not_exists(table, dataset: nil, options: nil)
       dataset ||= @dataset
       options ||= {}
       options['time_partitioning'] ||= @task['time_partitioning']
-      if Helper.has_partition_decorator?(table)
+      if Helper.has_partition_decorator?(table) && @task['range_partitioning'].nil?
         options['time_partitioning'] ||= {'type' => 'DAY'}
         table = Helper.chomp_partition_decorator(table)
       end
@@ -435,6 +435,18 @@ def create_table_if_not_exists(table, dataset: nil, options: nil)
         }
       end
 
+      options['range_partitioning'] ||= @task['range_partitioning']
+      if options['range_partitioning']
+        body[:range_partitioning] = {
+          field: options['range_partitioning']['field'],
+          range: {
+            start: options['range_partitioning']['range']['start'],
+            end: options['range_partitioning']['range']['end'],
+            interval: options['range_partitioning']['range']['interval'],
+          },
+        }
+      end
+
       options['clustering'] ||= @task['clustering']
       if options['clustering']
         body[:clustering] = {
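The branch above maps the plugin's string-keyed option hash onto the symbol-keyed request body for the BigQuery `tables.insert` call. A minimal standalone sketch of that mapping (the function name is hypothetical, for illustration only):

```ruby
# Hypothetical sketch: convert the plugin's range_partitioning option hash
# into the range-partitioning fragment of a BigQuery tables.insert body.
# Note that start/end/interval stay as strings, matching the API, which
# represents INT64 bounds as strings.
def range_partitioning_body(rp)
  {
    field: rp['field'],
    range: {
      start: rp['range']['start'],
      end: rp['range']['end'],
      interval: rp['range']['interval'],
    },
  }
end
```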

test/test_configure.rb

Lines changed: 5 additions & 0 deletions
@@ -270,6 +270,11 @@ def test_time_partitioning
       assert_equal 'DAY', task['time_partitioning']['type']
     end
 
+    def test_range_partitioning
+      config = least_config.merge('range_partitioning' => {'field' => 'foo', 'range' => { 'start' => '1', 'end' => '2', 'interval' => '1' }})
+      assert_nothing_raised { Bigquery.configure(config, schema, processor_count) }
+    end
+
     def test_clustering
       config = least_config.merge('clustering' => {'fields' => ['field_a']})
       assert_nothing_raised { Bigquery.configure(config, schema, processor_count) }
