Skip to content

Commit 7ecb496

Browse files
committed
Cleanup
* rename create_table to create_table_if_not_exists * delete_table_or_partition * get_table_or_partition
1 parent 8c3052a commit 7ecb496

File tree

4 files changed

+44
-46
lines changed

4 files changed

+44
-46
lines changed

lib/embulk/output/bigquery.rb

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ def self.configure(config, schema, task_count)
6464
'default_timestamp_format' => config.param('default_timestamp_format', :string, :default => ValueConverterFactory::DEFAULT_TIMESTAMP_FORMAT),
6565
'payload_column' => config.param('payload_column', :string, :default => nil),
6666
'payload_column_index' => config.param('payload_column_index', :integer, :default => nil),
67-
67+
6868
'open_timeout_sec' => config.param('open_timeout_sec', :integer, :default => nil),
6969
'timeout_sec' => config.param('timeout_sec', :integer, :default => nil), # google-api-ruby-client < v0.11.0
7070
'send_timeout_sec' => config.param('send_timeout_sec', :integer, :default => nil), # google-api-ruby-client >= v0.11.0
@@ -276,7 +276,7 @@ def self.transaction_report(task, responses)
276276
sum + (response ? response.statistics.load.output_rows.to_i : 0)
277277
end
278278
if task['temp_table']
279-
num_output_rows = bigquery.get_table(task['temp_table']).num_rows.to_i
279+
num_output_rows = bigquery.get_table_or_partition(task['temp_table']).num_rows.to_i
280280
else
281281
num_output_rows = num_response_rows
282282
end
@@ -306,24 +306,20 @@ def self.auto_create(task, bigquery)
306306

307307
case task['mode']
308308
when 'delete_in_advance'
309-
if task['time_partitioning']
310-
bigquery.delete_partition(task['table'])
311-
else
312-
bigquery.delete_table(task['table'])
313-
end
314-
bigquery.create_table(task['table'])
309+
bigquery.delete_table_or_partition(task['table'])
310+
bigquery.create_table_if_not_exists(task['table'])
315311
when 'replace', 'replace_backup', 'append'
316-
bigquery.create_table(task['temp_table'])
312+
bigquery.create_table_if_not_exists(task['temp_table'])
317313
if task['time_partitioning']
318314
if task['auto_create_table']
319-
bigquery.create_table(task['table'])
315+
bigquery.create_table_if_not_exists(task['table'])
320316
else
321317
bigquery.get_table(task['table']) # raises NotFoundError
322318
end
323319
end
324320
else # append_direct
325321
if task['auto_create_table']
326-
bigquery.create_table(task['table'])
322+
bigquery.create_table_if_not_exists(task['table'])
327323
else
328324
bigquery.get_table(task['table']) # raises NotFoundError
329325
end
@@ -332,7 +328,7 @@ def self.auto_create(task, bigquery)
332328
if task['mode'] == 'replace_backup'
333329
if task['time_partitioning'] and Helper.has_partition_decorator?(task['table_old'])
334330
if task['auto_create_table']
335-
bigquery.create_table(task['table_old'], dataset: task['dataset_old'])
331+
bigquery.create_table_if_not_exists(task['table_old'], dataset: task['dataset_old'])
336332
else
337333
bigquery.get_table(task['table_old'], dataset: task['dataset_old']) # raises NotFoundError
338334
end
@@ -403,7 +399,7 @@ def self.transaction(config, schema, task_count, &control)
403399

404400
if task['mode'] == 'replace_backup'
405401
begin
406-
bigquery.get_table(task['table'])
402+
bigquery.get_table_or_partition(task['table'])
407403
bigquery.copy(task['table'], task['table_old'], task['dataset_old'])
408404
rescue NotFoundError
409405
end
@@ -515,7 +511,7 @@ def load_rehearsal
515511

516512
self.class.rehearsal_thread = Thread.new do
517513
begin
518-
bigquery.create_table(task['rehearsal_table'])
514+
bigquery.create_table_if_not_exists(task['rehearsal_table'])
519515
response = bigquery.load(rehearsal_path, task['rehearsal_table'])
520516
num_output_rows = response ? response.statistics.load.output_rows.to_i : 0
521517
Embulk.logger.info { "embulk-output-bigquery: Loaded rehearsal #{num_output_rows}" }

lib/embulk/output/bigquery/bigquery_client.rb

Lines changed: 18 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -116,11 +116,11 @@ def load_from_gcs(object_uris, table)
116116
if @location
117117
body[:job_reference][:location] = @location
118118
end
119-
119+
120120
if @task['schema_update_options']
121121
body[:configuration][:load][:schema_update_options] = @task['schema_update_options']
122122
end
123-
123+
124124
opts = {}
125125

126126
Embulk.logger.debug { "embulk-output-bigquery: insert_job(#{@project}, #{body}, #{opts})" }
@@ -412,7 +412,7 @@ def get_dataset(dataset = nil)
412412
end
413413
end
414414

415-
def create_table(table, dataset: nil, options: nil)
415+
def create_table_if_not_exists(table, dataset: nil, options: nil)
416416
begin
417417
dataset ||= @dataset
418418
options ||= {}
@@ -466,8 +466,13 @@ def create_table(table, dataset: nil, options: nil)
466466
end
467467

468468
def delete_table(table, dataset: nil)
469+
table = Helper.chomp_partition_decorator(table)
470+
delete_table_or_partition(table, dataset: dataset)
471+
end
472+
473+
# if `table` with a partition decorator is given, a partition is deleted.
474+
def delete_table_or_partition(table, dataset: nil)
469475
begin
470-
table = Helper.chomp_partition_decorator(table)
471476
dataset ||= @dataset
472477
Embulk.logger.info { "embulk-output-bigquery: Delete table... #{@project}:#{dataset}.#{table}" }
473478
with_network_retry { client.delete_table(@project, dataset, table) }
@@ -486,8 +491,16 @@ def delete_table(table, dataset: nil)
486491
end
487492

488493
def get_table(table, dataset: nil)
494+
table = Helper.chomp_partition_decorator(table)
495+
get_table_or_partition(table)
496+
end
497+
498+
def get_partition(table, dataset: nil)
499+
get_table_or_partition(table)
500+
end
501+
502+
def get_table_or_partition(table, dataset: nil)
489503
begin
490-
table = Helper.chomp_partition_decorator(table)
491504
dataset ||= @dataset
492505
Embulk.logger.info { "embulk-output-bigquery: Get table... #{@project}:#{dataset}.#{table}" }
493506
with_network_retry { client.get_table(@project, dataset, table) }
@@ -503,21 +516,6 @@ def get_table(table, dataset: nil)
503516
raise Error, "failed to get table #{@project}:#{dataset}.#{table}, response:#{response}"
504517
end
505518
end
506-
507-
# Is this only a way to drop partition?
508-
def delete_partition(table_with_partition, dataset: nil)
509-
dataset ||= @dataset
510-
begin
511-
table = Helper.chomp_partition_decorator(table_with_partition)
512-
get_table(table, dataset: dataset)
513-
rescue NotFoundError
514-
else
515-
Embulk.logger.info { "embulk-output-bigquery: Delete partition... #{@project}:#{dataset}.#{table_with_partition}" }
516-
Tempfile.create('embulk_output_bigquery_empty_file_') do |fp|
517-
load(fp.path, table_with_partition, write_disposition: 'WRITE_TRUNCATE')
518-
end
519-
end
520-
end
521519
end
522520
end
523521
end

lib/embulk/output/bigquery/helper.rb

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,16 @@ class Bigquery < OutputPlugin
77
class Helper
88
PARTITION_DECORATOR_REGEXP = /\$.+\z/
99

10-
def self.has_partition_decorator?(table)
11-
!!(table =~ PARTITION_DECORATOR_REGEXP)
10+
def self.field_partitioning?(task)
11+
(task['time_partitioning'] || {}).key?('field')
1212
end
1313

14-
def self.chomp_partition_decorator(table)
15-
table.sub(PARTITION_DECORATOR_REGEXP, '')
14+
def self.has_partition_decorator?(table_name)
15+
!!(table_name =~ PARTITION_DECORATOR_REGEXP)
16+
end
17+
18+
def self.chomp_partition_decorator(table_name)
19+
table_name.sub(PARTITION_DECORATOR_REGEXP, '')
1620
end
1721

1822
def self.bq_type_from_embulk_type(embulk_type)

test/test_transaction.rb

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -86,8 +86,8 @@ def test_delete_in_advance
8686
task = Bigquery.configure(config, schema, processor_count)
8787
any_instance_of(BigqueryClient) do |obj|
8888
mock(obj).get_dataset(config['dataset'])
89-
mock(obj).delete_table(config['table'])
90-
mock(obj).create_table(config['table'])
89+
mock(obj).delete_table_or_partition(config['table'])
90+
mock(obj).create_table_if_not_exists(config['table'])
9191
end
9292
Bigquery.transaction(config, schema, processor_count, &control)
9393
end
@@ -97,8 +97,8 @@ def test_delete_in_advance_with_partitioning
9797
task = Bigquery.configure(config, schema, processor_count)
9898
any_instance_of(BigqueryClient) do |obj|
9999
mock(obj).get_dataset(config['dataset'])
100-
mock(obj).delete_partition(config['table'])
101-
mock(obj).create_table(config['table'])
100+
mock(obj).delete_table_or_partition(config['table'])
101+
mock(obj).create_table_if_not_exists(config['table'])
102102
end
103103
Bigquery.transaction(config, schema, processor_count, &control)
104104
end
@@ -153,7 +153,7 @@ def test_replace_backup
153153
mock(obj).get_dataset(config['dataset_old'])
154154
mock(obj).create_table(config['temp_table'])
155155

156-
mock(obj).get_table(task['table'])
156+
mock(obj).get_table_or_partition(task['table'])
157157
mock(obj).copy(config['table'], config['table_old'], config['dataset_old'])
158158

159159
mock(obj).copy(config['temp_table'], config['table'], write_disposition: 'WRITE_TRUNCATE')
@@ -170,7 +170,7 @@ def test_replace_backup_auto_create_dataset
170170
mock(obj).create_dataset(config['dataset_old'], reference: config['dataset'])
171171
mock(obj).create_table(config['temp_table'])
172172

173-
mock(obj).get_table(task['table'])
173+
mock(obj).get_table_or_partition(task['table'])
174174
mock(obj).copy(config['table'], config['table_old'], config['dataset_old'])
175175

176176
mock(obj).copy(config['temp_table'], config['table'], write_disposition: 'WRITE_TRUNCATE')
@@ -189,7 +189,7 @@ def test_replace_backup_with_partitioning
189189
mock(obj).get_table(task['table'])
190190
mock(obj).get_table(task['table_old'], dataset: config['dataset_old'])
191191

192-
mock(obj).get_table(task['table'])
192+
mock(obj).get_table_or_partition(task['table'])
193193
mock(obj).copy(config['table'], config['table_old'], config['dataset_old'])
194194

195195
mock(obj).copy(config['temp_table'], config['table'], write_disposition: 'WRITE_TRUNCATE')
@@ -208,7 +208,7 @@ def test_replace_backup_with_partitioning_auto_create_table
208208
mock(obj).create_table(task['table'])
209209
mock(obj).create_table(task['table_old'], dataset: config['dataset_old'])
210210

211-
mock(obj).get_table(task['table'])
211+
mock(obj).get_table_or_partition(task['table'])
212212
mock(obj).copy(config['table'], config['table_old'], config['dataset_old'])
213213

214214
mock(obj).copy(config['temp_table'], config['table'], write_disposition: 'WRITE_TRUNCATE')

0 commit comments

Comments
 (0)