
Added option to take over column’s description and policy_tags when in replace mode #33

Open · wants to merge 11 commits into base: master
4 changes: 3 additions & 1 deletion .github/workflows/check.yml
@@ -2,7 +2,9 @@ name: Check
 on: [ pull_request, push ]
 jobs:
   check:
-    runs-on: ubuntu-latest
+    # https://github.com/ruby/ruby-builder/releases/tag/toolcache
+    # No support for jruby-9.1.17.0 after ubuntu-24.04
+    runs-on: ubuntu-22.04
     # push: always run.
     # pull_request: run only when the PR is submitted from a forked repository, not within this repository.
     if: github.event_name == 'push' || github.event.pull_request.head.repo.full_name != github.repository
2 changes: 2 additions & 0 deletions README.md
@@ -63,6 +63,8 @@ OAuth flow for installed applications.
 | description | string | optional | nil | description of table |
 | merge_keys | array | optional | | key column names for merging records in merge mode (string array, required in merge mode if table doesn't have primary key) |
 | merge_rule | array | optional | | list of column assignments for updating existing records used in merge mode, for example foo = T.foo + S.foo (T means target table and S means source table). (string array, default: always overwrites with new values) |
+| retain_column_descriptions | boolean | optional | false | In replace mode, carry over column descriptions from the existing table. |
+| retain_column_policy_tags | boolean | optional | false | In replace mode, carry over column policy tags from the existing table. |

 Client or request options

31 changes: 31 additions & 0 deletions example/config_replace_retain_column_descriptions.yml
@@ -0,0 +1,31 @@
in:
  type: file
  path_prefix: example/example.csv
  parser:
    type: csv
    charset: UTF-8
    newline: CRLF
    null_string: 'NULL'
    skip_header_lines: 1
    comment_line_marker: '#'
    columns:
      - {name: date, type: string}
      - {name: timestamp, type: timestamp, format: "%Y-%m-%d %H:%M:%S.%N", timezone: "+09:00"}
      - {name: "null", type: string}
      - {name: long, type: long}
      - {name: string, type: string}
      - {name: double, type: double}
      - {name: boolean, type: boolean}
out:
  type: bigquery
  mode: replace
  auth_method: service_account
  json_keyfile: example/your-project-000.json
  dataset: your_dataset_name
  table: your_table_name
  source_format: NEWLINE_DELIMITED_JSON
  compression: NONE
  auto_create_dataset: true
  auto_create_table: true
  schema_file: example/schema.json
  retain_column_descriptions: true
31 changes: 31 additions & 0 deletions example/config_replace_retain_column_policy_tags.yml
@@ -0,0 +1,31 @@
in:
  type: file
  path_prefix: example/example.csv
  parser:
    type: csv
    charset: UTF-8
    newline: CRLF
    null_string: 'NULL'
    skip_header_lines: 1
    comment_line_marker: '#'
    columns:
      - {name: date, type: string}
      - {name: timestamp, type: timestamp, format: "%Y-%m-%d %H:%M:%S.%N", timezone: "+09:00"}
      - {name: "null", type: string}
      - {name: long, type: long}
      - {name: string, type: string}
      - {name: double, type: double}
      - {name: boolean, type: boolean}
out:
  type: bigquery
  mode: replace
  auth_method: service_account
  json_keyfile: example/your-project-000.json
  dataset: your_dataset_name
  table: your_table_name
  source_format: NEWLINE_DELIMITED_JSON
  compression: NONE
  auto_create_dataset: true
  auto_create_table: true
  schema_file: example/schema.json
  retain_column_policy_tags: true
2 changes: 2 additions & 0 deletions lib/embulk/output/bigquery.rb
@@ -64,6 +64,8 @@ def self.configure(config, schema, task_count)
         'payload_column_index' => config.param('payload_column_index', :integer, :default => nil),

         'description' => config.param('description', :string, :default => nil),
+        'retain_column_descriptions' => config.param('retain_column_descriptions', :bool, :default => false),
+        'retain_column_policy_tags' => config.param('retain_column_policy_tags', :bool, :default => false),

         'open_timeout_sec' => config.param('open_timeout_sec', :integer, :default => nil),
         'timeout_sec' => config.param('timeout_sec', :integer, :default => nil), # google-api-ruby-client < v0.11.0
28 changes: 24 additions & 4 deletions lib/embulk/output/bigquery/bigquery_client.rb
@@ -3,6 +3,7 @@
 require 'thwait'
 require_relative 'google_client'
 require_relative 'helper'
+require_relative 'bigquery_service_with_policy_tag'

 module Embulk
   module Output
@@ -12,7 +13,8 @@ class BigqueryClient < GoogleClient

         def initialize(task, schema, fields = nil)
           scope = "https://www.googleapis.com/auth/bigquery"
-          client_class = Google::Apis::BigqueryV2::BigqueryService
+          need_takeover = (task['mode'] == 'replace') && (task['retain_column_descriptions'] || task['retain_column_policy_tags'])
+          client_class = need_takeover ? BigqueryServiceWithPolicyTag : Google::Apis::BigqueryV2::BigqueryService
           super(task, scope, client_class)

           @schema = schema
@@ -30,6 +32,14 @@ def initialize(task, schema, fields = nil)
           @task['encoding'] ||= 'UTF-8'
           @task['ignore_unknown_values'] = false if @task['ignore_unknown_values'].nil?
           @task['allow_quoted_newlines'] = false if @task['allow_quoted_newlines'].nil?
+
+          @src_fields = need_takeover ? fetch_src_fields : []
         end
+
+        def fetch_src_fields
+          get_table(@task['table'])&.schema&.fields || []
+        rescue NotFoundError
+          []
+        end

         def fields
@@ -524,21 +534,31 @@ def patch_table
           with_job_retry do
             table = get_table(@task['table'])

-            def patch_description(fields, column_options)
+            def patch_description_and_policy_tags(fields, column_options, src_fields)
               fields.map do |field|
+                src_field = src_fields.select {|s_field| s_field.name == field.name}.first
+                if src_field
+                  field.update!(description: src_field.description) if @task['retain_column_descriptions'] && src_field.description
+                  field.update!(policy_tags: src_field.policy_tags) if @task['retain_column_policy_tags'] && src_field.policy_tags
+                  if field.fields && src_field.fields
+                    nested_fields = patch_description_and_policy_tags(field.fields, [], src_field.fields)
+                    field.update!(fields: nested_fields)
+                  end
+                end
+
                 column_option = column_options.select{|col_opt| col_opt['name'] == field.name}.first
                 if column_option
                   field.update!(description: column_option['description']) if column_option['description']
                   if field.fields && column_option['fields']
-                    nested_fields = patch_description(field.fields, column_option['fields'])
+                    nested_fields = patch_description_and_policy_tags(field.fields, column_option['fields'], [])
                     field.update!(fields: nested_fields)
                   end
                 end
                 field
               end
             end

-            fields = patch_description(table.schema.fields, @task['column_options'])
+            fields = patch_description_and_policy_tags(table.schema.fields, @task['column_options'], @src_fields)
             table.schema.update!(fields: fields)
             table_id = Helper.chomp_partition_decorator(@task['table'])
             with_network_retry { client.patch_table(@project, @dataset, table_id, table) }
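In short, the new patch_description_and_policy_tags matches the freshly loaded schema's fields to the previous table's fields by name, copies description and policy_tags across when the corresponding retain_* option is enabled, and recurses into nested RECORD fields; explicit column_options are applied afterwards and still take precedence. Below is a minimal standalone sketch of that matching rule, assuming both options are enabled; it uses plain Ruby Structs and a hypothetical carry_over helper, not the plugin's Google API classes.

```ruby
# Illustrative sketch only: plain Structs stand in for the BigQuery schema classes.
Field = Struct.new(:name, :description, :policy_tags, :fields)

def carry_over(new_fields, src_fields)
  new_fields.map do |field|
    src = src_fields.find { |s| s.name == field.name }
    if src
      field.description = src.description if src.description  # retain_column_descriptions
      field.policy_tags = src.policy_tags if src.policy_tags  # retain_column_policy_tags
      # Recurse into nested RECORD columns so child fields are carried over too.
      field.fields = carry_over(field.fields, src.fields) if field.fields && src.fields
    end
    field
  end
end

old_schema = [Field.new('id', 'primary key', ['taxonomy/pii'], nil)]
new_schema = [Field.new('id'), Field.new('created_at')]
p carry_over(new_schema, old_schema).map(&:description)  # => ["primary key", nil]
```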
88 changes: 88 additions & 0 deletions lib/embulk/output/bigquery/bigquery_service_with_policy_tag.rb
@@ -0,0 +1,88 @@
require 'google/apis/bigquery_v2'

module Embulk
  module Output
    class Bigquery < OutputPlugin
      # NOTE:
      # The JRuby version constraint in Embulk v0.9 prevents upgrading to a google-api-client version (0.37.0 or later) that supports policy_tags,
      # so the classes below patch that support into the older client.
      class BigqueryServiceWithPolicyTag < Google::Apis::BigqueryV2::BigqueryService
        def get_table(project_id, dataset_id, table_id, selected_fields: nil, fields: nil, quota_user: nil, user_ip: nil, options: nil, &block)
          command = make_simple_command(:get, 'projects/{projectId}/datasets/{datasetId}/tables/{tableId}', options)
          command.response_representation = TableWithPolicyTag::Representation
          command.response_class = TableWithPolicyTag
          command.params['projectId'] = project_id unless project_id.nil?
          command.params['datasetId'] = dataset_id unless dataset_id.nil?
          command.params['tableId'] = table_id unless table_id.nil?
          command.query['selectedFields'] = selected_fields unless selected_fields.nil?
          command.query['fields'] = fields unless fields.nil?
          command.query['quotaUser'] = quota_user unless quota_user.nil?
          command.query['userIp'] = user_ip unless user_ip.nil?
          execute_or_queue_command(command, &block)
        end

        def patch_table(project_id, dataset_id, table_id, table_object = nil, autodetect_schema: nil, fields: nil, quota_user: nil, options: nil, &block)
          command = make_simple_command(:patch, 'projects/{+projectId}/datasets/{+datasetId}/tables/{+tableId}', options)
          command.request_representation = TableWithPolicyTag::Representation
          command.request_object = table_object
          command.response_representation = TableWithPolicyTag::Representation
          command.response_class = TableWithPolicyTag
          command.params['projectId'] = project_id unless project_id.nil?
          command.params['datasetId'] = dataset_id unless dataset_id.nil?
          command.params['tableId'] = table_id unless table_id.nil?
          command.query['autodetect_schema'] = autodetect_schema unless autodetect_schema.nil?
          command.query['fields'] = fields unless fields.nil?
          command.query['quotaUser'] = quota_user unless quota_user.nil?
          execute_or_queue_command(command, &block)
        end
      end

      class TableFieldSchemaWithPolicyTag < Google::Apis::BigqueryV2::TableFieldSchema
        class PolicyTags
          include Google::Apis::Core::JsonObjectSupport
          include Google::Apis::Core::Hashable

          class Representation < Google::Apis::Core::JsonRepresentation
            collection :names, as: 'names'
          end

          attr_accessor :names

          def initialize(**args)
            update!(**args)
          end

          def update!(**args)
            @names = args[:names] if args.key?(:names)
          end
        end

        include Google::Apis::Core::Hashable

        attr_accessor :policy_tags

        def update!(**args)
          super
          @policy_tags = args[:policy_tags] if args.key?(:policy_tags)
        end

        class Representation < Google::Apis::BigqueryV2::TableFieldSchema::Representation
          collection :fields, as: 'fields', class: TableFieldSchemaWithPolicyTag, decorator: TableFieldSchemaWithPolicyTag::Representation
          property :policy_tags, as: 'policyTags', class: TableFieldSchemaWithPolicyTag::PolicyTags, decorator: TableFieldSchemaWithPolicyTag::PolicyTags::Representation
        end
      end

      class TableSchemaWithPolicyTag < Google::Apis::BigqueryV2::TableSchema
        class Representation < Google::Apis::BigqueryV2::TableSchema::Representation
          collection :fields, as: 'fields', class: TableFieldSchemaWithPolicyTag, decorator: TableFieldSchemaWithPolicyTag::Representation
        end
      end

      class TableWithPolicyTag < Google::Apis::BigqueryV2::Table
        class Representation < Google::Apis::BigqueryV2::Table::Representation
          property :schema, as: 'schema', class: TableSchemaWithPolicyTag, decorator: TableSchemaWithPolicyTag::Representation
        end
      end
    end
  end
end
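For orientation, the custom representations above exist to round-trip one extra piece of the BigQuery REST payload: a schema field may carry a policyTags object with a list of policy tag resource names, which the stock client bundled with Embulk v0.9 simply drops. A small sketch of that field shape follows; the taxonomy resource name is a placeholder.

```ruby
require 'json'

# Shape of a single schema field as returned by tables.get; the Representation above
# maps the 'policyTags' / 'names' keys onto policy_tags.names.
field_json = {
  'name'        => 'email',
  'type'        => 'STRING',
  'description' => 'customer email',
  'policyTags'  => {
    'names' => ['projects/your-project/locations/us/taxonomies/123/policyTags/456']
  }
}

puts JSON.pretty_generate(field_json)
```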
30 changes: 30 additions & 0 deletions test/test_transaction.rb
@@ -137,6 +137,36 @@ def test_replace_with_partitioning
       end
       Bigquery.transaction(config, schema, processor_count, &control)
     end
+
+    def test_replace_with_retain_column_descriptions
+      config = least_config.merge('mode' => 'replace', 'retain_column_descriptions' => true)
+      task = Bigquery.configure(config, schema, processor_count)
+      any_instance_of(BigqueryClient) do |obj|
+        mock(obj).get_dataset(config['dataset'])
+        mock(obj).get_table(config['table'])
+        mock(obj).create_table_if_not_exists(config['temp_table'], options: {"expiration_time"=>nil})
+        mock(obj).create_table_if_not_exists(config['table'])
+        mock(obj).copy(config['temp_table'], config['table'], write_disposition: 'WRITE_TRUNCATE')
+        mock(obj).delete_table(config['temp_table'])
+        mock(obj).patch_table
+      end
+      Bigquery.transaction(config, schema, processor_count, &control)
+    end
+
+    def test_replace_with_retain_column_policy_tags
+      config = least_config.merge('mode' => 'replace', 'retain_column_policy_tags' => true)
+      task = Bigquery.configure(config, schema, processor_count)
+      any_instance_of(BigqueryClient) do |obj|
+        mock(obj).get_dataset(config['dataset'])
+        mock(obj).get_table(config['table'])
+        mock(obj).create_table_if_not_exists(config['temp_table'], options: {"expiration_time"=>nil})
+        mock(obj).create_table_if_not_exists(config['table'])
+        mock(obj).copy(config['temp_table'], config['table'], write_disposition: 'WRITE_TRUNCATE')
+        mock(obj).delete_table(config['temp_table'])
+        mock(obj).patch_table
+      end
+      Bigquery.transaction(config, schema, processor_count, &control)
+    end
   end

   sub_test_case "replace_backup" do