# Patch summary (text before the first "diff --git" header is not part of any hunk).
#
# Adds two boolean output options, `retain_column_descriptions` and `retain_column_policy_tags`
# (both default to false), to the bigquery output plugin:
#   * BigqueryClient#initialize: when mode is 'replace' and either option is set, the client is built on
#     BigqueryServiceWithPolicyTag instead of Google::Apis::BigqueryV2::BigqueryService, and the current
#     fields of the target table are read into @src_fields (an empty list when get_table raises NotFoundError
#     or returns no schema).
#   * patch_table: for each field of the patched table, the description and/or policy_tags of the same-named
#     field in @src_fields are copied over (recursing into nested fields) BEFORE column_options are applied,
#     so a description given in column_options overrides the retained one.
#   * bigquery_service_with_policy_tag.rb: overrides get_table / patch_table so that the request and response
#     representations carry the per-field `policyTags` property.
#   * CI is pinned to ubuntu-22.04 (see the in-hunk comment about jruby-9.1.17.0), two example configs and two
#     transaction tests are added, and the README option table gains the two new rows.
#
# NOTE(review): policy tags are copied per column (field.update!(policy_tags: ...)), so the README rows are
# worded in terms of columns of the existing table rather than "table policy tags".
# NOTE(review): fetch_src_fields runs inside BigqueryClient#initialize, i.e. one extra get_table call per
# client instance when the options are enabled -- confirm this is acceptable for every place a client is built.
diff --git a/.github/workflows/check.yml b/.github/workflows/check.yml index 535d915..1141a67 100644 --- a/.github/workflows/check.yml +++ b/.github/workflows/check.yml @@ -2,7 +2,9 @@ name: Check on: [ pull_request, push ] jobs: check: - runs-on: ubuntu-latest + # https://github.com/ruby/ruby-builder/releases/tag/toolcache + # No support for jruby-9.1.17.0 after ubuntu-24.04 + runs-on: ubuntu-22.04 # push: always run. # pull_request: run only when the PR is submitted from a forked repository, not within this repository. if: github.event_name == 'push' || github.event.pull_request.head.repo.full_name != github.repository diff --git a/README.md b/README.md index a5109f1..419616b 100644 --- a/README.md +++ b/README.md @@ -63,6 +63,8 @@ OAuth flow for installed applications. | description | string | optional | nil | description of table | | merge_keys | array | optional | | key column names for merging records in merge mode (string array, required in merge mode if table doesn't have primary key) | | merge_rule | array | optional | | list of column assignments for updating existing records used in merge mode, for example foo = T.foo + S.foo (T means target table and S means source table). (string array, default: always overwrites with new values) | +| retain_column_descriptions | boolean | optional | false | In replace mode, keep the column descriptions of the existing table on the replaced table. | +| retain_column_policy_tags | boolean | optional | false | In replace mode, keep the column policy tags of the existing table on the replaced table. 
| Client or request options diff --git a/example/config_replace_retain_column_descriptions.yml b/example/config_replace_retain_column_descriptions.yml new file mode 100644 index 0000000..caca4bb --- /dev/null +++ b/example/config_replace_retain_column_descriptions.yml @@ -0,0 +1,31 @@ +in: + type: file + path_prefix: example/example.csv + parser: + type: csv + charset: UTF-8 + newline: CRLF + null_string: 'NULL' + skip_header_lines: 1 + comment_line_marker: '#' + columns: + - {name: date, type: string} + - {name: timestamp, type: timestamp, format: "%Y-%m-%d %H:%M:%S.%N", timezone: "+09:00"} + - {name: "null", type: string} + - {name: long, type: long} + - {name: string, type: string} + - {name: double, type: double} + - {name: boolean, type: boolean} +out: + type: bigquery + mode: replace + auth_method: service_account + json_keyfile: example/your-project-000.json + dataset: your_dataset_name + table: your_table_name + source_format: NEWLINE_DELIMITED_JSON + compression: NONE + auto_create_dataset: true + auto_create_table: true + schema_file: example/schema.json + retain_column_descriptions: true diff --git a/example/config_replace_retain_column_policy_tags.yml b/example/config_replace_retain_column_policy_tags.yml new file mode 100644 index 0000000..db81664 --- /dev/null +++ b/example/config_replace_retain_column_policy_tags.yml @@ -0,0 +1,31 @@ +in: + type: file + path_prefix: example/example.csv + parser: + type: csv + charset: UTF-8 + newline: CRLF + null_string: 'NULL' + skip_header_lines: 1 + comment_line_marker: '#' + columns: + - {name: date, type: string} + - {name: timestamp, type: timestamp, format: "%Y-%m-%d %H:%M:%S.%N", timezone: "+09:00"} + - {name: "null", type: string} + - {name: long, type: long} + - {name: string, type: string} + - {name: double, type: double} + - {name: boolean, type: boolean} +out: + type: bigquery + mode: replace + auth_method: service_account + json_keyfile: example/your-project-000.json + dataset: your_dataset_name + 
table: your_table_name + source_format: NEWLINE_DELIMITED_JSON + compression: NONE + auto_create_dataset: true + auto_create_table: true + schema_file: example/schema.json + retain_column_policy_tags: true diff --git a/lib/embulk/output/bigquery.rb b/lib/embulk/output/bigquery.rb index 8ada6ab..904b5a8 100644 --- a/lib/embulk/output/bigquery.rb +++ b/lib/embulk/output/bigquery.rb @@ -64,6 +64,8 @@ def self.configure(config, schema, task_count) 'payload_column_index' => config.param('payload_column_index', :integer, :default => nil), 'description' => config.param('description', :string, :default => nil), + 'retain_column_descriptions' => config.param('retain_column_descriptions', :bool, :default => false), + 'retain_column_policy_tags' => config.param('retain_column_policy_tags', :bool, :default => false), 'open_timeout_sec' => config.param('open_timeout_sec', :integer, :default => nil), 'timeout_sec' => config.param('timeout_sec', :integer, :default => nil), # google-api-ruby-client < v0.11.0 diff --git a/lib/embulk/output/bigquery/bigquery_client.rb b/lib/embulk/output/bigquery/bigquery_client.rb index b42f9b9..d43111b 100644 --- a/lib/embulk/output/bigquery/bigquery_client.rb +++ b/lib/embulk/output/bigquery/bigquery_client.rb @@ -3,6 +3,7 @@ require 'thwait' require_relative 'google_client' require_relative 'helper' +require_relative 'bigquery_service_with_policy_tag' module Embulk module Output @@ -12,7 +13,8 @@ class BigqueryClient < GoogleClient def initialize(task, schema, fields = nil) scope = "https://www.googleapis.com/auth/bigquery" - client_class = Google::Apis::BigqueryV2::BigqueryService + need_takeover = (task['mode'] == 'replace') && (task['retain_column_descriptions'] || task['retain_column_policy_tags']) + client_class = need_takeover ? 
BigqueryServiceWithPolicyTag : Google::Apis::BigqueryV2::BigqueryService super(task, scope, client_class) @schema = schema @@ -30,6 +32,14 @@ def initialize(task, schema, fields = nil) @task['encoding'] ||= 'UTF-8' @task['ignore_unknown_values'] = false if @task['ignore_unknown_values'].nil? @task['allow_quoted_newlines'] = false if @task['allow_quoted_newlines'].nil? + + @src_fields = need_takeover ? fetch_src_fields : [] + end + + def fetch_src_fields + get_table(@task['table'])&.schema&.fields || [] + rescue NotFoundError + [] end def fields @@ -524,13 +534,23 @@ def patch_table with_job_retry do table = get_table(@task['table']) - def patch_description(fields, column_options) + def patch_description_and_policy_tags(fields, column_options, src_fields) fields.map do |field| + src_field = src_fields.select {|s_field| s_field.name == field.name}.first + if src_field + field.update!(description: src_field.description) if @task['retain_column_descriptions'] && src_field.description + field.update!(policy_tags: src_field.policy_tags) if @task['retain_column_policy_tags'] && src_field.policy_tags + if field.fields && src_field.fields + nested_fields = patch_description_and_policy_tags(field.fields, [], src_field.fields) + field.update!(fields: nested_fields) + end + end + column_option = column_options.select{|col_opt| col_opt['name'] == field.name}.first if column_option field.update!(description: column_option['description']) if column_option['description'] if field.fields && column_option['fields'] - nested_fields = patch_description(field.fields, column_option['fields']) + nested_fields = patch_description_and_policy_tags(field.fields, column_option['fields'], []) field.update!(fields: nested_fields) end end @@ -538,7 +558,7 @@ def patch_description(fields, column_options) end end - fields = patch_description(table.schema.fields, @task['column_options']) + fields = patch_description_and_policy_tags(table.schema.fields, @task['column_options'], @src_fields) 
table.schema.update!(fields: fields) table_id = Helper.chomp_partition_decorator(@task['table']) with_network_retry { client.patch_table(@project, @dataset, table_id, table) } diff --git a/lib/embulk/output/bigquery/bigquery_service_with_policy_tag.rb b/lib/embulk/output/bigquery/bigquery_service_with_policy_tag.rb new file mode 100644 index 0000000..011d7f0 --- /dev/null +++ b/lib/embulk/output/bigquery/bigquery_service_with_policy_tag.rb @@ -0,0 +1,88 @@ +require 'google/apis/bigquery_v2' + +module Embulk + module Output + class Bigquery < OutputPlugin + # NOTE: + # Due to the JRuby version constraint in Embulk v0.9, it’s not possible to upgrade to a version of the google-api-client (0.37.0 or later) that includes support for policy_tags. + # So the workaround was implemented using a patch-like solution as shown below. + class BigqueryServiceWithPolicyTag < Google::Apis::BigqueryV2::BigqueryService + def get_table(project_id, dataset_id, table_id, selected_fields: nil, fields: nil, quota_user: nil, user_ip: nil, options: nil, &block) + command = make_simple_command(:get, 'projects/{projectId}/datasets/{datasetId}/tables/{tableId}', options) + command.response_representation = TableWithPolicyTag::Representation + command.response_class = TableWithPolicyTag + command.params['projectId'] = project_id unless project_id.nil? + command.params['datasetId'] = dataset_id unless dataset_id.nil? + command.params['tableId'] = table_id unless table_id.nil? + command.query['selectedFields'] = selected_fields unless selected_fields.nil? + command.query['fields'] = fields unless fields.nil? + command.query['quotaUser'] = quota_user unless quota_user.nil? + command.query['userIp'] = user_ip unless user_ip.nil? 
+ execute_or_queue_command(command, &block) + end + + def patch_table(project_id, dataset_id, table_id, table_object = nil, autodetect_schema: nil, fields: nil, quota_user: nil, options: nil, &block) + command = make_simple_command(:patch, 'projects/{+projectId}/datasets/{+datasetId}/tables/{+tableId}', options) + command.request_representation = TableWithPolicyTag::Representation + command.request_object = table_object + command.response_representation = TableWithPolicyTag::Representation + command.response_class = TableWithPolicyTag + command.params['projectId'] = project_id unless project_id.nil? + command.params['datasetId'] = dataset_id unless dataset_id.nil? + command.params['tableId'] = table_id unless table_id.nil? + command.query['autodetect_schema'] = autodetect_schema unless autodetect_schema.nil? + command.query['fields'] = fields unless fields.nil? + command.query['quotaUser'] = quota_user unless quota_user.nil? + execute_or_queue_command(command, &block) + end + end + + class TableFieldSchemaWithPolicyTag < Google::Apis::BigqueryV2::TableFieldSchema + class PolicyTags + include Google::Apis::Core::JsonObjectSupport + include Google::Apis::Core::Hashable + + class Representation < Google::Apis::Core::JsonRepresentation + collection :names, as: 'names' + end + + attr_accessor :names + + def initialize(**args) + update!(**args) + end + + def update!(**args) + @names = args[:names] if args.key?(:names) + end + end + + include Google::Apis::Core::Hashable + + attr_accessor :policy_tags + + def update!(**args) + super + @policy_tags = args[:policy_tags] if args.key?(:policy_tags) + end + + class Representation < Google::Apis::BigqueryV2::TableFieldSchema::Representation + collection :fields, as: 'fields', class: TableFieldSchemaWithPolicyTag, decorator: TableFieldSchemaWithPolicyTag::Representation + property :policy_tags, as: 'policyTags', class: TableFieldSchemaWithPolicyTag::PolicyTags, decorator: TableFieldSchemaWithPolicyTag::PolicyTags::Representation 
+ end + end + + class TableSchemaWithPolicyTag < Google::Apis::BigqueryV2::TableSchema + class Representation < Google::Apis::BigqueryV2::TableSchema::Representation + collection :fields, as: 'fields', class: TableFieldSchemaWithPolicyTag, decorator: TableFieldSchemaWithPolicyTag::Representation + end + end + + class TableWithPolicyTag < Google::Apis::BigqueryV2::Table + class Representation < Google::Apis::BigqueryV2::Table::Representation + property :schema, as: 'schema', class: TableSchemaWithPolicyTag, decorator: TableSchemaWithPolicyTag::Representation + end + end + end + end +end diff --git a/test/test_transaction.rb b/test/test_transaction.rb index bc59510..1478ed7 100644 --- a/test/test_transaction.rb +++ b/test/test_transaction.rb @@ -137,6 +137,36 @@ def test_replace_with_partitioning end Bigquery.transaction(config, schema, processor_count, &control) end + + def test_replace_with_retain_column_descriptions + config = least_config.merge('mode' => 'replace', 'retain_column_descriptions' => true) + task = Bigquery.configure(config, schema, processor_count) + any_instance_of(BigqueryClient) do |obj| + mock(obj).get_dataset(config['dataset']) + mock(obj).get_table(config['table']) + mock(obj).create_table_if_not_exists(config['temp_table'], options: {"expiration_time"=>nil}) + mock(obj).create_table_if_not_exists(config['table']) + mock(obj).copy(config['temp_table'], config['table'], write_disposition: 'WRITE_TRUNCATE') + mock(obj).delete_table(config['temp_table']) + mock(obj).patch_table + end + Bigquery.transaction(config, schema, processor_count, &control) + end + + def test_replace_with_retain_column_policy_tags + config = least_config.merge('mode' => 'replace', 'retain_column_policy_tags' => true) + task = Bigquery.configure(config, schema, processor_count) + any_instance_of(BigqueryClient) do |obj| + mock(obj).get_dataset(config['dataset']) + mock(obj).get_table(config['table']) + mock(obj).create_table_if_not_exists(config['temp_table'], options: 
{"expiration_time"=>nil}) + mock(obj).create_table_if_not_exists(config['table']) + mock(obj).copy(config['temp_table'], config['table'], write_disposition: 'WRITE_TRUNCATE') + mock(obj).delete_table(config['temp_table']) + mock(obj).patch_table + end + Bigquery.transaction(config, schema, processor_count, &control) + end end sub_test_case "replace_backup" do