Skip to content

Commit 85498bb

Browse files
authored
Merge pull request #93 from potato2003/master
Add support for clustered table
2 parents ecfb538 + 548da5f commit 85498bb

File tree

4 files changed

+25
-0
lines changed

4 files changed

+25
-0
lines changed

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,8 @@ Following options are same as [bq command-line tools](https://cloud.google.com/b
108108
| time_partitioning.expiration_ms | int | optional | nil | Number of milliseconds for which to keep the storage for a partition. |
109109
| time_partitioning.field | string | optional | nil | `DATE` or `TIMESTAMP` column used for partitioning |
110110
| time_partitioning.require_partition_filter | boolean | optional | nil | If true, valid partition filter is required when query |
111+
| clustering | hash | optional | nil | (Experimental) Currently, clustering is supported for partitioned tables, so must be used with `time_partitioning` option. NOTE: **clustered tables** is a beta release. See [clustered tables](https://cloud.google.com/bigquery/docs/clustered-tables) |
112+
| clustering.fields | array | required | nil | One or more fields on which data should be clustered. The order of the specified columns determines the sort order of the data. |
111113
| schema_update_options | array | optional | nil | (Experimental) List of `ALLOW_FIELD_ADDITION` or `ALLOW_FIELD_RELAXATION` or both. See [jobs#configuration.load.schemaUpdateOptions](https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load.schemaUpdateOptions). NOTE for the current status: `schema_update_options` does not work for `copy` job, that is, is not effective for most of modes such as `append`, `replace` and `replace_backup`. `delete_in_advance` deletes origin table so does not need to update schema. Only `append_direct` can utilize schema update. |
112114

113115
### Example

lib/embulk/output/bigquery.rb

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ def self.configure(config, schema, task_count)
8989
'ignore_unknown_values' => config.param('ignore_unknown_values', :bool, :default => false),
9090
'allow_quoted_newlines' => config.param('allow_quoted_newlines', :bool, :default => false),
9191
'time_partitioning' => config.param('time_partitioning', :hash, :default => nil),
92+
'clustering' => config.param('clustering', :hash, :default => nil), # google-api-ruby-client >= v0.21.0
9293
'schema_update_options' => config.param('schema_update_options', :array, :default => nil),
9394

9495
# for debug
@@ -234,6 +235,12 @@ def self.configure(config, schema, task_count)
234235
task['time_partitioning'] = {'type' => 'DAY'}
235236
end
236237

238+
if task['clustering']
239+
unless task['clustering']['fields']
240+
raise ConfigError.new "`clustering` must have `fields` key"
241+
end
242+
end
243+
237244
if task['schema_update_options']
238245
task['schema_update_options'].each do |schema_update_option|
239246
unless %w[ALLOW_FIELD_ADDITION ALLOW_FIELD_RELAXATION].include?(schema_update_option)

lib/embulk/output/bigquery/bigquery_client.rb

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -441,6 +441,13 @@ def create_table(table, dataset: nil, options: nil)
441441
}
442442
end
443443

444+
options['clustering'] ||= @task['clustering']
445+
if options['clustering']
446+
body[:clustering] = {
447+
fields: options['clustering']['fields'],
448+
}
449+
end
450+
444451
opts = {}
445452
Embulk.logger.debug { "embulk-output-bigquery: insert_table(#{@project}, #{dataset}, #{@location_for_log}, #{body}, #{opts})" }
446453
with_network_retry { client.insert_table(@project, dataset, body, opts) }

test/test_configure.rb

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ def test_configure_default
8484
assert_equal false, task['ignore_unknown_values']
8585
assert_equal false, task['allow_quoted_newlines']
8686
assert_equal nil, task['time_partitioning']
87+
assert_equal nil, task['clustering']
8788
assert_equal false, task['skip_load']
8889
end
8990

@@ -277,6 +278,14 @@ def test_time_partitioning
277278
assert_equal 'DAY', task['time_partitioning']['type']
278279
end
279280

281+
def test_clustering
282+
config = least_config.merge('clustering' => {'fields' => ['field_a']})
283+
assert_nothing_raised { Bigquery.configure(config, schema, processor_count) }
284+
285+
config = least_config.merge('clustering' => {})
286+
assert_raise { Bigquery.configure(config, schema, processor_count) }
287+
end
288+
280289
def test_schema_update_options
281290
config = least_config.merge('schema_update_options' => ['ALLOW_FIELD_ADDITION', 'ALLOW_FIELD_RELAXATION'])
282291
assert_nothing_raised { Bigquery.configure(config, schema, processor_count) }

0 commit comments

Comments
 (0)