Commit 61937a9

Merge pull request #31 from trocco-io/added_merge_mode

Added merge mode

2 parents e109e0c + fbd2cce

File tree

5 files changed: +173 −6 lines changed

README.md

Lines changed: 7 additions & 0 deletions

```diff
@@ -61,6 +61,8 @@ OAuth flow for installed applications.
 | auto_create_gcs_bucket | boolean | optional | false | See [GCS Bucket](#gcs-bucket) |
 | progress_log_interval | float | optional | nil (Disabled) | Progress log interval. The progress log is disabled by nil (default). NOTE: This option may be removed in a future because a filter plugin can achieve the same goal |
 | description | string | optional | nil | description of table |
+| merge_keys | array | optional | | key column names for merging records in merge mode (string array, required in merge mode if the table doesn't have a primary key) |
+| merge_rule | array | optional | | list of column assignments for updating existing records in merge mode, for example `foo = T.foo + S.foo` (T means the target table and S means the source table) (string array, default: always overwrite with new values) |
 
 Client or request options
 
@@ -174,6 +176,11 @@ NOTE: BigQuery does not support replacing (actually, copying into) a non-partiti
 1. Delete destination table (or partition), if it exists.
 2. Load to destination table (or partition).
 
+##### merge
+
+1. Load to temporary table (Create and WRITE_APPEND in parallel).
+2. Merge the temporary table into the destination table (or partition). (Uses a query job instead of a copy job.)
+
 ### Authentication
 
 There are four authentication methods
```
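
To make the new mode concrete, here is an illustrative, standalone Ruby sketch (not part of this commit; the dataset, table, and column names are hypothetical) of the MERGE statement merge mode generates, mirroring the `merge` method added to `bigquery_client.rb` below:

```ruby
# Hypothetical example: render the MERGE statement merge mode would issue
# for a table with columns `id` and `count`, merge_keys: [id], and
# merge_rule: ["count = T.count + S.count"].
dataset      = 'my_dataset'          # assumption: example dataset name
target_table = 'events'              # assumption: example target table
source_table = 'LOAD_TEMP_x_events'  # stands in for the generated temp table
columns      = %w[id count]
merge_keys   = %w[id]
merge_rule   = ['count = T.count + S.count']

on_clause     = merge_keys.map { |k| "T.`#{k}` = S.`#{k}`" }.join(' AND ')
# With an empty merge_rule, every column is simply overwritten.
update_clause = (merge_rule.empty? ? columns.map { |c| "T.`#{c}` = S.`#{c}`" } : merge_rule).join(', ')
column_list   = columns.map { |c| "`#{c}`" }.join(', ')

puts <<~SQL
  MERGE `#{dataset}`.`#{target_table}` T
  USING `#{dataset}`.`#{source_table}` S
  ON #{on_clause}
  WHEN MATCHED THEN
    UPDATE SET #{update_clause}
  WHEN NOT MATCHED THEN
    INSERT (#{column_list})
    VALUES (#{column_list})
SQL
```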

lib/embulk/output/bigquery.rb

Lines changed: 14 additions & 6 deletions

```diff
@@ -91,6 +91,8 @@ def self.configure(config, schema, task_count)
           'time_partitioning' => config.param('time_partitioning', :hash, :default => nil),
           'clustering' => config.param('clustering', :hash, :default => nil), # google-api-ruby-client >= v0.21.0
           'schema_update_options' => config.param('schema_update_options', :array, :default => nil),
+          'merge_keys' => config.param('merge_keys', :array, :default => []),
+          'merge_rule' => config.param('merge_rule', :array, :default => []),
 
           'temporary_table_expiration' => config.param('temporary_table_expiration', :integer, :default => nil),
 
@@ -103,11 +105,11 @@ def self.configure(config, schema, task_count)
         now = Time.now
 
         task['mode'] = task['mode'].downcase
-        unless %w[append append_direct replace delete_in_advance replace_backup].include?(task['mode'])
-          raise ConfigError.new "`mode` must be one of append, append_direct, replace, delete_in_advance, replace_backup"
+        unless %w[append append_direct replace delete_in_advance replace_backup merge].include?(task['mode'])
+          raise ConfigError.new "`mode` must be one of append, append_direct, replace, delete_in_advance, replace_backup, merge"
         end
 
-        if %w[append replace delete_in_advance replace_backup].include?(task['mode']) and !task['auto_create_table']
+        if %w[append replace delete_in_advance replace_backup merge].include?(task['mode']) and !task['auto_create_table']
           raise ConfigError.new "`mode: #{task['mode']}` requires `auto_create_table: true`"
         end
 
@@ -209,7 +211,7 @@ def self.configure(config, schema, task_count)
 
         unique_name = SecureRandom.uuid.gsub('-', '_')
 
-        if %w[replace replace_backup append].include?(task['mode'])
+        if %w[replace replace_backup append merge].include?(task['mode'])
           task['temp_table'] ||= "LOAD_TEMP_#{unique_name}_#{task['table']}"
         else
           task['temp_table'] = nil
@@ -317,6 +319,9 @@ def self.auto_create(task, bigquery)
         when 'append'
           bigquery.create_table_if_not_exists(task['temp_table'], options: temp_options)
          bigquery.create_table_if_not_exists(task['table']) # needs for when task['table'] is a partition
+        when 'merge'
+          bigquery.create_table_if_not_exists(task['temp_table'], options: temp_options)
+          bigquery.create_table_if_not_exists(task['table']) # needs for when task['table'] is a partition
         when 'replace_backup'
           bigquery.create_table_if_not_exists(task['temp_table'], options: temp_options)
           bigquery.create_table_if_not_exists(task['table'])
@@ -401,7 +406,10 @@ def self.transaction(config, schema, task_count, &control)
         end
 
         if task['temp_table']
-          if task['mode'] == 'append'
+          case task['mode']
+          when 'merge'
+            bigquery.merge(task['temp_table'], task['table'], task['merge_keys'], task['merge_rule'])
+          when 'append'
            bigquery.copy(task['temp_table'], task['table'], write_disposition: 'WRITE_APPEND')
           else # replace or replace_backup
            bigquery.copy(task['temp_table'], task['table'], write_disposition: 'WRITE_TRUNCATE')
@@ -413,7 +421,7 @@ def self.transaction(config, schema, task_count, &control)
 
       ensure
         begin
-          if task['temp_table'] # append or replace or replace_backup
+          if task['temp_table'] # append or replace or replace_backup or merge
            bigquery.delete_table(task['temp_table'])
          end
        ensure
```
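
Taken together, these changes give merge mode the same temp-table lifecycle as append and replace. The following standalone sketch traces that call order with a stub standing in for `BigqueryClient`; the stub class and all names are illustrative only, not the plugin's API:

```ruby
require 'securerandom'

# Stub in place of BigqueryClient, just to trace the merge-mode call order.
class StubClient
  def create_table_if_not_exists(table, options: {})
    puts "create_table_if_not_exists(#{table})"
  end

  def merge(src, dst, keys, rule)
    puts "merge(#{src} -> #{dst}, keys=#{keys.inspect}, rule=#{rule.inspect})"
  end

  def delete_table(table)
    puts "delete_table(#{table})"
  end
end

task = { 'table' => 'events', 'merge_keys' => ['id'], 'merge_rule' => [] }
task['temp_table'] = "LOAD_TEMP_#{SecureRandom.uuid.gsub('-', '_')}_#{task['table']}"

bigquery = StubClient.new
bigquery.create_table_if_not_exists(task['temp_table'])
bigquery.create_table_if_not_exists(task['table']) # task['table'] may be a partition
# ... parallel WRITE_APPEND loads into task['temp_table'] happen here ...
begin
  bigquery.merge(task['temp_table'], task['table'], task['merge_keys'], task['merge_rule'])
ensure
  bigquery.delete_table(task['temp_table']) # temp table is always cleaned up
end
```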

lib/embulk/output/bigquery/bigquery_client.rb

Lines changed: 119 additions & 0 deletions

```diff
@@ -544,6 +544,125 @@ def patch_description(fields, column_options)
             with_network_retry { client.patch_table(@project, @dataset, table_id, table) }
           end
         end
+
+        def merge(source_table, target_table, merge_keys, merge_rule)
+          columns = @schema.map { |column| column[:name] }
+          query = <<~EOD
+            MERGE `#{@dataset}`.`#{target_table}` T
+            USING `#{@dataset}`.`#{source_table}` S
+            ON #{join_merge_keys(merge_keys.empty? ? merge_keys(target_table) : merge_keys)}
+            WHEN MATCHED THEN
+              UPDATE SET #{join_merge_rule_or_columns(merge_rule, columns)}
+            WHEN NOT MATCHED THEN
+              INSERT (#{join_columns(columns)})
+              VALUES (#{join_columns(columns)})
+          EOD
+          Embulk.logger.info { "embulk-output-bigquery: Execute query... #{query.gsub(/\s+/, ' ')}" }
+          execute_query(query)
+        end
+
+        def merge_keys(table)
+          query = <<~EOD
+            SELECT
+              KCU.COLUMN_NAME
+            FROM
+              `#{@dataset}`.INFORMATION_SCHEMA.KEY_COLUMN_USAGE KCU
+            JOIN
+              `#{@dataset}`.INFORMATION_SCHEMA.TABLE_CONSTRAINTS TC
+            ON
+              KCU.CONSTRAINT_CATALOG = TC.CONSTRAINT_CATALOG AND
+              KCU.CONSTRAINT_SCHEMA = TC.CONSTRAINT_SCHEMA AND
+              KCU.CONSTRAINT_NAME = TC.CONSTRAINT_NAME AND
+              KCU.TABLE_CATALOG = TC.TABLE_CATALOG AND
+              KCU.TABLE_SCHEMA = TC.TABLE_SCHEMA AND
+              KCU.TABLE_NAME = TC.TABLE_NAME
+            WHERE
+              TC.TABLE_NAME = '#{table}' AND
+              TC.CONSTRAINT_TYPE = 'PRIMARY KEY'
+            ORDER BY
+              KCU.ORDINAL_POSITION
+          EOD
+          rows = []
+          run_query(query) { |response| rows.concat(response[:rows] || []) }
+          rows.flat_map { |row| row[:f] }.map { |cell| cell[:v] }
+        end
+
+        def run_query(query, &block)
+          response = execute_query(query)
+          response = query_results(response, &block) while response
+        end
+
+        def query_results(response)
+          with_job_retry do
+            begin
+              job_id = response[:job_reference][:job_id]
+              page_token = response[:page_token].to_s unless response[:page_token].to_s.empty?
+              response = with_network_retry { client.get_job_query_results(@project, job_id, page_token: page_token) }.to_h
+              yield response
+              response unless response[:page_token].to_s.empty?
+            rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
+              response = {status_code: e.status_code, message: e.message, error_class: e.class}
+              Embulk.logger.error {
+                "embulk-output-bigquery: get_job_query_results(#{@project}, #{job_id}), response:#{response}"
+              }
+              raise Error, "failed to get query results, response:#{response}"
+            end
+          end
+        end
+
+        def join_merge_keys(merge_keys)
+          raise "merge key or primary key is required" if merge_keys.empty?
+
+          merge_keys.map { |merge_key| "T.`#{merge_key}` = S.`#{merge_key}`" }.join(" AND ")
+        end
+
+        def join_merge_rule_or_columns(merge_rule, columns)
+          merge_rule_or_columns = merge_rule.empty? ? columns.map { |column| "T.`#{column}` = S.`#{column}`" } : merge_rule
+          merge_rule_or_columns.join(", ")
+        end
+
+        def join_columns(columns)
+          columns.map { |column| "`#{column}`" }.join(", ")
+        end
+
+        def execute_query(query)
+          with_job_retry do
+            begin
+              job_id = "embulk_query_job_#{SecureRandom.uuid}"
+
+              Embulk.logger.info {
+                "embulk-output-bigquery: Query job starting... job_id:[#{job_id}]"
+              }
+
+              body = {
+                job_reference: {
+                  project_id: @project,
+                  job_id: job_id,
+                },
+                configuration: {
+                  query: {
+                    query: query,
+                    use_legacy_sql: false,
+                  }
+                }
+              }
+
+              if @location
+                body[:job_reference][:location] = @location
+              end
+
+              opts = {}
+              response = with_network_retry { client.insert_job(@project, body, **opts) }
+              wait_load('Query', response).to_h
+            rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
+              response = {status_code: e.status_code, message: e.message, error_class: e.class}
+              Embulk.logger.error {
+                "embulk-output-bigquery: insert_job(#{@project}, #{body}, #{opts}), response:#{response}"
+              }
+              raise Error, "failed to query, response:#{response}"
+            end
+          end
+        end
       end
     end
   end
```
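
The new string-building helpers are pure functions, so they are easy to sanity-check in isolation. A sketch with hypothetical inputs (the `response` hash at the end only imitates the general shape of a `get_job_query_results` page, which `merge_keys` flattens into column names):

```ruby
# Copies of the two pure helpers above, exercised with sample inputs.
def join_merge_keys(merge_keys)
  raise "merge key or primary key is required" if merge_keys.empty?

  merge_keys.map { |merge_key| "T.`#{merge_key}` = S.`#{merge_key}`" }.join(" AND ")
end

def join_merge_rule_or_columns(merge_rule, columns)
  rule = merge_rule.empty? ? columns.map { |column| "T.`#{column}` = S.`#{column}`" } : merge_rule
  rule.join(", ")
end

puts join_merge_keys(%w[id region])
#=> T.`id` = S.`id` AND T.`region` = S.`region`
puts join_merge_rule_or_columns([], %w[id count])
#=> T.`id` = S.`id`, T.`count` = S.`count`
puts join_merge_rule_or_columns(['count = T.count + S.count'], %w[id count])
#=> count = T.count + S.count

# Hypothetical shape of one query-results page, flattened the same way
# merge_keys extracts primary-key column names:
response = { rows: [{ f: [{ v: 'id' }] }, { f: [{ v: 'region' }] }] }
p response[:rows].flat_map { |row| row[:f] }.map { |cell| cell[:v] }
#=> ["id", "region"]
```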

test/test_configure.rb

Lines changed: 3 additions & 0 deletions

```diff
@@ -100,6 +100,9 @@ def test_mode
 
       config = least_config.merge('mode' => 'replace_backup')
       assert_raise { Bigquery.configure(config, schema, processor_count) }
+
+      config = least_config.merge('mode' => 'merge')
+      assert_nothing_raised { Bigquery.configure(config, schema, processor_count) }
     end
 
     def test_location
```

test/test_transaction.rb

Lines changed: 30 additions & 0 deletions

```diff
@@ -230,6 +230,36 @@ def test_append_with_partitioning
         Bigquery.transaction(config, schema, processor_count, &control)
       end
     end
+
+    sub_test_case "merge" do
+      def test_merge
+        config = least_config.merge('mode' => 'merge')
+        task = Bigquery.configure(config, schema, processor_count)
+        any_instance_of(BigqueryClient) do |obj|
+          mock(obj).get_dataset(config['dataset'])
+          mock(obj).create_table_if_not_exists(config['temp_table'], options: {"expiration_time"=>nil})
+          mock(obj).create_table_if_not_exists(config['table'])
+          mock(obj).merge(config['temp_table'], config['table'], task['merge_keys'], task['merge_rule'])
+          mock(obj).delete_table(config['temp_table'])
+          mock(obj).patch_table
+        end
+        Bigquery.transaction(config, schema, processor_count, &control)
+      end
+
+      def test_merge_with_partitioning
+        config = least_config.merge('mode' => 'merge', 'table' => 'table$20160929', 'auto_create_table' => true)
+        task = Bigquery.configure(config, schema, processor_count)
+        any_instance_of(BigqueryClient) do |obj|
+          mock(obj).get_dataset(config['dataset'])
+          mock(obj).create_table_if_not_exists(config['temp_table'], options: {"expiration_time"=>nil})
+          mock(obj).create_table_if_not_exists(config['table'])
+          mock(obj).merge(config['temp_table'], config['table'], task['merge_keys'], task['merge_rule'])
+          mock(obj).delete_table(config['temp_table'])
+          mock(obj).patch_table
+        end
+        Bigquery.transaction(config, schema, processor_count, &control)
+      end
+    end
   end
 end
```
