Skip to content

Commit f25e287

Browse files
authored
Merge pull request #7 from giwa/update-description
2 parents 82b62cf + c487368 commit f25e287

File tree

4 files changed

+36
-1
lines changed

4 files changed

+36
-1
lines changed

embulk-output-bigquery.gemspec

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
Gem::Specification.new do |spec|
22
spec.name = "embulk-output-bigquery"
3-
spec.version = "0.6.5"
3+
spec.version = "0.6.6"
44
spec.authors = ["Satoshi Akama", "Naotoshi Seo"]
55
spec.summary = "Google BigQuery output plugin for Embulk"
66
spec.description = "Embulk plugin that insert records to Google BigQuery."

lib/embulk/output/bigquery.rb

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -398,6 +398,12 @@ def self.transaction(config, schema, task_count, &control)
398398
end
399399
end
400400
end
401+
402+
if task['mode'] == 'append' || task['mode'] == 'append_direct'
403+
# update only column.description based on column_options
404+
bigquery.patch_table
405+
end
406+
401407
ensure
402408
begin
403409
if task['temp_table'] # append or replace or replace_backup

lib/embulk/output/bigquery/bigquery_client.rb

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -511,6 +511,29 @@ def get_table_or_partition(table, dataset: nil)
511511
raise Error, "failed to get table #{@project}:#{dataset}.#{table}, response:#{response}"
512512
end
513513
end
514+
515+
# update only column.description
516+
def patch_table
517+
table = get_table(@task['table'])
518+
519+
def patch_description(fields, column_options)
520+
fields.map do |field|
521+
column_option = column_options.select{|col_opt| col_opt['name'] == field.name}.first
522+
if column_option
523+
field.update!(description: column_option['description']) if column_option['description']
524+
if field.fields && column_option['fields']
525+
nested_fields = patch_description(field.fields, column_option['fields'])
526+
field.update!(fields: nested_fields)
527+
end
528+
end
529+
field
530+
end
531+
end
532+
533+
fields = patch_description(table.schema.fields, @task['column_options'])
534+
table.schema.update!(fields: fields)
535+
client.patch_table(@project, @dataset, @task['table'], table)
536+
end
514537
end
515538
end
516539
end

test/test_transaction.rb

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ def test_append_direc_without_auto_create
4545
any_instance_of(BigqueryClient) do |obj|
4646
mock(obj).get_dataset(config['dataset'])
4747
mock(obj).get_table(config['table'])
48+
mock(obj).patch_table
4849
end
4950
Bigquery.transaction(config, schema, processor_count, &control)
5051
end
@@ -55,6 +56,7 @@ def test_append_direct_with_auto_create
5556
any_instance_of(BigqueryClient) do |obj|
5657
mock(obj).create_dataset(config['dataset'])
5758
mock(obj).create_table_if_not_exists(config['table'])
59+
mock(obj).patch_table
5860
end
5961
Bigquery.transaction(config, schema, processor_count, &control)
6062
end
@@ -64,6 +66,7 @@ def test_append_direct_with_partition_without_auto_create
6466
any_instance_of(BigqueryClient) do |obj|
6567
mock(obj).get_dataset(config['dataset'])
6668
mock(obj).get_table(config['table'])
69+
mock(obj).patch_table
6770
end
6871
Bigquery.transaction(config, schema, processor_count, &control)
6972
end
@@ -74,6 +77,7 @@ def test_append_direct_with_partition_with_auto_create
7477
any_instance_of(BigqueryClient) do |obj|
7578
mock(obj).create_dataset(config['dataset'])
7679
mock(obj).create_table_if_not_exists(config['table'])
80+
mock(obj).patch_table
7781
end
7882
Bigquery.transaction(config, schema, processor_count, &control)
7983
end
@@ -200,6 +204,7 @@ def test_append
200204
mock(obj).create_table_if_not_exists(config['table'])
201205
mock(obj).copy(config['temp_table'], config['table'], write_disposition: 'WRITE_APPEND')
202206
mock(obj).delete_table(config['temp_table'])
207+
mock(obj).patch_table
203208
end
204209
Bigquery.transaction(config, schema, processor_count, &control)
205210
end
@@ -213,6 +218,7 @@ def test_append_with_partitioning
213218
mock(obj).create_table_if_not_exists(config['table'])
214219
mock(obj).copy(config['temp_table'], config['table'], write_disposition: 'WRITE_APPEND')
215220
mock(obj).delete_table(config['temp_table'])
221+
mock(obj).patch_table
216222
end
217223
Bigquery.transaction(config, schema, processor_count, &control)
218224
end

0 commit comments

Comments
 (0)