Skip to content

Commit 453540d

Browse files
committed
Support field partitioning
1 parent 7ecb496 commit 453540d

11 files changed

+168
-63
lines changed

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,8 @@ This is not transactional, i.e., if fails, the target table could have some rows
158158

159159
```is_skip_job_result_check``` must be false when using replace mode
160160

161+
NOTE: BigQuery does not support atomically replacing (actually, copying into) a non-partitioned table with a partitioned table. You must first delete the non-partitioned table; otherwise, you get an `Incompatible table partitioning specification when copying to the column partitioned table` error.
162+
161163
##### replace_backup
162164

163165
1. Load to temporary table (Create and WRITE_APPEND in parallel)
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
in:
2+
type: file
3+
path_prefix: example/example.csv
4+
parser:
5+
type: csv
6+
charset: UTF-8
7+
newline: CRLF
8+
null_string: 'NULL'
9+
skip_header_lines: 1
10+
comment_line_marker: '#'
11+
columns:
12+
- {name: date, type: string}
13+
- {name: timestamp, type: timestamp, format: "%Y-%m-%d %H:%M:%S.%N", timezone: "+09:00"}
14+
- {name: "null", type: string}
15+
- {name: long, type: long}
16+
- {name: string, type: string}
17+
- {name: double, type: double}
18+
- {name: boolean, type: boolean}
19+
out:
20+
type: bigquery
21+
mode: delete_in_advance
22+
auth_method: json_key
23+
json_keyfile: example/your-project-000.json
24+
dataset: your_dataset_name
25+
table: your_field_partitioned_table_name
26+
source_format: NEWLINE_DELIMITED_JSON
27+
compression: NONE
28+
auto_create_dataset: true
29+
auto_create_table: true
30+
schema_file: example/schema.json
31+
time_partitioning:
32+
type: 'DAY'
33+
field: timestamp
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
in:
2+
type: file
3+
path_prefix: example/example.csv
4+
parser:
5+
type: csv
6+
charset: UTF-8
7+
newline: CRLF
8+
null_string: 'NULL'
9+
skip_header_lines: 1
10+
comment_line_marker: '#'
11+
columns:
12+
- {name: date, type: string}
13+
- {name: timestamp, type: timestamp, format: "%Y-%m-%d %H:%M:%S.%N", timezone: "+09:00"}
14+
- {name: "null", type: string}
15+
- {name: long, type: long}
16+
- {name: string, type: string}
17+
- {name: double, type: double}
18+
- {name: boolean, type: boolean}
19+
out:
20+
type: bigquery
21+
mode: replace_backup
22+
auth_method: json_key
23+
json_keyfile: example/your-project-000.json
24+
dataset: your_dataset_name
25+
table: your_field_partitioned_table_name
26+
table_old: your_field_partitioned_table_name_old
27+
source_format: NEWLINE_DELIMITED_JSON
28+
compression: NONE
29+
auto_create_dataset: true
30+
auto_create_table: true
31+
schema_file: example/schema.json
32+
time_partitioning:
33+
type: 'DAY'
34+
field: 'timestamp'
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
in:
2+
type: file
3+
path_prefix: example/example.csv
4+
parser:
5+
type: csv
6+
charset: UTF-8
7+
newline: CRLF
8+
null_string: 'NULL'
9+
skip_header_lines: 1
10+
comment_line_marker: '#'
11+
columns:
12+
- {name: date, type: string}
13+
- {name: timestamp, type: timestamp, format: "%Y-%m-%d %H:%M:%S.%N", timezone: "+09:00"}
14+
- {name: "null", type: string}
15+
- {name: long, type: long}
16+
- {name: string, type: string}
17+
- {name: double, type: double}
18+
- {name: boolean, type: boolean}
19+
out:
20+
type: bigquery
21+
mode: replace
22+
auth_method: json_key
23+
json_keyfile: example/your-project-000.json
24+
dataset: your_dataset_name
25+
table: your_field_partitioned_table_name
26+
source_format: NEWLINE_DELIMITED_JSON
27+
compression: NONE
28+
auto_create_dataset: true
29+
auto_create_table: true
30+
schema_file: example/schema.json
31+
time_partitioning:
32+
type: 'DAY'
33+
field: 'timestamp'

example/example.jsonl

Lines changed: 0 additions & 16 deletions
This file was deleted.

lib/embulk/output/bigquery.rb

Lines changed: 27 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -306,33 +306,48 @@ def self.auto_create(task, bigquery)
306306

307307
case task['mode']
308308
when 'delete_in_advance'
309-
bigquery.delete_table_or_partition(task['table'])
309+
bigquery.delete_partition(task['table'])
310310
bigquery.create_table_if_not_exists(task['table'])
311-
when 'replace', 'replace_backup', 'append'
311+
when 'replace'
312312
bigquery.create_table_if_not_exists(task['temp_table'])
313-
if task['time_partitioning']
313+
if Helper.has_partition_decorator?(task['table'])
314314
if task['auto_create_table']
315315
bigquery.create_table_if_not_exists(task['table'])
316316
else
317317
bigquery.get_table(task['table']) # raises NotFoundError
318318
end
319319
end
320-
else # append_direct
321-
if task['auto_create_table']
322-
bigquery.create_table_if_not_exists(task['table'])
323-
else
324-
bigquery.get_table(task['table']) # raises NotFoundError
320+
when 'append'
321+
bigquery.create_table_if_not_exists(task['temp_table'])
322+
if Helper.has_partition_decorator?(task['table'])
323+
if task['auto_create_table']
324+
bigquery.create_table_if_not_exists(task['table'])
325+
else
326+
bigquery.get_table(task['table']) # raises NotFoundError
327+
end
325328
end
326-
end
327-
328-
if task['mode'] == 'replace_backup'
329-
if task['time_partitioning'] and Helper.has_partition_decorator?(task['table_old'])
329+
when 'replace_backup'
330+
bigquery.create_table_if_not_exists(task['temp_table'])
331+
if Helper.has_partition_decorator?(task['table'])
332+
if task['auto_create_table']
333+
bigquery.create_table_if_not_exists(task['table'])
334+
else
335+
bigquery.get_table(task['table']) # raises NotFoundError
336+
end
337+
end
338+
if Helper.has_partition_decorator?(task['table_old'])
330339
if task['auto_create_table']
331340
bigquery.create_table_if_not_exists(task['table_old'], dataset: task['dataset_old'])
332341
else
333342
bigquery.get_table(task['table_old'], dataset: task['dataset_old']) # raises NotFoundError
334343
end
335344
end
345+
else # append_direct
346+
if task['auto_create_table']
347+
bigquery.create_table_if_not_exists(task['table'])
348+
else
349+
bigquery.get_table(task['table']) # raises NotFoundError
350+
end
336351
end
337352
end
338353

lib/embulk/output/bigquery/bigquery_client.rb

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -470,6 +470,10 @@ def delete_table(table, dataset: nil)
470470
delete_table_or_partition(table, dataset: dataset)
471471
end
472472

473+
def delete_partition(table, dataset: nil)
474+
delete_table_or_partition(table, dataset: dataset)
475+
end
476+
473477
# if `table` with a partition decorator is given, a partition is deleted.
474478
def delete_table_or_partition(table, dataset: nil)
475479
begin

test/test_bigquery_client.rb

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -96,20 +96,20 @@ def test_get_dataset_not_found
9696
end
9797
end
9898

99-
sub_test_case "create_table" do
100-
def test_create_table
99+
sub_test_case "create_table_if_not_exists" do
100+
def test_create_table_if_not_exists
101101
client.delete_table('your_table_name')
102-
assert_nothing_raised { client.create_table('your_table_name') }
102+
assert_nothing_raised { client.create_table_if_not_exists('your_table_name') }
103103
end
104104

105-
def test_create_table_already_exists
106-
assert_nothing_raised { client.create_table('your_table_name') }
105+
def test_create_table_if_not_exists_already_exists
106+
assert_nothing_raised { client.create_table_if_not_exists('your_table_name') }
107107
end
108108

109109
def test_create_partitioned_table
110110
client.delete_table('your_table_name')
111111
assert_nothing_raised do
112-
client.create_table('your_table_name$20160929', options:{
112+
client.create_table_if_not_exists('your_table_name$20160929', options:{
113113
'time_partitioning' => {'type'=>'DAY', 'expiration_ms'=>1000}
114114
})
115115
end
@@ -118,7 +118,7 @@ def test_create_partitioned_table
118118

119119
sub_test_case "delete_table" do
120120
def test_delete_table
121-
client.create_table('your_table_name')
121+
client.create_table_if_not_exists('your_table_name')
122122
assert_nothing_raised { client.delete_table('your_table_name') }
123123
end
124124

@@ -127,14 +127,14 @@ def test_delete_table_not_found
127127
end
128128

129129
def test_delete_partitioned_table
130-
client.create_table('your_table_name')
130+
client.create_table_if_not_exists('your_table_name')
131131
assert_nothing_raised { client.delete_table('your_table_name$20160929') }
132132
end
133133
end
134134

135135
sub_test_case "get_table" do
136136
def test_get_table
137-
client.create_table('your_table_name')
137+
client.create_table_if_not_exists('your_table_name')
138138
assert_nothing_raised { client.get_table('your_table_name') }
139139
end
140140

@@ -146,23 +146,23 @@ def test_get_table_not_found
146146
end
147147

148148
def test_get_partitioned_table
149-
client.create_table('your_table_name')
149+
client.create_table_if_not_exists('your_table_name')
150150
assert_nothing_raised { client.get_table('your_table_name$20160929') }
151151
end
152152
end
153153

154154
sub_test_case "delete_partition" do
155155
def test_delete_partition
156156
client.delete_table('your_table_name')
157-
client.create_table('your_table_name$20160929')
157+
client.create_table_if_not_exists('your_table_name$20160929')
158158
assert_nothing_raised { client.delete_partition('your_table_name$20160929') }
159159
ensure
160160
client.delete_table('your_table_name')
161161
end
162162

163163
def test_delete_partition_of_non_partitioned_table
164164
client.delete_table('your_table_name')
165-
client.create_table('your_table_name')
165+
client.create_table_if_not_exists('your_table_name')
166166
assert_raise { client.delete_partition('your_table_name$20160929') }
167167
ensure
168168
client.delete_table('your_table_name')
@@ -175,7 +175,7 @@ def test_delete_partition_table_not_found
175175

176176
sub_test_case "fields" do
177177
def test_fields_from_table
178-
client.create_table('your_table_name')
178+
client.create_table_if_not_exists('your_table_name')
179179
fields = client.fields_from_table('your_table_name')
180180
expected = [
181181
{:type=>"BOOLEAN", :name=>"boolean"},
@@ -190,15 +190,15 @@ def test_fields_from_table
190190
end
191191

192192
sub_test_case "copy" do
193-
def test_create_table
194-
client.create_table('your_table_name')
193+
def test_create_table_if_not_exists
194+
client.create_table_if_not_exists('your_table_name')
195195
assert_nothing_raised { client.copy('your_table_name', 'your_table_name_old') }
196196
end
197197
end
198198

199199
sub_test_case "load" do
200200
def test_load
201-
client.create_table('your_table_name')
201+
client.create_table_if_not_exists('your_table_name')
202202
File.write("tmp/your_file_name.csv", record.to_csv)
203203
assert_nothing_raised { client.load("/tmp/your_file_name.csv", 'your_table_name') }
204204
end

0 commit comments

Comments
 (0)