Skip to content

Commit 9c135dd

Browse files
authored
Merge pull request #140 from TKNGUE/master
Add an expiration option of temporary table to clean up
2 parents 559fd12 + 247c37e commit 9c135dd

File tree

3 files changed

+21
-3
lines changed

3 files changed

+21
-3
lines changed

README.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,13 @@ Options for intermediate local files
7979
| delete_from_local_when_job_end | boolean | optional | true | If set to true, delete generate local files when job is end |
8080
| compression | string | optional | "NONE" | Compression of local files (`GZIP` or `NONE`) |
8181

82+
83+
Options for intermediate tables on BigQuery
84+
85+
| name | type | required? | default | description |
86+
|:-------------------------------------|:------------|:-----------|:-------------------------|:-----------------------|
87+
| temporary_table_expiration | integer | optional | | Temporary table's expiration time in seconds |
88+
8289
`source_format` is also used to determine formatter (csv or jsonl).
8390

8491
#### Same options of bq command-line tools or BigQuery job's property

lib/embulk/output/bigquery.rb

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,8 @@ def self.configure(config, schema, task_count)
9090
'clustering' => config.param('clustering', :hash, :default => nil), # google-api-ruby-client >= v0.21.0
9191
'schema_update_options' => config.param('schema_update_options', :array, :default => nil),
9292

93+
'temporary_table_expiration' => config.param('temporary_table_expiration', :integer, :default => nil),
94+
9395
# for debug
9496
'skip_load' => config.param('skip_load', :bool, :default => false),
9597
'temp_table' => config.param('temp_table', :string, :default => nil),
@@ -300,19 +302,23 @@ def self.auto_create(task, bigquery)
300302
end
301303
end
302304

305+
temp_table_expiration = task['temporary_table_expiration']
306+
temp_options = {'expiration_time' => temp_table_expiration}
307+
303308
case task['mode']
304309
when 'delete_in_advance'
305310
bigquery.delete_table_or_partition(task['table'])
306311
bigquery.create_table_if_not_exists(task['table'])
307312
when 'replace'
308-
bigquery.create_table_if_not_exists(task['temp_table'])
313+
bigquery.create_table_if_not_exists(task['temp_table'], options: temp_options)
309314
bigquery.create_table_if_not_exists(task['table']) # needs for when task['table'] is a partition
310315
when 'append'
311-
bigquery.create_table_if_not_exists(task['temp_table'])
316+
bigquery.create_table_if_not_exists(task['temp_table'], options: temp_options)
312317
bigquery.create_table_if_not_exists(task['table']) # needs for when task['table'] is a partition
313318
when 'replace_backup'
314-
bigquery.create_table_if_not_exists(task['temp_table'])
319+
bigquery.create_table_if_not_exists(task['temp_table'], options: temp_options)
315320
bigquery.create_table_if_not_exists(task['table'])
321+
316322
bigquery.create_table_if_not_exists(task['table_old'], dataset: task['dataset_old']) # needs for when a partition
317323
else # append_direct
318324
if task['auto_create_table']

lib/embulk/output/bigquery/bigquery_client.rb

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -440,6 +440,11 @@ def create_table_if_not_exists(table, dataset: nil, options: nil)
440440
}
441441
end
442442

443+
if options['expiration_time']
444+
# expiration_time is expressed in milliseconds
445+
body[:expiration_time] = (Time.now.to_i + options['expiration_time']) * 1000
446+
end
447+
443448
opts = {}
444449
Embulk.logger.debug { "embulk-output-bigquery: insert_table(#{@project}, #{dataset}, #{@location_for_log}, #{body}, #{opts})" }
445450
with_network_retry { client.insert_table(@project, dataset, body, opts) }

0 commit comments

Comments
 (0)