Skip to content

Commit d5a85e3

Browse files
authored
Merge branch 'master' into bump-6-6-7
2 parents 47595b5 + 530cd73 commit d5a85e3

File tree

6 files changed

+45
-4
lines changed

6 files changed

+45
-4
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -318,6 +318,7 @@ Column options are used to aid guessing BigQuery schema, or to define conversion
318318
- **fields**: Describes the nested schema fields if the type property is set to RECORD. Please note that this is **required** for `RECORD` column.
319319
- **timestamp_format**: timestamp format to convert into/from `timestamp` (string, default is `default_timestamp_format`)
320320
- **timezone**: timezone to convert into/from `timestamp`, `date` (string, default is `default_timezone`).
321+
- **description**: description for the column.
321322
- **default_timestamp_format**: default timestamp format for column_options (string, default is "%Y-%m-%d %H:%M:%S.%6N")
322323
- **default_timezone**: default timezone for column_options (string, default is "UTC")
323324

lib/embulk/output/bigquery.rb

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -398,6 +398,9 @@ def self.transaction(config, schema, task_count, &control)
398398
end
399399
end
400400
end
401+
402+
bigquery.patch_table
403+
401404
ensure
402405
begin
403406
if task['temp_table'] # append or replace or replace_backup

lib/embulk/output/bigquery/bigquery_client.rb

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -511,6 +511,29 @@ def get_table_or_partition(table, dataset: nil)
511511
raise Error, "failed to get table #{@project}:#{dataset}.#{table}, response:#{response}"
512512
end
513513
end
514+
515+
# update only column.description
516+
def patch_table
517+
table = get_table(@task['table'])
518+
519+
def patch_description(fields, column_options)
520+
fields.map do |field|
521+
column_option = column_options.select{|col_opt| col_opt['name'] == field.name}.first
522+
if column_option
523+
field.update!(description: column_option['description']) if column_option['description']
524+
if field.fields && column_option['fields']
525+
nested_fields = patch_description(field.fields, column_option['fields'])
526+
field.update!(fields: nested_fields)
527+
end
528+
end
529+
field
530+
end
531+
end
532+
533+
fields = patch_description(table.schema.fields, @task['column_options'])
534+
table.schema.update!(fields: fields)
535+
client.patch_table(@project, @dataset, @task['table'], table)
536+
end
514537
end
515538
end
516539
end

lib/embulk/output/bigquery/helper.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ def self.fields_from_embulk_schema(task, schema)
5050
field[:type] = (column_option['type'] || bq_type_from_embulk_type(embulk_type)).upcase
5151
field[:mode] = column_option['mode'] if column_option['mode']
5252
field[:fields] = deep_symbolize_keys(column_option['fields']) if column_option['fields']
53+
field[:description] = column_option['description'] if column_option['description']
5354
end
5455
end
5556
end

test/test_helper.rb

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -71,25 +71,25 @@ def test_fields_from_embulk_schema_with_column_options
7171
{'name' => 'boolean', 'type' => 'STRING', 'mode' => 'REQUIRED'},
7272
{'name' => 'long', 'type' => 'STRING'},
7373
{'name' => 'double', 'type' => 'STRING'},
74-
{'name' => 'string', 'type' => 'INTEGER'},
74+
{'name' => 'string', 'type' => 'INTEGER', 'description' => 'memo'},
7575
{'name' => 'timestamp', 'type' => 'INTEGER'},
7676
{'name' => 'date', 'type' => 'DATE'},
7777
{'name' => 'datetime', 'type' => 'DATETIME'},
7878
{'name' => 'json', 'type' => 'RECORD', 'fields' => [
79-
{ 'name' => 'key1', 'type' => 'STRING' },
79+
{ 'name' => 'key1', 'type' => 'STRING', 'description' => 'nested_memo'},
8080
]},
8181
],
8282
}
8383
expected = [
8484
{name: 'boolean', type: 'STRING', mode: 'REQUIRED'},
8585
{name: 'long', type: 'STRING'},
8686
{name: 'double', type: 'STRING'},
87-
{name: 'string', type: 'INTEGER'},
87+
{name: 'string', type: 'INTEGER', description: 'memo'},
8888
{name: 'timestamp', type: 'INTEGER'},
8989
{name: 'date', type: 'DATE'},
9090
{name: 'datetime', type: 'DATETIME'},
9191
{name: 'json', type: 'RECORD', fields: [
92-
{name: 'key1', type: 'STRING'},
92+
{name: 'key1', type: 'STRING', description: 'nested_memo'},
9393
]},
9494
]
9595
fields = Helper.fields_from_embulk_schema(task, schema)

test/test_transaction.rb

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ def test_append_direc_without_auto_create
4545
any_instance_of(BigqueryClient) do |obj|
4646
mock(obj).get_dataset(config['dataset'])
4747
mock(obj).get_table(config['table'])
48+
mock(obj).patch_table
4849
end
4950
Bigquery.transaction(config, schema, processor_count, &control)
5051
end
@@ -55,6 +56,7 @@ def test_append_direct_with_auto_create
5556
any_instance_of(BigqueryClient) do |obj|
5657
mock(obj).create_dataset(config['dataset'])
5758
mock(obj).create_table_if_not_exists(config['table'])
59+
mock(obj).patch_table
5860
end
5961
Bigquery.transaction(config, schema, processor_count, &control)
6062
end
@@ -64,6 +66,7 @@ def test_append_direct_with_partition_without_auto_create
6466
any_instance_of(BigqueryClient) do |obj|
6567
mock(obj).get_dataset(config['dataset'])
6668
mock(obj).get_table(config['table'])
69+
mock(obj).patch_table
6770
end
6871
Bigquery.transaction(config, schema, processor_count, &control)
6972
end
@@ -74,6 +77,7 @@ def test_append_direct_with_partition_with_auto_create
7477
any_instance_of(BigqueryClient) do |obj|
7578
mock(obj).create_dataset(config['dataset'])
7679
mock(obj).create_table_if_not_exists(config['table'])
80+
mock(obj).patch_table
7781
end
7882
Bigquery.transaction(config, schema, processor_count, &control)
7983
end
@@ -87,6 +91,7 @@ def test_delete_in_advance
8791
mock(obj).get_dataset(config['dataset'])
8892
mock(obj).delete_table_or_partition(config['table'])
8993
mock(obj).create_table_if_not_exists(config['table'])
94+
mock(obj).patch_table
9095
end
9196
Bigquery.transaction(config, schema, processor_count, &control)
9297
end
@@ -98,6 +103,7 @@ def test_delete_in_advance_with_partitioning
98103
mock(obj).get_dataset(config['dataset'])
99104
mock(obj).delete_table_or_partition(config['table'])
100105
mock(obj).create_table_if_not_exists(config['table'])
106+
mock(obj).patch_table
101107
end
102108
Bigquery.transaction(config, schema, processor_count, &control)
103109
end
@@ -113,6 +119,7 @@ def test_replace
113119
mock(obj).create_table_if_not_exists(config['table'])
114120
mock(obj).copy(config['temp_table'], config['table'], write_disposition: 'WRITE_TRUNCATE')
115121
mock(obj).delete_table(config['temp_table'])
122+
mock(obj).patch_table
116123
end
117124
Bigquery.transaction(config, schema, processor_count, &control)
118125
end
@@ -126,6 +133,7 @@ def test_replace_with_partitioning
126133
mock(obj).create_table_if_not_exists(config['table'])
127134
mock(obj).copy(config['temp_table'], config['table'], write_disposition: 'WRITE_TRUNCATE')
128135
mock(obj).delete_table(config['temp_table'])
136+
mock(obj).patch_table
129137
end
130138
Bigquery.transaction(config, schema, processor_count, &control)
131139
end
@@ -147,6 +155,7 @@ def test_replace_backup
147155

148156
mock(obj).copy(config['temp_table'], config['table'], write_disposition: 'WRITE_TRUNCATE')
149157
mock(obj).delete_table(config['temp_table'])
158+
mock(obj).patch_table
150159
end
151160
Bigquery.transaction(config, schema, processor_count, &control)
152161
end
@@ -166,6 +175,7 @@ def test_replace_backup_auto_create_dataset
166175

167176
mock(obj).copy(config['temp_table'], config['table'], write_disposition: 'WRITE_TRUNCATE')
168177
mock(obj).delete_table(config['temp_table'])
178+
mock(obj).patch_table
169179
end
170180
Bigquery.transaction(config, schema, processor_count, &control)
171181
end
@@ -185,6 +195,7 @@ def test_replace_backup_with_partitioning
185195

186196
mock(obj).copy(config['temp_table'], config['table'], write_disposition: 'WRITE_TRUNCATE')
187197
mock(obj).delete_table(config['temp_table'])
198+
mock(obj).patch_table
188199
end
189200
Bigquery.transaction(config, schema, processor_count, &control)
190201
end
@@ -200,6 +211,7 @@ def test_append
200211
mock(obj).create_table_if_not_exists(config['table'])
201212
mock(obj).copy(config['temp_table'], config['table'], write_disposition: 'WRITE_APPEND')
202213
mock(obj).delete_table(config['temp_table'])
214+
mock(obj).patch_table
203215
end
204216
Bigquery.transaction(config, schema, processor_count, &control)
205217
end
@@ -213,6 +225,7 @@ def test_append_with_partitioning
213225
mock(obj).create_table_if_not_exists(config['table'])
214226
mock(obj).copy(config['temp_table'], config['table'], write_disposition: 'WRITE_APPEND')
215227
mock(obj).delete_table(config['temp_table'])
228+
mock(obj).patch_table
216229
end
217230
Bigquery.transaction(config, schema, processor_count, &control)
218231
end

0 commit comments

Comments
 (0)