Commit cb8c50f

Merge pull request #126 from ck-fm0211/add-billing-project
Specify the billing project and the project to which the data will be loaded separately.
2 parents b7ad847 + 720b218 commit cb8c50f

7 files changed: 84 additions & 35 deletions

README.md

Lines changed: 1 addition & 0 deletions

@@ -33,6 +33,7 @@ OAuth flow for installed applications.
 | auth_method | string | optional | "application\_default" | See [Authentication](#authentication) |
 | json_keyfile | string | optional | | keyfile path or `content` |
 | project | string | required unless service\_account's `json_keyfile` is given. | | project\_id |
+| destination_project | string | optional | `project` value | A destination project to which the data will be loaded. Use this if you want to separate a billing project (the `project` value) and a destination project (the `destination_project` value). |
 | dataset | string | required | | dataset |
 | location | string | optional | nil | geographic location of dataset. See [Location](#location) |
 | table | string | required | | table name, or table name with a partition decorator such as `table_name$20160929`|

Lines changed: 32 additions & 0 deletions

@@ -0,0 +1,32 @@
+in:
+  type: file
+  path_prefix: example/example.csv
+  parser:
+    type: csv
+    charset: UTF-8
+    newline: CRLF
+    null_string: 'NULL'
+    skip_header_lines: 1
+    comment_line_marker: '#'
+    columns:
+      - {name: date, type: string}
+      - {name: timestamp, type: timestamp, format: "%Y-%m-%d %H:%M:%S.%N", timezone: "+09:00"}
+      - {name: "null", type: string}
+      - {name: long, type: long}
+      - {name: string, type: string}
+      - {name: double, type: double}
+      - {name: boolean, type: boolean}
+out:
+  type: bigquery
+  mode: replace
+  auth_method: service_account
+  json_keyfile: example/your-project-000.json
+  project: your_project_name
+  destination_project: your_destination_project_name
+  dataset: your_dataset_name
+  table: your_table_name
+  source_format: NEWLINE_DELIMITED_JSON
+  compression: NONE
+  auto_create_dataset: true
+  auto_create_table: true
+  schema_file: example/schema.json
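
With this example config, the load job runs under (and is billed to) `project` (your_project_name), while the table itself lands in your_destination_project_name:your_dataset_name.your_table_name, matching the new README row above.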

lib/embulk/output/bigquery.rb

Lines changed: 2 additions & 0 deletions

@@ -36,6 +36,7 @@ def self.configure(config, schema, task_count)
   'auth_method' => config.param('auth_method', :string, :default => 'application_default'),
   'json_keyfile' => config.param('json_keyfile', LocalFile, :default => nil),
   'project' => config.param('project', :string, :default => nil),
+  'destination_project' => config.param('destination_project', :string, :default => nil),
   'dataset' => config.param('dataset', :string),
   'location' => config.param('location', :string, :default => nil),
   'table' => config.param('table', :string),
@@ -141,6 +142,7 @@ def self.configure(config, schema, task_count)
   if task['project'].nil?
     raise ConfigError.new "Required field \"project\" is not set"
   end
+  task['destination_project'] ||= task['project']

   if (task['payload_column'] or task['payload_column_index']) and task['auto_create_table']
     if task['schema_file'].nil? and task['template_table'].nil?
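
The one-line `||=` fallback above is the entire compatibility story: when `destination_project` is omitted, it inherits the `project` value, so existing configs behave exactly as before. A minimal standalone sketch of that fallback in plain Ruby (placeholder values, not the plugin's code):

    task = { 'project' => 'billing-proj', 'destination_project' => nil }
    task['destination_project'] ||= task['project']   # nil falls back to 'project'
    puts task['destination_project']                  # => "billing-proj"

    task = { 'project' => 'billing-proj', 'destination_project' => 'dest-proj' }
    task['destination_project'] ||= task['project']   # an explicit value is kept
    puts task['destination_project']                  # => "dest-proj"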

lib/embulk/output/bigquery/bigquery_client.rb

Lines changed: 30 additions & 29 deletions

@@ -18,6 +18,7 @@ def initialize(task, schema, fields = nil)
   @schema = schema
   reset_fields(fields) if fields
   @project = @task['project']
+  @destination_project = @task['destination_project']
   @dataset = @task['dataset']
   @location = @task['location']
   @location_for_log = @location.nil? ? 'us/eu' : @location
@@ -80,7 +81,7 @@ def load_from_gcs(object_uris, table)
   # As https://cloud.google.com/bigquery/docs/managing_jobs_datasets_projects#managingjobs says,
   # we should generate job_id in client code, otherwise, retrying would cause duplication
   job_id = "embulk_load_job_#{SecureRandom.uuid}"
-  Embulk.logger.info { "embulk-output-bigquery: Load job starting... job_id:[#{job_id}] #{object_uris} => #{@project}:#{@dataset}.#{table} in #{@location_for_log}" }
+  Embulk.logger.info { "embulk-output-bigquery: Load job starting... job_id:[#{job_id}] #{object_uris} => #{@destination_project}:#{@dataset}.#{table} in #{@location_for_log}" }

   body = {
     job_reference: {
@@ -90,7 +91,7 @@ def load_from_gcs(object_uris, table)
     configuration: {
       load: {
         destination_table: {
-          project_id: @project,
+          project_id: @destination_project,
           dataset_id: @dataset,
           table_id: table,
         },
@@ -130,7 +131,7 @@ def load_from_gcs(object_uris, table)
     Embulk.logger.error {
       "embulk-output-bigquery: insert_job(#{@project}, #{body}, #{opts}), response:#{response}"
     }
-    raise Error, "failed to load #{object_uris} to #{@project}:#{@dataset}.#{table} in #{@location_for_log}, response:#{response}"
+    raise Error, "failed to load #{object_uris} to #{@destination_project}:#{@dataset}.#{table} in #{@location_for_log}, response:#{response}"
   end
  end
 end
@@ -171,7 +172,7 @@ def load(path, table, write_disposition: 'WRITE_APPEND')
   # As https://cloud.google.com/bigquery/docs/managing_jobs_datasets_projects#managingjobs says,
   # we should generate job_id in client code, otherwise, retrying would cause duplication
   job_id = "embulk_load_job_#{SecureRandom.uuid}"
-  Embulk.logger.info { "embulk-output-bigquery: Load job starting... job_id:[#{job_id}] #{path} => #{@project}:#{@dataset}.#{table} in #{@location_for_log}" }
+  Embulk.logger.info { "embulk-output-bigquery: Load job starting... job_id:[#{job_id}] #{path} => #{@destination_project}:#{@dataset}.#{table} in #{@location_for_log}" }
  else
   Embulk.logger.info { "embulk-output-bigquery: Load job starting... #{path} does not exist, skipped" }
   return
@@ -185,7 +186,7 @@ def load(path, table, write_disposition: 'WRITE_APPEND')
     configuration: {
       load: {
         destination_table: {
-          project_id: @project,
+          project_id: @destination_project,
           dataset_id: @dataset,
           table_id: table,
         },
@@ -232,7 +233,7 @@ def load(path, table, write_disposition: 'WRITE_APPEND')
     Embulk.logger.error {
       "embulk-output-bigquery: insert_job(#{@project}, #{body}, #{opts}), response:#{response}"
     }
-    raise Error, "failed to load #{path} to #{@project}:#{@dataset}.#{table} in #{@location_for_log}, response:#{response}"
+    raise Error, "failed to load #{path} to #{@destination_project}:#{@dataset}.#{table} in #{@location_for_log}, response:#{response}"
   end
  end
 end
@@ -245,7 +246,7 @@ def copy(source_table, destination_table, destination_dataset = nil, write_dispo

   Embulk.logger.info {
     "embulk-output-bigquery: Copy job starting... job_id:[#{job_id}] " \
-    "#{@project}:#{@dataset}.#{source_table} => #{@project}:#{destination_dataset}.#{destination_table}"
+    "#{@destination_project}:#{@dataset}.#{source_table} => #{@destination_project}:#{destination_dataset}.#{destination_table}"
   }

   body = {
@@ -258,12 +259,12 @@ def copy(source_table, destination_table, destination_dataset = nil, write_dispo
       create_deposition: 'CREATE_IF_NEEDED',
       write_disposition: write_disposition,
       source_table: {
-        project_id: @project,
+        project_id: @destination_project,
         dataset_id: @dataset,
         table_id: source_table,
       },
       destination_table: {
-        project_id: @project,
+        project_id: @destination_project,
         dataset_id: destination_dataset,
         table_id: destination_table,
       },
@@ -284,8 +285,8 @@ def copy(source_table, destination_table, destination_dataset = nil, write_dispo
     Embulk.logger.error {
       "embulk-output-bigquery: insert_job(#{@project}, #{body}, #{opts}), response:#{response}"
     }
-    raise Error, "failed to copy #{@project}:#{@dataset}.#{source_table} " \
-      "to #{@project}:#{destination_dataset}.#{destination_table}, response:#{response}"
+    raise Error, "failed to copy #{@destination_project}:#{@dataset}.#{source_table} " \
+      "to #{@destination_project}:#{destination_dataset}.#{destination_table}, response:#{response}"
   end
  end
 end
@@ -354,7 +355,7 @@ def wait_load(kind, response)
 def create_dataset(dataset = nil, reference: nil)
  dataset ||= @dataset
  begin
-  Embulk.logger.info { "embulk-output-bigquery: Create dataset... #{@project}:#{dataset} in #{@location_for_log}" }
+  Embulk.logger.info { "embulk-output-bigquery: Create dataset... #{@destination_project}:#{dataset} in #{@location_for_log}" }
   hint = {}
   if reference
    response = get_dataset(reference)
@@ -382,25 +383,25 @@ def create_dataset(dataset = nil, reference: nil)
   Embulk.logger.error {
    "embulk-output-bigquery: insert_dataset(#{@project}, #{body}, #{opts}), response:#{response}"
   }
-  raise Error, "failed to create dataset #{@project}:#{dataset} in #{@location_for_log}, response:#{response}"
+  raise Error, "failed to create dataset #{@destination_project}:#{dataset} in #{@location_for_log}, response:#{response}"
  end
 end

 def get_dataset(dataset = nil)
  dataset ||= @dataset
  begin
-  Embulk.logger.info { "embulk-output-bigquery: Get dataset... #{@project}:#{dataset}" }
-  with_network_retry { client.get_dataset(@project, dataset) }
+  Embulk.logger.info { "embulk-output-bigquery: Get dataset... #{@destination_project}:#{dataset}" }
+  with_network_retry { client.get_dataset(@destination_project, dataset) }
  rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
   if e.status_code == 404
-   raise NotFoundError, "Dataset #{@project}:#{dataset} is not found"
+   raise NotFoundError, "Dataset #{@destination_project}:#{dataset} is not found"
   end

   response = {status_code: e.status_code, message: e.message, error_class: e.class}
   Embulk.logger.error {
-    "embulk-output-bigquery: get_dataset(#{@project}, #{dataset}), response:#{response}"
+    "embulk-output-bigquery: get_dataset(#{@destination_project}, #{dataset}), response:#{response}"
   }
-  raise Error, "failed to get dataset #{@project}:#{dataset}, response:#{response}"
+  raise Error, "failed to get dataset #{@destination_project}:#{dataset}, response:#{response}"
  end
 end

@@ -414,7 +415,7 @@ def create_table_if_not_exists(table, dataset: nil, options: nil)
   table = Helper.chomp_partition_decorator(table)
  end

- Embulk.logger.info { "embulk-output-bigquery: Create table... #{@project}:#{dataset}.#{table}" }
+ Embulk.logger.info { "embulk-output-bigquery: Create table... #{@destination_project}:#{dataset}.#{table}" }
  body = {
   table_reference: {
    table_id: table,
@@ -452,7 +453,7 @@ def create_table_if_not_exists(table, dataset: nil, options: nil)
   Embulk.logger.error {
    "embulk-output-bigquery: insert_table(#{@project}, #{dataset}, #{@location_for_log}, #{body}, #{opts}), response:#{response}"
   }
-  raise Error, "failed to create table #{@project}:#{dataset}.#{table} in #{@location_for_log}, response:#{response}"
+  raise Error, "failed to create table #{@destination_project}:#{dataset}.#{table} in #{@location_for_log}, response:#{response}"
  end
 end

@@ -469,8 +470,8 @@ def delete_partition(table, dataset: nil)
 def delete_table_or_partition(table, dataset: nil)
  begin
   dataset ||= @dataset
-  Embulk.logger.info { "embulk-output-bigquery: Delete table... #{@project}:#{dataset}.#{table}" }
-  with_network_retry { client.delete_table(@project, dataset, table) }
+  Embulk.logger.info { "embulk-output-bigquery: Delete table... #{@destination_project}:#{dataset}.#{table}" }
+  with_network_retry { client.delete_table(@destination_project, dataset, table) }
  rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
   if e.status_code == 404 && /Not found:/ =~ e.message
    # ignore 'Not Found' error
@@ -479,9 +480,9 @@ def delete_table_or_partition(table, dataset: nil)

   response = {status_code: e.status_code, message: e.message, error_class: e.class}
   Embulk.logger.error {
-   "embulk-output-bigquery: delete_table(#{@project}, #{dataset}, #{table}), response:#{response}"
+   "embulk-output-bigquery: delete_table(#{@destination_project}, #{dataset}, #{table}), response:#{response}"
   }
-  raise Error, "failed to delete table #{@project}:#{dataset}.#{table}, response:#{response}"
+  raise Error, "failed to delete table #{@destination_project}:#{dataset}.#{table}, response:#{response}"
  end
 end

@@ -497,18 +498,18 @@ def get_partition(table, dataset: nil)
 def get_table_or_partition(table, dataset: nil)
  begin
   dataset ||= @dataset
-  Embulk.logger.info { "embulk-output-bigquery: Get table... #{@project}:#{dataset}.#{table}" }
-  with_network_retry { client.get_table(@project, dataset, table) }
+  Embulk.logger.info { "embulk-output-bigquery: Get table... #{@destination_project}:#{dataset}.#{table}" }
+  with_network_retry { client.get_table(@destination_project, dataset, table) }
  rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
   if e.status_code == 404
-   raise NotFoundError, "Table #{@project}:#{dataset}.#{table} is not found"
+   raise NotFoundError, "Table #{@destination_project}:#{dataset}.#{table} is not found"
   end

   response = {status_code: e.status_code, message: e.message, error_class: e.class}
   Embulk.logger.error {
-   "embulk-output-bigquery: get_table(#{@project}, #{dataset}, #{table}), response:#{response}"
+   "embulk-output-bigquery: get_table(#{@destination_project}, #{dataset}, #{table}), response:#{response}"
   }
-  raise Error, "failed to get table #{@project}:#{dataset}.#{table}, response:#{response}"
+  raise Error, "failed to get table #{@destination_project}:#{dataset}.#{table}, response:#{response}"
  end
 end
end
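
Note the pattern repeated in every hunk above: `insert_job` keeps `@project` as its first argument, so the job still runs in (and is billed to) the `project` value, while table references inside the job body and in the log/error messages switch to `@destination_project`. A sketch of the resulting load-job body with placeholder names; the full `job_reference` contents are not shown in the diff, so treat that part as assumed:

    require 'securerandom'

    billing_project     = 'your_project_name'              # passed to insert_job; billed for the job
    destination_project = 'your_destination_project_name'  # owns the loaded table

    body = {
      job_reference: {
        job_id: "embulk_load_job_#{SecureRandom.uuid}",    # generated client-side so retries don't duplicate the job
      },
      configuration: {
        load: {
          destination_table: {
            project_id: destination_project,  # was @project before this commit
            dataset_id: 'your_dataset_name',
            table_id:   'your_table_name',
          },
        },
      },
    }

    # client.insert_job(billing_project, body, opts)  # the first argument stays the billing project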

lib/embulk/output/bigquery/gcs_client.rb

Lines changed: 7 additions & 6 deletions

@@ -16,14 +16,15 @@ def initialize(task)
   super(task, scope, client_class)

   @project = @task['project']
+  @destination_project = @task['destination_project']
   @bucket = @task['gcs_bucket']
   @location = @task['location']
  end

  def insert_temporary_bucket(bucket = nil)
   bucket ||= @bucket
   begin
-   Embulk.logger.info { "embulk-output-bigquery: Insert bucket... #{@project}:#{bucket}" }
+   Embulk.logger.info { "embulk-output-bigquery: Insert bucket... #{@destination_project}:#{bucket}" }
    body = {
     name: bucket,
     lifecycle: {
@@ -57,7 +58,7 @@ def insert_temporary_bucket(bucket = nil)
    Embulk.logger.error {
     "embulk-output-bigquery: insert_temporary_bucket(#{@project}, #{body}, #{opts}), response:#{response}"
    }
-   raise Error, "failed to insert bucket #{@project}:#{bucket}, response:#{response}"
+   raise Error, "failed to insert bucket #{@destination_project}:#{bucket}, response:#{response}"
   end
  end

@@ -69,7 +70,7 @@ def insert_object(path, object: nil, bucket: nil)

   started = Time.now
   begin
-   Embulk.logger.info { "embulk-output-bigquery: Insert object... #{path} => #{@project}:#{object_uri}" }
+   Embulk.logger.info { "embulk-output-bigquery: Insert object... #{path} => #{@destination_project}:#{object_uri}" }
    body = {
     name: object,
    }
@@ -86,7 +87,7 @@ def insert_object(path, object: nil, bucket: nil)
    Embulk.logger.error {
     "embulk-output-bigquery: insert_object(#{bucket}, #{body}, #{opts}), response:#{response}"
    }
-   raise Error, "failed to insert object #{@project}:#{object_uri}, response:#{response}"
+   raise Error, "failed to insert object #{@destination_project}:#{object_uri}, response:#{response}"
   end
  end

@@ -109,7 +110,7 @@ def delete_object(object, bucket: nil)
   object = object.start_with?('/') ? object[1..-1] : object
   object_uri = URI.join("gs://#{bucket}", object).to_s
   begin
-   Embulk.logger.info { "embulk-output-bigquery: Delete object... #{@project}:#{object_uri}" }
+   Embulk.logger.info { "embulk-output-bigquery: Delete object... #{@destination_project}:#{object_uri}" }
    opts = {}

    Embulk.logger.debug { "embulk-output-bigquery: delete_object(#{bucket}, #{object}, #{opts})" }
@@ -122,7 +123,7 @@ def delete_object(object, bucket: nil)
    Embulk.logger.error {
     "embulk-output-bigquery: delete_object(#{bucket}, #{object}, #{opts}), response:#{response}"
    }
-   raise Error, "failed to delete object #{@project}:#{object_uri}, response:#{response}"
+   raise Error, "failed to delete object #{@destination_project}:#{object_uri}, response:#{response}"
   end
  end
 end

test/test_bigquery_client.rb

Lines changed: 1 addition & 0 deletions

@@ -29,6 +29,7 @@ def client(task = {})
  def least_task
   {
    'project' => JSON.parse(File.read(JSON_KEYFILE))['project_id'],
+   'destination_project' => JSON.parse(File.read(JSON_KEYFILE))['project_id'],
    'dataset' => 'your_dataset_name',
    'table' => 'your_table_name',
    'auth_method' => 'json_key',
