Skip to content

Commit eaef6f3

Browse files
authored
Merge pull request #101 from embulk/support-field-partitioning
Support field partitioning
2 parents 8c3052a + 453540d commit eaef6f3

12 files changed

+206
-103
lines changed

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,8 @@ This is not transactional, i.e., if fails, the target table could have some rows
158158

159159
```is_skip_job_result_check``` must be false when replace mode
160160

161+
NOTE: BigQuery does not support replacing (actually, copying into) a non-partitioned table with a partitioned table atomically. You must delete the non-partitioned table first; otherwise, you get an `Incompatible table partitioning specification when copying to the column partitioned table` error.
162+
161163
##### replace_backup
162164

163165
1. Load to temporary table (Create and WRITE_APPEND in parallel)
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
in:
2+
type: file
3+
path_prefix: example/example.csv
4+
parser:
5+
type: csv
6+
charset: UTF-8
7+
newline: CRLF
8+
null_string: 'NULL'
9+
skip_header_lines: 1
10+
comment_line_marker: '#'
11+
columns:
12+
- {name: date, type: string}
13+
- {name: timestamp, type: timestamp, format: "%Y-%m-%d %H:%M:%S.%N", timezone: "+09:00"}
14+
- {name: "null", type: string}
15+
- {name: long, type: long}
16+
- {name: string, type: string}
17+
- {name: double, type: double}
18+
- {name: boolean, type: boolean}
19+
out:
20+
type: bigquery
21+
mode: delete_in_advance
22+
auth_method: json_key
23+
json_keyfile: example/your-project-000.json
24+
dataset: your_dataset_name
25+
table: your_field_partitioned_table_name
26+
source_format: NEWLINE_DELIMITED_JSON
27+
compression: NONE
28+
auto_create_dataset: true
29+
auto_create_table: true
30+
schema_file: example/schema.json
31+
time_partitioning:
32+
type: 'DAY'
33+
field: timestamp
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
in:
2+
type: file
3+
path_prefix: example/example.csv
4+
parser:
5+
type: csv
6+
charset: UTF-8
7+
newline: CRLF
8+
null_string: 'NULL'
9+
skip_header_lines: 1
10+
comment_line_marker: '#'
11+
columns:
12+
- {name: date, type: string}
13+
- {name: timestamp, type: timestamp, format: "%Y-%m-%d %H:%M:%S.%N", timezone: "+09:00"}
14+
- {name: "null", type: string}
15+
- {name: long, type: long}
16+
- {name: string, type: string}
17+
- {name: double, type: double}
18+
- {name: boolean, type: boolean}
19+
out:
20+
type: bigquery
21+
mode: replace_backup
22+
auth_method: json_key
23+
json_keyfile: example/your-project-000.json
24+
dataset: your_dataset_name
25+
table: your_field_partitioned_table_name
26+
table_old: your_field_partitioned_table_name_old
27+
source_format: NEWLINE_DELIMITED_JSON
28+
compression: NONE
29+
auto_create_dataset: true
30+
auto_create_table: true
31+
schema_file: example/schema.json
32+
time_partitioning:
33+
type: 'DAY'
34+
field: 'timestamp'
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
in:
2+
type: file
3+
path_prefix: example/example.csv
4+
parser:
5+
type: csv
6+
charset: UTF-8
7+
newline: CRLF
8+
null_string: 'NULL'
9+
skip_header_lines: 1
10+
comment_line_marker: '#'
11+
columns:
12+
- {name: date, type: string}
13+
- {name: timestamp, type: timestamp, format: "%Y-%m-%d %H:%M:%S.%N", timezone: "+09:00"}
14+
- {name: "null", type: string}
15+
- {name: long, type: long}
16+
- {name: string, type: string}
17+
- {name: double, type: double}
18+
- {name: boolean, type: boolean}
19+
out:
20+
type: bigquery
21+
mode: replace
22+
auth_method: json_key
23+
json_keyfile: example/your-project-000.json
24+
dataset: your_dataset_name
25+
table: your_field_partitioned_table_name
26+
source_format: NEWLINE_DELIMITED_JSON
27+
compression: NONE
28+
auto_create_dataset: true
29+
auto_create_table: true
30+
schema_file: example/schema.json
31+
time_partitioning:
32+
type: 'DAY'
33+
field: 'timestamp'

example/example.jsonl

Lines changed: 0 additions & 16 deletions
This file was deleted.

lib/embulk/output/bigquery.rb

Lines changed: 34 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ def self.configure(config, schema, task_count)
6464
'default_timestamp_format' => config.param('default_timestamp_format', :string, :default => ValueConverterFactory::DEFAULT_TIMESTAMP_FORMAT),
6565
'payload_column' => config.param('payload_column', :string, :default => nil),
6666
'payload_column_index' => config.param('payload_column_index', :integer, :default => nil),
67-
67+
6868
'open_timeout_sec' => config.param('open_timeout_sec', :integer, :default => nil),
6969
'timeout_sec' => config.param('timeout_sec', :integer, :default => nil), # google-api-ruby-client < v0.11.0
7070
'send_timeout_sec' => config.param('send_timeout_sec', :integer, :default => nil), # google-api-ruby-client >= v0.11.0
@@ -276,7 +276,7 @@ def self.transaction_report(task, responses)
276276
sum + (response ? response.statistics.load.output_rows.to_i : 0)
277277
end
278278
if task['temp_table']
279-
num_output_rows = bigquery.get_table(task['temp_table']).num_rows.to_i
279+
num_output_rows = bigquery.get_table_or_partition(task['temp_table']).num_rows.to_i
280280
else
281281
num_output_rows = num_response_rows
282282
end
@@ -306,37 +306,48 @@ def self.auto_create(task, bigquery)
306306

307307
case task['mode']
308308
when 'delete_in_advance'
309-
if task['time_partitioning']
310-
bigquery.delete_partition(task['table'])
311-
else
312-
bigquery.delete_table(task['table'])
309+
bigquery.delete_partition(task['table'])
310+
bigquery.create_table_if_not_exists(task['table'])
311+
when 'replace'
312+
bigquery.create_table_if_not_exists(task['temp_table'])
313+
if Helper.has_partition_decorator?(task['table'])
314+
if task['auto_create_table']
315+
bigquery.create_table_if_not_exists(task['table'])
316+
else
317+
bigquery.get_table(task['table']) # raises NotFoundError
318+
end
313319
end
314-
bigquery.create_table(task['table'])
315-
when 'replace', 'replace_backup', 'append'
316-
bigquery.create_table(task['temp_table'])
317-
if task['time_partitioning']
320+
when 'append'
321+
bigquery.create_table_if_not_exists(task['temp_table'])
322+
if Helper.has_partition_decorator?(task['table'])
318323
if task['auto_create_table']
319-
bigquery.create_table(task['table'])
324+
bigquery.create_table_if_not_exists(task['table'])
320325
else
321326
bigquery.get_table(task['table']) # raises NotFoundError
322327
end
323328
end
324-
else # append_direct
325-
if task['auto_create_table']
326-
bigquery.create_table(task['table'])
327-
else
328-
bigquery.get_table(task['table']) # raises NotFoundError
329+
when 'replace_backup'
330+
bigquery.create_table_if_not_exists(task['temp_table'])
331+
if Helper.has_partition_decorator?(task['table'])
332+
if task['auto_create_table']
333+
bigquery.create_table_if_not_exists(task['table'])
334+
else
335+
bigquery.get_table(task['table']) # raises NotFoundError
336+
end
329337
end
330-
end
331-
332-
if task['mode'] == 'replace_backup'
333-
if task['time_partitioning'] and Helper.has_partition_decorator?(task['table_old'])
338+
if Helper.has_partition_decorator?(task['table_old'])
334339
if task['auto_create_table']
335-
bigquery.create_table(task['table_old'], dataset: task['dataset_old'])
340+
bigquery.create_table_if_not_exists(task['table_old'], dataset: task['dataset_old'])
336341
else
337342
bigquery.get_table(task['table_old'], dataset: task['dataset_old']) # raises NotFoundError
338343
end
339344
end
345+
else # append_direct
346+
if task['auto_create_table']
347+
bigquery.create_table_if_not_exists(task['table'])
348+
else
349+
bigquery.get_table(task['table']) # raises NotFoundError
350+
end
340351
end
341352
end
342353

@@ -403,7 +414,7 @@ def self.transaction(config, schema, task_count, &control)
403414

404415
if task['mode'] == 'replace_backup'
405416
begin
406-
bigquery.get_table(task['table'])
417+
bigquery.get_table_or_partition(task['table'])
407418
bigquery.copy(task['table'], task['table_old'], task['dataset_old'])
408419
rescue NotFoundError
409420
end
@@ -515,7 +526,7 @@ def load_rehearsal
515526

516527
self.class.rehearsal_thread = Thread.new do
517528
begin
518-
bigquery.create_table(task['rehearsal_table'])
529+
bigquery.create_table_if_not_exists(task['rehearsal_table'])
519530
response = bigquery.load(rehearsal_path, task['rehearsal_table'])
520531
num_output_rows = response ? response.statistics.load.output_rows.to_i : 0
521532
Embulk.logger.info { "embulk-output-bigquery: Loaded rehearsal #{num_output_rows}" }

lib/embulk/output/bigquery/bigquery_client.rb

Lines changed: 22 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -116,11 +116,11 @@ def load_from_gcs(object_uris, table)
116116
if @location
117117
body[:job_reference][:location] = @location
118118
end
119-
119+
120120
if @task['schema_update_options']
121121
body[:configuration][:load][:schema_update_options] = @task['schema_update_options']
122122
end
123-
123+
124124
opts = {}
125125

126126
Embulk.logger.debug { "embulk-output-bigquery: insert_job(#{@project}, #{body}, #{opts})" }
@@ -412,7 +412,7 @@ def get_dataset(dataset = nil)
412412
end
413413
end
414414

415-
def create_table(table, dataset: nil, options: nil)
415+
def create_table_if_not_exists(table, dataset: nil, options: nil)
416416
begin
417417
dataset ||= @dataset
418418
options ||= {}
@@ -466,8 +466,17 @@ def create_table(table, dataset: nil, options: nil)
466466
end
467467

468468
def delete_table(table, dataset: nil)
469+
table = Helper.chomp_partition_decorator(table)
470+
delete_table_or_partition(table, dataset: dataset)
471+
end
472+
473+
def delete_partition(table, dataset: nil)
474+
delete_table_or_partition(table, dataset: dataset)
475+
end
476+
477+
# if `table` with a partition decorator is given, a partition is deleted.
478+
def delete_table_or_partition(table, dataset: nil)
469479
begin
470-
table = Helper.chomp_partition_decorator(table)
471480
dataset ||= @dataset
472481
Embulk.logger.info { "embulk-output-bigquery: Delete table... #{@project}:#{dataset}.#{table}" }
473482
with_network_retry { client.delete_table(@project, dataset, table) }
@@ -486,8 +495,16 @@ def delete_table(table, dataset: nil)
486495
end
487496

488497
def get_table(table, dataset: nil)
498+
table = Helper.chomp_partition_decorator(table)
499+
get_table_or_partition(table)
500+
end
501+
502+
def get_partition(table, dataset: nil)
503+
get_table_or_partition(table)
504+
end
505+
506+
def get_table_or_partition(table, dataset: nil)
489507
begin
490-
table = Helper.chomp_partition_decorator(table)
491508
dataset ||= @dataset
492509
Embulk.logger.info { "embulk-output-bigquery: Get table... #{@project}:#{dataset}.#{table}" }
493510
with_network_retry { client.get_table(@project, dataset, table) }
@@ -503,21 +520,6 @@ def get_table(table, dataset: nil)
503520
raise Error, "failed to get table #{@project}:#{dataset}.#{table}, response:#{response}"
504521
end
505522
end
506-
507-
# Is this only a way to drop partition?
508-
def delete_partition(table_with_partition, dataset: nil)
509-
dataset ||= @dataset
510-
begin
511-
table = Helper.chomp_partition_decorator(table_with_partition)
512-
get_table(table, dataset: dataset)
513-
rescue NotFoundError
514-
else
515-
Embulk.logger.info { "embulk-output-bigquery: Delete partition... #{@project}:#{dataset}.#{table_with_partition}" }
516-
Tempfile.create('embulk_output_bigquery_empty_file_') do |fp|
517-
load(fp.path, table_with_partition, write_disposition: 'WRITE_TRUNCATE')
518-
end
519-
end
520-
end
521523
end
522524
end
523525
end

lib/embulk/output/bigquery/helper.rb

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,16 @@ class Bigquery < OutputPlugin
77
class Helper
88
PARTITION_DECORATOR_REGEXP = /\$.+\z/
99

10-
def self.has_partition_decorator?(table)
11-
!!(table =~ PARTITION_DECORATOR_REGEXP)
10+
def self.field_partitioning?(task)
11+
(task['time_partitioning'] || {}).key?('field')
1212
end
1313

14-
def self.chomp_partition_decorator(table)
15-
table.sub(PARTITION_DECORATOR_REGEXP, '')
14+
def self.has_partition_decorator?(table_name)
15+
!!(table_name =~ PARTITION_DECORATOR_REGEXP)
16+
end
17+
18+
def self.chomp_partition_decorator(table_name)
19+
table_name.sub(PARTITION_DECORATOR_REGEXP, '')
1620
end
1721

1822
def self.bq_type_from_embulk_type(embulk_type)

0 commit comments

Comments
 (0)