From 593013aa15f2352afe98a5bbe1320d8fd149a34d Mon Sep 17 00:00:00 2001 From: chikamura Date: Sat, 5 Apr 2025 20:23:35 +0900 Subject: [PATCH 01/10] inherit column descriptions when mode replace is used. --- lib/embulk/output/bigquery/bigquery_client.rb | 25 ++++++++++++++++--- 1 file changed, 22 insertions(+), 3 deletions(-) diff --git a/lib/embulk/output/bigquery/bigquery_client.rb b/lib/embulk/output/bigquery/bigquery_client.rb index b42f9b9..c587afb 100644 --- a/lib/embulk/output/bigquery/bigquery_client.rb +++ b/lib/embulk/output/bigquery/bigquery_client.rb @@ -30,6 +30,16 @@ def initialize(task, schema, fields = nil) @task['encoding'] ||= 'UTF-8' @task['ignore_unknown_values'] = false if @task['ignore_unknown_values'].nil? @task['allow_quoted_newlines'] = false if @task['allow_quoted_newlines'].nil? + + if @task['mode'] == 'replace' + begin + @src_table_schema = get_table(@task['table']).schema.fields + rescue NotFoundError + @src_table_schema = [] + end + else + @src_table_schema = [] + end end def fields @@ -524,13 +534,22 @@ def patch_table with_job_retry do table = get_table(@task['table']) - def patch_description(fields, column_options) + def patch_description(fields, column_options, src_fields) fields.map do |field| + src_field = src_fields.select { |s_field| s_field.name == field.name }.first + if src_field + field.update!(description: src_field.description) if src_field.description + if field.fields && src_field.fields + nested_fields = patch_description(field.fields, [], src_field.fields) + field.update!(fields: nested_fields) + end + end + column_option = column_options.select{|col_opt| col_opt['name'] == field.name}.first if column_option field.update!(description: column_option['description']) if column_option['description'] if field.fields && column_option['fields'] - nested_fields = patch_description(field.fields, column_option['fields']) + nested_fields = patch_description(field.fields, column_option['fields'], []) field.update!(fields: nested_fields) end end @@ -538,7 +557,7 @@ def patch_description(fields, column_options) end end - fields = patch_description(table.schema.fields, @task['column_options']) + fields = patch_description(table.schema.fields, @task['column_options'], @src_table_schema) table.schema.update!(fields: fields) table_id = Helper.chomp_partition_decorator(@task['table']) with_network_retry { client.patch_table(@project, @dataset, table_id, table) } From e934ce2e78375ad2d613ada3c5289496dba748f3 Mon Sep 17 00:00:00 2001 From: chikamura Date: Sat, 5 Apr 2025 20:40:43 +0900 Subject: [PATCH 02/10] inherit table description when mode replace is used --- lib/embulk/output/bigquery/bigquery_client.rb | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/lib/embulk/output/bigquery/bigquery_client.rb b/lib/embulk/output/bigquery/bigquery_client.rb index c587afb..c3c2e18 100644 --- a/lib/embulk/output/bigquery/bigquery_client.rb +++ b/lib/embulk/output/bigquery/bigquery_client.rb @@ -33,12 +33,12 @@ def initialize(task, schema, fields = nil) if @task['mode'] == 'replace' begin - @src_table_schema = get_table(@task['table']).schema.fields + @src_table = get_table(@task['table']) rescue NotFoundError - @src_table_schema = [] + @src_table = nil end else - @src_table_schema = [] + @src_table = nil end end @@ -430,7 +430,7 @@ def create_table_if_not_exists(table, dataset: nil, options: nil) table_reference: { table_id: table, }, - description: @task['description'], + description: @task['description'] || @src_table&.description, schema: { fields: fields, } @@ -536,7 +536,7 @@ def patch_table def patch_description(fields, column_options, src_fields) fields.map do |field| - src_field = src_fields.select { |s_field| s_field.name == field.name }.first + src_field = src_fields&.select {|s_field| s_field.name == field.name}&.first if src_field field.update!(description: src_field.description) if src_field.description if field.fields && src_field.fields @@ -557,7 +557,7 @@ def patch_description(fields, column_options, src_fields) end end - fields = patch_description(table.schema.fields, @task['column_options'], @src_table_schema) + fields = patch_description(table.schema.fields, @task['column_options'], @src_table&.schema&.fields) table.schema.update!(fields: fields) table_id = Helper.chomp_partition_decorator(@task['table']) with_network_retry { client.patch_table(@project, @dataset, table_id, table) } From bd545755303b1f0c7521818769fb74c3ae4da347 Mon Sep 17 00:00:00 2001 From: chikamura Date: Sat, 5 Apr 2025 21:09:59 +0900 Subject: [PATCH 03/10] udpate runs-on from ubuntu-latest to ubuntu-22.04 --- .github/workflows/check.yml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/.github/workflows/check.yml b/.github/workflows/check.yml index 535d915..1141a67 100644 --- a/.github/workflows/check.yml +++ b/.github/workflows/check.yml @@ -2,7 +2,9 @@ name: Check on: [ pull_request, push ] jobs: check: - runs-on: ubuntu-latest + # https://github.com/ruby/ruby-builder/releases/tag/toolcache + # No support for jruby-9.1.17.0 after ubuntu-24.04 + runs-on: ubuntu-22.04 # push: always run. # pull_request: run only when the PR is submitted from a forked repository, not within this repository. if: github.event_name == 'push' || github.event.pull_request.head.repo.full_name != github.repository From 5b081470b1ed72a826a54ccab306d6512c9e7678 Mon Sep 17 00:00:00 2001 From: chikamura Date: Sun, 6 Apr 2025 10:15:33 +0900 Subject: [PATCH 04/10] fix replace test --- test/test_transaction.rb | 2 ++ 1 file changed, 2 insertions(+) diff --git a/test/test_transaction.rb b/test/test_transaction.rb index bc59510..32c04b3 100644 --- a/test/test_transaction.rb +++ b/test/test_transaction.rb @@ -115,6 +115,7 @@ def test_replace task = Bigquery.configure(config, schema, processor_count) any_instance_of(BigqueryClient) do |obj| mock(obj).get_dataset(config['dataset']) + mock(obj).get_table(config['table']) mock(obj).create_table_if_not_exists(config['temp_table'], options: {"expiration_time"=>nil}) mock(obj).create_table_if_not_exists(config['table']) mock(obj).copy(config['temp_table'], config['table'], write_disposition: 'WRITE_TRUNCATE') @@ -129,6 +130,7 @@ def test_replace_with_partitioning task = Bigquery.configure(config, schema, processor_count) any_instance_of(BigqueryClient) do |obj| mock(obj).get_dataset(config['dataset']) + mock(obj).get_table(config['table']) mock(obj).create_table_if_not_exists(config['temp_table'], options: {"expiration_time"=>nil}) mock(obj).create_table_if_not_exists(config['table']) mock(obj).copy(config['temp_table'], config['table'], write_disposition: 'WRITE_TRUNCATE') From 78907f36b4b27c0ef6cec03e0e457be15deee95b Mon Sep 17 00:00:00 2001 From: chikamura Date: Sun, 6 Apr 2025 10:36:27 +0900 Subject: [PATCH 05/10] remove unnecessary codes --- lib/embulk/output/bigquery/bigquery_client.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/embulk/output/bigquery/bigquery_client.rb b/lib/embulk/output/bigquery/bigquery_client.rb index c3c2e18..bcaa458 100644 --- a/lib/embulk/output/bigquery/bigquery_client.rb +++ b/lib/embulk/output/bigquery/bigquery_client.rb @@ -430,7 +430,7 @@ def create_table_if_not_exists(table, dataset: nil, options: nil) table_reference: { table_id: table, }, - description: @task['description'] || @src_table&.description, + description: @task['description'], schema: { fields: fields, } From bfd41935e764f6c7fde6be4d15028a1d594e3c9a Mon Sep 17 00:00:00 2001 From: chikamura Date: Sun, 6 Apr 2025 11:06:09 +0900 Subject: [PATCH 06/10] add retain_column_descriptions option --- README.md | 1 + lib/embulk/output/bigquery.rb | 1 + lib/embulk/output/bigquery/bigquery_client.rb | 2 +- 3 files changed, 3 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index a5109f1..eee4957 100644 --- a/README.md +++ b/README.md @@ -63,6 +63,7 @@ OAuth flow for installed applications. | description | string | optional | nil | description of table | | merge_keys | array | optional | | key column names for merging records in merge mode (string array, required in merge mode if table doesn't have primary key) | | merge_rule | array | optional | | list of column assignments for updating existing records used in merge mode, for example foo = T.foo + S.foo (T means target table and S means source table). (string array, default: always overwrites with new values) | +| retain_column_descriptions | boolean | optional | false | In case of replace mode, the column's descriptions are taken over. | Client or request options diff --git a/lib/embulk/output/bigquery.rb b/lib/embulk/output/bigquery.rb index 8ada6ab..60ab5eb 100644 --- a/lib/embulk/output/bigquery.rb +++ b/lib/embulk/output/bigquery.rb @@ -64,6 +64,7 @@ def self.configure(config, schema, task_count) 'payload_column_index' => config.param('payload_column_index', :integer, :default => nil), 'description' => config.param('description', :string, :default => nil), + 'retain_column_descriptions' => config.param('retain_column_descriptions', :bool, :default => false), 'open_timeout_sec' => config.param('open_timeout_sec', :integer, :default => nil), 'timeout_sec' => config.param('timeout_sec', :integer, :default => nil), # google-api-ruby-client < v0.11.0 diff --git a/lib/embulk/output/bigquery/bigquery_client.rb b/lib/embulk/output/bigquery/bigquery_client.rb index bcaa458..eee4a6b 100644 --- a/lib/embulk/output/bigquery/bigquery_client.rb +++ b/lib/embulk/output/bigquery/bigquery_client.rb @@ -31,7 +31,7 @@ def initialize(task, schema, fields = nil) @task['ignore_unknown_values'] = false if @task['ignore_unknown_values'].nil? @task['allow_quoted_newlines'] = false if @task['allow_quoted_newlines'].nil? - if @task['mode'] == 'replace' + if @task['mode'] == 'replace' && @task['retain_column_descriptions'] begin @src_table = get_table(@task['table']) rescue NotFoundError From 3d5e686bcac6bcf91b7bd88c566429be47947c28 Mon Sep 17 00:00:00 2001 From: chikamura Date: Sun, 6 Apr 2025 11:14:11 +0900 Subject: [PATCH 07/10] fix replace test --- test/test_transaction.rb | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/test/test_transaction.rb b/test/test_transaction.rb index 32c04b3..094df36 100644 --- a/test/test_transaction.rb +++ b/test/test_transaction.rb @@ -115,7 +115,6 @@ def test_replace task = Bigquery.configure(config, schema, processor_count) any_instance_of(BigqueryClient) do |obj| mock(obj).get_dataset(config['dataset']) - mock(obj).get_table(config['table']) mock(obj).create_table_if_not_exists(config['temp_table'], options: {"expiration_time"=>nil}) mock(obj).create_table_if_not_exists(config['table']) mock(obj).copy(config['temp_table'], config['table'], write_disposition: 'WRITE_TRUNCATE') @@ -128,6 +127,20 @@ def test_replace def test_replace_with_partitioning config = least_config.merge('mode' => 'replace', 'table' => 'table$20160929') task = Bigquery.configure(config, schema, processor_count) + any_instance_of(BigqueryClient) do |obj| + mock(obj).get_dataset(config['dataset']) + mock(obj).create_table_if_not_exists(config['temp_table'], options: {"expiration_time"=>nil}) + mock(obj).create_table_if_not_exists(config['table']) + mock(obj).copy(config['temp_table'], config['table'], write_disposition: 'WRITE_TRUNCATE') + mock(obj).delete_table(config['temp_table']) + mock(obj).patch_table + end + Bigquery.transaction(config, schema, processor_count, &control) + end + + def test_replace_with_retain_column_descriptions + config = least_config.merge('mode' => 'replace', 'retain_column_descriptions' => true) + task = Bigquery.configure(config, schema, processor_count) any_instance_of(BigqueryClient) do |obj| mock(obj).get_dataset(config['dataset']) mock(obj).get_table(config['table']) From bb63522acc26ee8bfe1fd193678c74dc18a55521 Mon Sep 17 00:00:00 2001 From: chikamura Date: Mon, 7 Apr 2025 06:54:59 +0900 Subject: [PATCH 08/10] refatoring bigquery_client.src_table --- lib/embulk/output/bigquery/bigquery_client.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/embulk/output/bigquery/bigquery_client.rb b/lib/embulk/output/bigquery/bigquery_client.rb index eee4a6b..3841643 100644 --- a/lib/embulk/output/bigquery/bigquery_client.rb +++ b/lib/embulk/output/bigquery/bigquery_client.rb @@ -536,7 +536,7 @@ def patch_table def patch_description(fields, column_options, src_fields) fields.map do |field| - src_field = src_fields&.select {|s_field| s_field.name == field.name}&.first + src_field = src_fields.select {|s_field| s_field.name == field.name}.first if src_field field.update!(description: src_field.description) if src_field.description if field.fields && src_field.fields @@ -557,7 +557,7 @@ def patch_description(fields, column_options, src_fields) end end - fields = patch_description(table.schema.fields, @task['column_options'], @src_table&.schema&.fields) + fields = patch_description(table.schema.fields, @task['column_options'], @src_table ? @src_table.schema.fields : []) table.schema.update!(fields: fields) table_id = Helper.chomp_partition_decorator(@task['table']) with_network_retry { client.patch_table(@project, @dataset, table_id, table) } From af98a7a995cd8c7b88d2f848402c534ce0cbd07e Mon Sep 17 00:00:00 2001 From: chikamura Date: Sat, 12 Apr 2025 14:41:06 +0900 Subject: [PATCH 09/10] add retain_column_policy_tags option --- README.md | 1 + lib/embulk/output/bigquery.rb | 1 + lib/embulk/output/bigquery/bigquery_client.rb | 63 +++++++++---- .../bigquery_service_with_policy_tag.rb | 88 +++++++++++++++++++ test/test_transaction.rb | 17 +++- 5 files changed, 152 insertions(+), 18 deletions(-) create mode 100644 lib/embulk/output/bigquery/bigquery_service_with_policy_tag.rb diff --git a/README.md b/README.md index eee4957..419616b 100644 --- a/README.md +++ b/README.md @@ -64,6 +64,7 @@ OAuth flow for installed applications. | merge_keys | array | optional | | key column names for merging records in merge mode (string array, required in merge mode if table doesn't have primary key) | | merge_rule | array | optional | | list of column assignments for updating existing records used in merge mode, for example foo = T.foo + S.foo (T means target table and S means source table). (string array, default: always overwrites with new values) | | retain_column_descriptions | boolean | optional | false | In case of replace mode, the column's descriptions are taken over. | +| retain_column_policy_tags | boolean | optional | false | In case of replace mode, the table policy tags are taken over. | Client or request options diff --git a/lib/embulk/output/bigquery.rb b/lib/embulk/output/bigquery.rb index 60ab5eb..904b5a8 100644 --- a/lib/embulk/output/bigquery.rb +++ b/lib/embulk/output/bigquery.rb @@ -65,6 +65,7 @@ def self.configure(config, schema, task_count) 'description' => config.param('description', :string, :default => nil), 'retain_column_descriptions' => config.param('retain_column_descriptions', :bool, :default => false), + 'retain_column_policy_tags' => config.param('retain_column_policy_tags', :bool, :default => false), 'open_timeout_sec' => config.param('open_timeout_sec', :integer, :default => nil), 'timeout_sec' => config.param('timeout_sec', :integer, :default => nil), # google-api-ruby-client < v0.11.0 diff --git a/lib/embulk/output/bigquery/bigquery_client.rb b/lib/embulk/output/bigquery/bigquery_client.rb index 3841643..1fb74bb 100644 --- a/lib/embulk/output/bigquery/bigquery_client.rb +++ b/lib/embulk/output/bigquery/bigquery_client.rb @@ -3,6 +3,7 @@ require 'thwait' require_relative 'google_client' require_relative 'helper' +require_relative 'bigquery_service_with_policy_tag' module Embulk module Output @@ -12,7 +13,7 @@ class BigqueryClient < GoogleClient def initialize(task, schema, fields = nil) scope = "https://www.googleapis.com/auth/bigquery" - client_class = Google::Apis::BigqueryV2::BigqueryService + client_class = BigqueryServiceWithPolicyTag super(task, scope, client_class) @schema = schema @@ -31,15 +32,17 @@ def initialize(task, schema, fields = nil) @task['ignore_unknown_values'] = false if @task['ignore_unknown_values'].nil? @task['allow_quoted_newlines'] = false if @task['allow_quoted_newlines'].nil? - if @task['mode'] == 'replace' && @task['retain_column_descriptions'] - begin - @src_table = get_table(@task['table']) - rescue NotFoundError - @src_table = nil - end - else - @src_table = nil - end + @src_fields = need_takeover ? fetch_src_fields : [] + end + + def need_takeover + (@task['mode'] == 'replace') && (@task['retain_column_descriptions'] || @task['retain_column_policy_tags']) + end + + def fetch_src_fields + get_table_with_policy_tags(@task['table'])&.schema&.fields || [] + rescue NotFoundError + [] end def fields @@ -529,18 +532,38 @@ def get_table_or_partition(table, dataset: nil) end end + def get_table_with_policy_tags(table, dataset: nil) + begin + table = Helper.chomp_partition_decorator(table) + dataset ||= @dataset + Embulk.logger.info { "embulk-output-bigquery: Get table With PolicyTags... #{@destination_project}:#{dataset}.#{table}" } + with_network_retry { client.get_table_with_policy_tags(@destination_project, dataset, table) } + rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e + if e.status_code == 404 + raise NotFoundError, "Table #{@destination_project}:#{dataset}.#{table} is not found" + end + + response = {status_code: e.status_code, message: e.message, error_class: e.class} + Embulk.logger.error { + "embulk-output-bigquery: get_table(#{@destination_project}, #{dataset}, #{table}), response:#{response}" + } + raise Error, "failed to get table #{@destination_project}:#{dataset}.#{table}, response:#{response}" + end + end + # update only column.description def patch_table with_job_retry do - table = get_table(@task['table']) + table = need_takeover ? get_table_with_policy_tags(@task['table']) : get_table(@task['table']) - def patch_description(fields, column_options, src_fields) + def patch_description_and_policy_tags(fields, column_options, src_fields) fields.map do |field| src_field = src_fields.select {|s_field| s_field.name == field.name}.first if src_field - field.update!(description: src_field.description) if src_field.description + field.update!(description: src_field.description) if @task['retain_column_descriptions'] && src_field.description + field.update!(policy_tags: src_field.policy_tags) if @task['retain_column_policy_tags'] && src_field.policy_tags if field.fields && src_field.fields - nested_fields = patch_description(field.fields, [], src_field.fields) + nested_fields = patch_description_and_policy_tags(field.fields, [], src_field.fields) field.update!(fields: nested_fields) end end @@ -549,7 +572,7 @@ def patch_description(fields, column_options, src_fields) if column_option field.update!(description: column_option['description']) if column_option['description'] if field.fields && column_option['fields'] - nested_fields = patch_description(field.fields, column_option['fields'], []) + nested_fields = patch_description_and_policy_tags(field.fields, column_option['fields'], []) field.update!(fields: nested_fields) end end @@ -557,10 +580,16 @@ def patch_description(fields, column_options, src_fields) end end - fields = patch_description(table.schema.fields, @task['column_options'], @src_table ? @src_table.schema.fields : []) + fields = patch_description_and_policy_tags(table.schema.fields, @task['column_options'], @src_fields) table.schema.update!(fields: fields) table_id = Helper.chomp_partition_decorator(@task['table']) - with_network_retry { client.patch_table(@project, @dataset, table_id, table) } + with_network_retry do + if need_takeover + client.patch_table_with_policy_tags(@project, @dataset, table_id, table) + else + client.patch_table(@project, @dataset, table_id, table) + end + end end end diff --git a/lib/embulk/output/bigquery/bigquery_service_with_policy_tag.rb b/lib/embulk/output/bigquery/bigquery_service_with_policy_tag.rb new file mode 100644 index 0000000..e5b5953 --- /dev/null +++ b/lib/embulk/output/bigquery/bigquery_service_with_policy_tag.rb @@ -0,0 +1,88 @@ +require 'google/apis/bigquery_v2' + +module Embulk + module Output + class Bigquery < OutputPlugin + # NOTE: + # Due to the JRuby version constraint in Embulk v0.9, it’s not possible to upgrade to a version of the google-api-client (0.37.0 or later) that includes support for policy_tags. + # So the workaround was implemented using a patch-like solution as shown below. + class BigqueryServiceWithPolicyTag < Google::Apis::BigqueryV2::BigqueryService + def get_table_with_policy_tags(project_id, dataset_id, table_id, selected_fields: nil, fields: nil, quota_user: nil, user_ip: nil, options: nil, &block) + command = make_simple_command(:get, 'projects/{projectId}/datasets/{datasetId}/tables/{tableId}', options) + command.response_representation = TableWithPolicyTag::Representation + command.response_class = TableWithPolicyTag + command.params['projectId'] = project_id unless project_id.nil? + command.params['datasetId'] = dataset_id unless dataset_id.nil? + command.params['tableId'] = table_id unless table_id.nil? + command.query['selectedFields'] = selected_fields unless selected_fields.nil? + command.query['fields'] = fields unless fields.nil? + command.query['quotaUser'] = quota_user unless quota_user.nil? + command.query['userIp'] = user_ip unless user_ip.nil? + execute_or_queue_command(command, &block) + end + + def patch_table_with_policy_tags(project_id, dataset_id, table_id, table_object = nil, autodetect_schema: nil, fields: nil, quota_user: nil, options: nil, &block) + command = make_simple_command(:patch, 'projects/{+projectId}/datasets/{+datasetId}/tables/{+tableId}', options) + command.request_representation = TableWithPolicyTag::Representation + command.request_object = table_object + command.response_representation = TableWithPolicyTag::Representation + command.response_class = TableWithPolicyTag + command.params['projectId'] = project_id unless project_id.nil? + command.params['datasetId'] = dataset_id unless dataset_id.nil? + command.params['tableId'] = table_id unless table_id.nil? + command.query['autodetect_schema'] = autodetect_schema unless autodetect_schema.nil? + command.query['fields'] = fields unless fields.nil? + command.query['quotaUser'] = quota_user unless quota_user.nil? + execute_or_queue_command(command, &block) + end + end + + class TableFieldSchemaWithPolicyTag < Google::Apis::BigqueryV2::TableFieldSchema + class PolicyTags + include Google::Apis::Core::JsonObjectSupport + include Google::Apis::Core::Hashable + + class Representation < Google::Apis::Core::JsonRepresentation + collection :names, as: 'names' + end + + attr_accessor :names + + def initialize(**args) + update!(**args) + end + + def update!(**args) + @names = args[:names] if args.key?(:names) + end + end + + include Google::Apis::Core::Hashable + + attr_accessor :policy_tags + + def update!(**args) + super + @policy_tags = args[:policy_tags] if args.key?(:policy_tags) + end + + class Representation < Google::Apis::BigqueryV2::TableFieldSchema::Representation + collection :fields, as: 'fields', class: TableFieldSchemaWithPolicyTag, decorator: TableFieldSchemaWithPolicyTag::Representation + property :policy_tags, as: 'policyTags', class: TableFieldSchemaWithPolicyTag::PolicyTags, decorator: TableFieldSchemaWithPolicyTag::PolicyTags::Representation + end + end + + class TableSchemaWithPolicyTag < Google::Apis::BigqueryV2::TableSchema + class Representation < Google::Apis::BigqueryV2::TableSchema::Representation + collection :fields, as: 'fields', class: TableFieldSchemaWithPolicyTag, decorator: TableFieldSchemaWithPolicyTag::Representation + end + end + + class TableWithPolicyTag < Google::Apis::BigqueryV2::Table + class Representation < Google::Apis::BigqueryV2::Table::Representation + property :schema, as: 'schema', class: TableSchemaWithPolicyTag, decorator: TableSchemaWithPolicyTag::Representation + end + end + end + end +end diff --git a/test/test_transaction.rb b/test/test_transaction.rb index 094df36..66bf05a 100644 --- a/test/test_transaction.rb +++ b/test/test_transaction.rb @@ -143,7 +143,22 @@ def test_replace_with_retain_column_descriptions task = Bigquery.configure(config, schema, processor_count) any_instance_of(BigqueryClient) do |obj| mock(obj).get_dataset(config['dataset']) - mock(obj).get_table(config['table']) + mock(obj).get_table_with_policy_tags(config['table']) + mock(obj).create_table_if_not_exists(config['temp_table'], options: {"expiration_time"=>nil}) + mock(obj).create_table_if_not_exists(config['table']) + mock(obj).copy(config['temp_table'], config['table'], write_disposition: 'WRITE_TRUNCATE') + mock(obj).delete_table(config['temp_table']) + mock(obj).patch_table + end + Bigquery.transaction(config, schema, processor_count, &control) + end + + def test_replace_with_retain_column_policy_tags + config = least_config.merge('mode' => 'replace', 'retain_column_policy_tags' => true) + task = Bigquery.configure(config, schema, processor_count) + any_instance_of(BigqueryClient) do |obj| + mock(obj).get_dataset(config['dataset']) + mock(obj).get_table_with_policy_tags(config['table']) mock(obj).create_table_if_not_exists(config['temp_table'], options: {"expiration_time"=>nil}) mock(obj).create_table_if_not_exists(config['table']) mock(obj).copy(config['temp_table'], config['table'], write_disposition: 'WRITE_TRUNCATE') From ae6b2442b6c949a10fb6793a8a4667fc647c62ab Mon Sep 17 00:00:00 2001 From: chikamura Date: Sun, 13 Apr 2025 06:08:31 +0900 Subject: [PATCH 10/10] refactoring --- ...fig_replace_retain_column_descriptions.yml | 31 +++++++++++++++ ...nfig_replace_retain_column_policy_tags.yml | 31 +++++++++++++++ lib/embulk/output/bigquery/bigquery_client.rb | 38 +++---------------- .../bigquery_service_with_policy_tag.rb | 4 +- test/test_transaction.rb | 4 +- 5 files changed, 71 insertions(+), 37 deletions(-) create mode 100644 example/config_replace_retain_column_descriptions.yml create mode 100644 example/config_replace_retain_column_policy_tags.yml diff --git a/example/config_replace_retain_column_descriptions.yml b/example/config_replace_retain_column_descriptions.yml new file mode 100644 index 0000000..caca4bb --- /dev/null +++ b/example/config_replace_retain_column_descriptions.yml @@ -0,0 +1,31 @@ +in: + type: file + path_prefix: example/example.csv + parser: + type: csv + charset: UTF-8 + newline: CRLF + null_string: 'NULL' + skip_header_lines: 1 + comment_line_marker: '#' + columns: + - {name: date, type: string} + - {name: timestamp, type: timestamp, format: "%Y-%m-%d %H:%M:%S.%N", timezone: "+09:00"} + - {name: "null", type: string} + - {name: long, type: long} + - {name: string, type: string} + - {name: double, type: double} + - {name: boolean, type: boolean} +out: + type: bigquery + mode: replace + auth_method: service_account + json_keyfile: example/your-project-000.json + dataset: your_dataset_name + table: your_table_name + source_format: NEWLINE_DELIMITED_JSON + compression: NONE + auto_create_dataset: true + auto_create_table: true + schema_file: example/schema.json + retain_column_descriptions: true diff --git a/example/config_replace_retain_column_policy_tags.yml b/example/config_replace_retain_column_policy_tags.yml new file mode 100644 index 0000000..db81664 --- /dev/null +++ b/example/config_replace_retain_column_policy_tags.yml @@ -0,0 +1,31 @@ +in: + type: file + path_prefix: example/example.csv + parser: + type: csv + charset: UTF-8 + newline: CRLF + null_string: 'NULL' + skip_header_lines: 1 + comment_line_marker: '#' + columns: + - {name: date, type: string} + - {name: timestamp, type: timestamp, format: "%Y-%m-%d %H:%M:%S.%N", timezone: "+09:00"} + - {name: "null", type: string} + - {name: long, type: long} + - {name: string, type: string} + - {name: double, type: double} + - {name: boolean, type: boolean} +out: + type: bigquery + mode: replace + auth_method: service_account + json_keyfile: example/your-project-000.json + dataset: your_dataset_name + table: your_table_name + source_format: NEWLINE_DELIMITED_JSON + compression: NONE + auto_create_dataset: true + auto_create_table: true + schema_file: example/schema.json + retain_column_policy_tags: true diff --git a/lib/embulk/output/bigquery/bigquery_client.rb b/lib/embulk/output/bigquery/bigquery_client.rb index 1fb74bb..d43111b 100644 --- a/lib/embulk/output/bigquery/bigquery_client.rb +++ b/lib/embulk/output/bigquery/bigquery_client.rb @@ -13,7 +13,8 @@ class BigqueryClient < GoogleClient def initialize(task, schema, fields = nil) scope = "https://www.googleapis.com/auth/bigquery" - client_class = BigqueryServiceWithPolicyTag + need_takeover = (task['mode'] == 'replace') && (task['retain_column_descriptions'] || task['retain_column_policy_tags']) + client_class = need_takeover ? BigqueryServiceWithPolicyTag : Google::Apis::BigqueryV2::BigqueryService super(task, scope, client_class) @schema = schema @@ -35,12 +36,8 @@ def initialize(task, schema, fields = nil) @src_fields = need_takeover ? fetch_src_fields : [] end - def need_takeover - (@task['mode'] == 'replace') && (@task['retain_column_descriptions'] || @task['retain_column_policy_tags']) - end - def fetch_src_fields - get_table_with_policy_tags(@task['table'])&.schema&.fields || [] + get_table(@task['table'])&.schema&.fields || [] rescue NotFoundError [] end @@ -532,29 +529,10 @@ def get_table_or_partition(table, dataset: nil) end end - def get_table_with_policy_tags(table, dataset: nil) - begin - table = Helper.chomp_partition_decorator(table) - dataset ||= @dataset - Embulk.logger.info { "embulk-output-bigquery: Get table With PolicyTags... #{@destination_project}:#{dataset}.#{table}" } - with_network_retry { client.get_table_with_policy_tags(@destination_project, dataset, table) } - rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e - if e.status_code == 404 - raise NotFoundError, "Table #{@destination_project}:#{dataset}.#{table} is not found" - end - - response = {status_code: e.status_code, message: e.message, error_class: e.class} - Embulk.logger.error { - "embulk-output-bigquery: get_table(#{@destination_project}, #{dataset}, #{table}), response:#{response}" - } - raise Error, "failed to get table #{@destination_project}:#{dataset}.#{table}, response:#{response}" - end - end - # update only column.description def patch_table with_job_retry do - table = need_takeover ? get_table_with_policy_tags(@task['table']) : get_table(@task['table']) + table = get_table(@task['table']) def patch_description_and_policy_tags(fields, column_options, src_fields) fields.map do |field| @@ -583,13 +561,7 @@ def patch_description_and_policy_tags(fields, column_options, src_fields) fields = patch_description_and_policy_tags(table.schema.fields, @task['column_options'], @src_fields) table.schema.update!(fields: fields) table_id = Helper.chomp_partition_decorator(@task['table']) - with_network_retry do - if need_takeover - client.patch_table_with_policy_tags(@project, @dataset, table_id, table) - else - client.patch_table(@project, @dataset, table_id, table) - end - end + with_network_retry { client.patch_table(@project, @dataset, table_id, table) } end end diff --git a/lib/embulk/output/bigquery/bigquery_service_with_policy_tag.rb b/lib/embulk/output/bigquery/bigquery_service_with_policy_tag.rb index e5b5953..011d7f0 100644 --- a/lib/embulk/output/bigquery/bigquery_service_with_policy_tag.rb +++ b/lib/embulk/output/bigquery/bigquery_service_with_policy_tag.rb @@ -7,7 +7,7 @@ class Bigquery < OutputPlugin # Due to the JRuby version constraint in Embulk v0.9, it’s not possible to upgrade to a version of the google-api-client (0.37.0 or later) that includes support for policy_tags. # So the workaround was implemented using a patch-like solution as shown below. class BigqueryServiceWithPolicyTag < Google::Apis::BigqueryV2::BigqueryService - def get_table_with_policy_tags(project_id, dataset_id, table_id, selected_fields: nil, fields: nil, quota_user: nil, user_ip: nil, options: nil, &block) + def get_table(project_id, dataset_id, table_id, selected_fields: nil, fields: nil, quota_user: nil, user_ip: nil, options: nil, &block) command = make_simple_command(:get, 'projects/{projectId}/datasets/{datasetId}/tables/{tableId}', options) command.response_representation = TableWithPolicyTag::Representation command.response_class = TableWithPolicyTag @@ -21,7 +21,7 @@ def get_table_with_policy_tags(project_id, dataset_id, table_id, selected_fields execute_or_queue_command(command, &block) end - def patch_table_with_policy_tags(project_id, dataset_id, table_id, table_object = nil, autodetect_schema: nil, fields: nil, quota_user: nil, options: nil, &block) + def patch_table(project_id, dataset_id, table_id, table_object = nil, autodetect_schema: nil, fields: nil, quota_user: nil, options: nil, &block) command = make_simple_command(:patch, 'projects/{+projectId}/datasets/{+datasetId}/tables/{+tableId}', options) command.request_representation = TableWithPolicyTag::Representation command.request_object = table_object diff --git a/test/test_transaction.rb b/test/test_transaction.rb index 66bf05a..1478ed7 100644 --- a/test/test_transaction.rb +++ b/test/test_transaction.rb @@ -143,7 +143,7 @@ def test_replace_with_retain_column_descriptions task = Bigquery.configure(config, schema, processor_count) any_instance_of(BigqueryClient) do |obj| mock(obj).get_dataset(config['dataset']) - mock(obj).get_table_with_policy_tags(config['table']) + mock(obj).get_table(config['table']) mock(obj).create_table_if_not_exists(config['temp_table'], options: {"expiration_time"=>nil}) mock(obj).create_table_if_not_exists(config['table']) mock(obj).copy(config['temp_table'], config['table'], write_disposition: 'WRITE_TRUNCATE') @@ -158,7 +158,7 @@ def test_replace_with_retain_column_policy_tags task = Bigquery.configure(config, schema, processor_count) any_instance_of(BigqueryClient) do |obj| mock(obj).get_dataset(config['dataset']) - mock(obj).get_table_with_policy_tags(config['table']) + mock(obj).get_table(config['table']) mock(obj).create_table_if_not_exists(config['temp_table'], options: {"expiration_time"=>nil}) mock(obj).create_table_if_not_exists(config['table']) mock(obj).copy(config['temp_table'], config['table'], write_disposition: 'WRITE_TRUNCATE')