
Commit cfce3a6

Merge pull request #105 from embulk/auto_create_table
Change default value of `auto_create_table` to `true` from `false`
2 parents: 6b3dc95 + 7aa6b7d

File tree: 4 files changed, +35 -100 lines

- README.md
- lib/embulk/output/bigquery.rb
- test/test_configure.rb
- test/test_transaction.rb

README.md

Lines changed: 4 additions & 10 deletions
@@ -39,7 +39,7 @@ OAuth flow for installed applications.
 | location | string | optional | nil | geographic location of dataset. See [Location](#location) |
 | table | string | required | | table name, or table name with a partition decorator such as `table_name$20160929`|
 | auto_create_dataset | boolean | optional | false | automatically create dataset |
-| auto_create_table | boolean | optional | false | See [Dynamic Table Creating](#dynamic-table-creating) and [Time Partitioning](#time-partitioning) |
+| auto_create_table | boolean | optional | true | `false` is available only for `append_direct` mode. Other modes require `true`. See [Dynamic Table Creating](#dynamic-table-creating) and [Time Partitioning](#time-partitioning) |
 | schema_file | string | optional | | /path/to/schema.json |
 | template_table | string | optional | | template table name. See [Dynamic Table Creating](#dynamic-table-creating) |
 | job_status_max_polling_time | int | optional | 3600 sec | Max job status polling time |
@@ -242,11 +242,6 @@ out:
 
 ### Dynamic table creating
 
-This plugin tries to create a table using BigQuery API when
-
-* mode is either of `delete_in_advance`, `replace`, `replace_backup`, `append`.
-* mode is `append_direct` and `auto_create_table` is true.
-
 There are 3 ways to set schema.
 
 #### Set schema.json
@@ -375,32 +370,31 @@ To load into a partition, specify `table` parameter with a partition decorator a
 out:
   type: bigquery
   table: table_name$20160929
-  auto_create_table: true
 ```
 
-You may configure `time_partitioning` parameter together to create table via `auto_create_table: true` option as:
+You may configure `time_partitioning` parameter together as:
 
 ```yaml
 out:
   type: bigquery
   table: table_name$20160929
-  auto_create_table: true
   time_partitioning:
     type: DAY
     expiration_ms: 259200000
 ```
 
 You can also create column-based partitioning table as:
+
 ```yaml
 out:
   type: bigquery
   mode: replace
-  auto_create_table: true
   table: table_name
   time_partitioning:
     type: DAY
     field: timestamp
 ```
+
 Note the `time_partitioning.field` should be top-level `DATE` or `TIMESTAMP`.
 
 Use [Tables: patch](https://cloud.google.com/bigquery/docs/reference/v2/tables/patch) API to update the schema of the partitioned table, embulk-output-bigquery itself does not support it, though.
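With `auto_create_table` defaulting to `true`, the README examples above no longer need to set it explicitly; the only combination that now needs care is turning it off, which is accepted only for `append_direct` mode. A minimal standalone Ruby sketch of that rule (the helper name below is made up for illustration and is not part of the plugin):

```ruby
# Standalone sketch of the rule documented above; validate_auto_create_table
# is a made-up helper, not part of embulk-output-bigquery.
MODES_REQUIRING_AUTO_CREATE = %w[append replace delete_in_advance replace_backup].freeze

def validate_auto_create_table(mode, auto_create_table: true)
  if MODES_REQUIRING_AUTO_CREATE.include?(mode) && !auto_create_table
    raise ArgumentError, "`mode: #{mode}` requires `auto_create_table: true`"
  end
  auto_create_table
end

validate_auto_create_table('append_direct', auto_create_table: false) # => false, still allowed
validate_auto_create_table('replace')                                 # => true, the new default
# validate_auto_create_table('replace', auto_create_table: false)     #    would raise ArgumentError
```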

lib/embulk/output/bigquery.rb

Lines changed: 10 additions & 30 deletions
@@ -45,7 +45,7 @@ def self.configure(config, schema, task_count)
     'table_old' => config.param('table_old', :string, :default => nil),
     'table_name_old' => config.param('table_name_old', :string, :default => nil), # lower version compatibility
     'auto_create_dataset' => config.param('auto_create_dataset', :bool, :default => false),
-    'auto_create_table' => config.param('auto_create_table', :bool, :default => false),
+    'auto_create_table' => config.param('auto_create_table', :bool, :default => true),
     'schema_file' => config.param('schema_file', :string, :default => nil),
     'template_table' => config.param('template_table', :string, :default => nil),
@@ -104,10 +104,14 @@ def self.configure(config, schema, task_count)
     raise ConfigError.new "`mode` must be one of append, append_direct, replace, delete_in_advance, replace_backup"
   end
 
+  if %w[append replace delete_in_advance replace_backup].include?(task['mode']) and !task['auto_create_table']
+    raise ConfigError.new "`mode: #{task['mode']}` requires `auto_create_table: true`"
+  end
+
   if task['mode'] == 'replace_backup'
     task['table_old'] ||= task['table_name_old'] # for lower version compatibility
     if task['dataset_old'].nil? and task['table_old'].nil?
-      raise ConfigError.new "`mode replace_backup` requires either of `dataset_old` or `table_old`"
+      raise ConfigError.new "`mode: replace_backup` requires either of `dataset_old` or `table_old`"
     end
     task['dataset_old'] ||= task['dataset']
     task['table_old'] ||= task['table']
@@ -309,38 +313,14 @@ def self.auto_create(task, bigquery)
     bigquery.create_table_if_not_exists(task['table'])
   when 'replace'
     bigquery.create_table_if_not_exists(task['temp_table'])
-    if Helper.has_partition_decorator?(task['table'])
-      if task['auto_create_table']
-        bigquery.create_table_if_not_exists(task['table'])
-      else
-        bigquery.get_table(task['table']) # raises NotFoundError
-      end
-    end
+    bigquery.create_table_if_not_exists(task['table'])
   when 'append'
     bigquery.create_table_if_not_exists(task['temp_table'])
-    if Helper.has_partition_decorator?(task['table'])
-      if task['auto_create_table']
-        bigquery.create_table_if_not_exists(task['table'])
-      else
-        bigquery.get_table(task['table']) # raises NotFoundError
-      end
-    end
+    bigquery.create_table_if_not_exists(task['table'])
   when 'replace_backup'
     bigquery.create_table_if_not_exists(task['temp_table'])
-    if Helper.has_partition_decorator?(task['table'])
-      if task['auto_create_table']
-        bigquery.create_table_if_not_exists(task['table'])
-      else
-        bigquery.get_table(task['table']) # raises NotFoundError
-      end
-    end
-    if Helper.has_partition_decorator?(task['table_old'])
-      if task['auto_create_table']
-        bigquery.create_table_if_not_exists(task['table_old'], dataset: task['dataset_old'])
-      else
-        bigquery.get_table(task['table_old'], dataset: task['dataset_old']) # raises NotFoundError
-      end
-    end
+    bigquery.create_table_if_not_exists(task['table'])
+    bigquery.create_table_if_not_exists(task['table_old'], dataset: task['dataset_old'])
   else # append_direct
     if task['auto_create_table']
       bigquery.create_table_if_not_exists(task['table'])
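Taken together, these hunks drop the partition-decorator special cases: `replace`, `append`, and `replace_backup` now create their target tables (and the backup table) unconditionally, while `append_direct` remains the only mode that consults `auto_create_table`. A condensed, illustrative sketch of the resulting dispatch (a rewrite for readability, not the committed code; branches outside this diff, such as `delete_in_advance`, are omitted):

```ruby
# Condensed, illustrative sketch of the dispatch after this commit; the real
# method is self.auto_create(task, bigquery) shown in the hunk above.
def auto_create_sketch(task, bigquery)
  case task['mode']
  when 'replace', 'append'
    bigquery.create_table_if_not_exists(task['temp_table'])
    bigquery.create_table_if_not_exists(task['table'])   # always, even with a partition decorator
  when 'replace_backup'
    bigquery.create_table_if_not_exists(task['temp_table'])
    bigquery.create_table_if_not_exists(task['table'])
    bigquery.create_table_if_not_exists(task['table_old'], dataset: task['dataset_old'])
  else # append_direct: the only mode that may skip creation
    bigquery.create_table_if_not_exists(task['table']) if task['auto_create_table']
  end
end
```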

test/test_configure.rb

Lines changed: 6 additions & 6 deletions
@@ -55,7 +55,7 @@ def test_configure_default
   assert_equal nil, task['table_old']
   assert_equal nil, task['table_name_old']
   assert_equal false, task['auto_create_dataset']
-  assert_equal false, task['auto_create_table']
+  assert_equal true, task['auto_create_table']
   assert_equal nil, task['schema_file']
   assert_equal nil, task['template_table']
   assert_equal true, task['delete_from_local_when_job_end']
@@ -161,22 +161,22 @@ def test_json_keyfile
 end
 
 def test_payload_column
-  config = least_config.merge('payload_column' => schema.first.name)
+  config = least_config.merge('payload_column' => schema.first.name, 'auto_create_table' => false, 'mode' => 'append_direct')
   task = Bigquery.configure(config, schema, processor_count)
   assert_equal task['payload_column_index'], 0
 
-  config = least_config.merge('payload_column' => 'not_exist')
+  config = least_config.merge('payload_column' => 'not_exist', 'auto_create_table' => false, 'mode' => 'append_direct')
   assert_raise { Bigquery.configure(config, schema, processor_count) }
 end
 
 def test_payload_column_index
-  config = least_config.merge('payload_column_index' => 0)
+  config = least_config.merge('payload_column_index' => 0, 'auto_create_table' => false, 'mode' => 'append_direct')
   assert_nothing_raised { Bigquery.configure(config, schema, processor_count) }
 
-  config = least_config.merge('payload_column_index' => -1)
+  config = least_config.merge('payload_column_index' => -1, 'auto_create_table' => false, 'mode' => 'append_direct')
   assert_raise { Bigquery.configure(config, schema, processor_count) }
 
-  config = least_config.merge('payload_column_index' => schema.size)
+  config = least_config.merge('payload_column_index' => schema.size, 'auto_create_table' => false, 'mode' => 'append_direct')
   assert_raise { Bigquery.configure(config, schema, processor_count) }
 end
 
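The payload-column tests above pin `mode: append_direct` with `auto_create_table: false` so they keep exercising the one path where auto-creation stays off. A sketch of an additional test for the new default and the mode check, reusing this file's `least_config`, `schema`, and `processor_count` helpers (illustrative only, not part of this commit):

```ruby
# Illustrative sketch, not part of this commit: verifies the flipped default and
# the new requirement that only append_direct may set auto_create_table: false.
def test_auto_create_table_default_and_mode_check
  task = Bigquery.configure(least_config, schema, processor_count)
  assert_equal true, task['auto_create_table']   # default is now true

  config = least_config.merge('mode' => 'replace', 'auto_create_table' => false)
  assert_raise { Bigquery.configure(config, schema, processor_count) }          # rejected

  config = least_config.merge('mode' => 'append_direct', 'auto_create_table' => false)
  assert_nothing_raised { Bigquery.configure(config, schema, processor_count) } # still allowed
end
```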

test/test_transaction.rb

Lines changed: 15 additions & 54 deletions
@@ -41,8 +41,8 @@ def setup
 end
 
 sub_test_case "append_direct" do
-  def test_append_direct
-    config = least_config.merge('mode' => 'append_direct')
+  def test_append_direc_without_auto_create
+    config = least_config.merge('mode' => 'append_direct', 'auto_create_dataset' => false, 'auto_create_table' => false)
     any_instance_of(BigqueryClient) do |obj|
       mock(obj).get_dataset(config['dataset'])
       mock(obj).get_table(config['table'])
@@ -60,8 +60,8 @@ def test_append_direct_with_auto_create
     Bigquery.transaction(config, schema, processor_count, &control)
   end
 
-  def test_append_direct_with_partition
-    config = least_config.merge('mode' => 'append_direct', 'table' => 'table$20160929')
+  def test_append_direct_with_partition_without_auto_create
+    config = least_config.merge('mode' => 'append_direct', 'table' => 'table$20160929', 'auto_create_dataset' => false, 'auto_create_table' => false)
     any_instance_of(BigqueryClient) do |obj|
       mock(obj).get_dataset(config['dataset'])
       mock(obj).get_table(config['table'])
@@ -111,6 +111,7 @@ def test_replace
     any_instance_of(BigqueryClient) do |obj|
       mock(obj).get_dataset(config['dataset'])
       mock(obj).create_table_if_not_exists(config['temp_table'])
+      mock(obj).create_table_if_not_exists(config['table'])
       mock(obj).copy(config['temp_table'], config['table'], write_disposition: 'WRITE_TRUNCATE')
       mock(obj).delete_table(config['temp_table'])
     end
@@ -120,19 +121,6 @@ def test_replace
   def test_replace_with_partitioning
     config = least_config.merge('mode' => 'replace', 'table' => 'table$20160929')
     task = Bigquery.configure(config, schema, processor_count)
-    any_instance_of(BigqueryClient) do |obj|
-      mock(obj).get_dataset(config['dataset'])
-      mock(obj).create_table_if_not_exists(config['temp_table'])
-      mock(obj).get_table(config['table'])
-      mock(obj).copy(config['temp_table'], config['table'], write_disposition: 'WRITE_TRUNCATE')
-      mock(obj).delete_table(config['temp_table'])
-    end
-    Bigquery.transaction(config, schema, processor_count, &control)
-  end
-
-  def test_replace_with_partitioning_with_auto_create_table
-    config = least_config.merge('mode' => 'replace', 'table' => 'table$20160929', 'auto_create_table' => true)
-    task = Bigquery.configure(config, schema, processor_count)
     any_instance_of(BigqueryClient) do |obj|
       mock(obj).get_dataset(config['dataset'])
       mock(obj).create_table_if_not_exists(config['temp_table'])
@@ -152,8 +140,10 @@ def test_replace_backup
       mock(obj).get_dataset(config['dataset'])
       mock(obj).get_dataset(config['dataset_old'])
      mock(obj).create_table_if_not_exists(config['temp_table'])
+      mock(obj).create_table_if_not_exists(config['table'])
+      mock(obj).create_table_if_not_exists(config['table_old'], dataset: config['dataset_old'])
 
-      mock(obj).get_table_or_partition(task['table'])
+      mock(obj).get_table_or_partition(config['table'])
       mock(obj).copy(config['table'], config['table_old'], config['dataset_old'])
 
       mock(obj).copy(config['temp_table'], config['table'], write_disposition: 'WRITE_TRUNCATE')
@@ -168,9 +158,11 @@ def test_replace_backup_auto_create_dataset
     any_instance_of(BigqueryClient) do |obj|
       mock(obj).create_dataset(config['dataset'])
       mock(obj).create_dataset(config['dataset_old'], reference: config['dataset'])
+      mock(obj).create_table_if_not_exists(config['table'])
       mock(obj).create_table_if_not_exists(config['temp_table'])
+      mock(obj).create_table_if_not_exists(config['table_old'], dataset: config['dataset_old'])
 
-      mock(obj).get_table_or_partition(task['table'])
+      mock(obj).get_table_or_partition(config['table'])
       mock(obj).copy(config['table'], config['table_old'], config['dataset_old'])
 
       mock(obj).copy(config['temp_table'], config['table'], write_disposition: 'WRITE_TRUNCATE')
@@ -180,35 +172,16 @@ def test_replace_backup_auto_create_dataset
   end
 
   def test_replace_backup_with_partitioning
-    config = least_config.merge('mode' => 'replace_backup', 'table' => 'table$20160929', 'dataset_old' => 'dataset_old', 'table_old' => 'table_old$20190929', 'temp_table' => 'temp_table')
-    task = Bigquery.configure(config, schema, processor_count)
-    any_instance_of(BigqueryClient) do |obj|
-      mock(obj).get_dataset(config['dataset'])
-      mock(obj).get_dataset(config['dataset_old'])
-      mock(obj).create_table_if_not_exists(config['temp_table'])
-      mock(obj).get_table(task['table'])
-      mock(obj).get_table(task['table_old'], dataset: config['dataset_old'])
-
-      mock(obj).get_table_or_partition(task['table'])
-      mock(obj).copy(config['table'], config['table_old'], config['dataset_old'])
-
-      mock(obj).copy(config['temp_table'], config['table'], write_disposition: 'WRITE_TRUNCATE')
-      mock(obj).delete_table(config['temp_table'])
-    end
-    Bigquery.transaction(config, schema, processor_count, &control)
-  end
-
-  def test_replace_backup_with_partitioning_auto_create_table
     config = least_config.merge('mode' => 'replace_backup', 'table' => 'table$20160929', 'dataset_old' => 'dataset_old', 'table_old' => 'table_old$20160929', 'temp_table' => 'temp_table', 'auto_create_table' => true)
     task = Bigquery.configure(config, schema, processor_count)
     any_instance_of(BigqueryClient) do |obj|
      mock(obj).get_dataset(config['dataset'])
       mock(obj).get_dataset(config['dataset_old'])
       mock(obj).create_table_if_not_exists(config['temp_table'])
-      mock(obj).create_table_if_not_exists(task['table'])
-      mock(obj).create_table_if_not_exists(task['table_old'], dataset: config['dataset_old'])
+      mock(obj).create_table_if_not_exists(config['table'])
+      mock(obj).create_table_if_not_exists(config['table_old'], dataset: config['dataset_old'])
 
-      mock(obj).get_table_or_partition(task['table'])
+      mock(obj).get_table_or_partition(config['table'])
       mock(obj).copy(config['table'], config['table_old'], config['dataset_old'])
 
       mock(obj).copy(config['temp_table'], config['table'], write_disposition: 'WRITE_TRUNCATE')
@@ -225,26 +198,14 @@ def test_append
     any_instance_of(BigqueryClient) do |obj|
       mock(obj).get_dataset(config['dataset'])
       mock(obj).create_table_if_not_exists(config['temp_table'])
+      mock(obj).create_table_if_not_exists(config['table'])
       mock(obj).copy(config['temp_table'], config['table'], write_disposition: 'WRITE_APPEND')
       mock(obj).delete_table(config['temp_table'])
     end
     Bigquery.transaction(config, schema, processor_count, &control)
   end
 
   def test_append_with_partitioning
-    config = least_config.merge('mode' => 'append', 'table' => 'table$20160929')
-    task = Bigquery.configure(config, schema, processor_count)
-    any_instance_of(BigqueryClient) do |obj|
-      mock(obj).get_dataset(config['dataset'])
-      mock(obj).create_table_if_not_exists(config['temp_table'])
-      mock(obj).get_table(config['table'])
-      mock(obj).copy(config['temp_table'], config['table'], write_disposition: 'WRITE_APPEND')
-      mock(obj).delete_table(config['temp_table'])
-    end
-    Bigquery.transaction(config, schema, processor_count, &control)
-  end
-
-  def test_append_with_partitioning_with_auto_create_table
     config = least_config.merge('mode' => 'append', 'table' => 'table$20160929', 'auto_create_table' => true)
     task = Bigquery.configure(config, schema, processor_count)
     any_instance_of(BigqueryClient) do |obj|
