Skip to content

Commit 6e5d93c

Browse files
authored
Merge branch 'master' into support_description
2 parents 72372d1 + 0703085 commit 6e5d93c

File tree

12 files changed

+145
-37
lines changed

12 files changed

+145
-37
lines changed

.github/workflows/check.yml

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
name: Check
2+
on: [ pull_request, push ]
3+
jobs:
4+
check:
5+
runs-on: ubuntu-latest
6+
# push: always run.
7+
# pull_request: run only when the PR is submitted from a forked repository, not within this repository.
8+
if: github.event_name == 'push' || github.event.pull_request.head.repo.full_name != github.repository
9+
strategy:
10+
matrix:
11+
jruby_version:
12+
- 9.3.10.0
13+
- 9.4.2.0
14+
fail-fast: false
15+
steps:
16+
- uses: actions/checkout@v4
17+
- name: Set up OpenJDK 8
18+
uses: actions/setup-java@v4
19+
with:
20+
java-version: 8
21+
distribution: "temurin"
22+
- uses: ruby/setup-ruby@v1
23+
with:
24+
ruby-version: 'jruby-${{ matrix.jruby_version }}'
25+
bundler-cache: true
26+
- name: show ruby version
27+
run: ruby -v
28+
- name: bundle install
29+
run: bundle install
30+
#
31+
# This step avoids the following error in the JRuby 9.4 test.
32+
#
33+
# Gem::LoadError: You have already activated rake 13.0.6,
34+
# but your Gemfile requires rake 13.1.0. Prepending
35+
# `bundle exec` to your command may solve this.
36+
#
37+
- name: install rake 13.1.0
38+
run: gem install rake -v 13.1.0
39+
- name: install embulk.jar
40+
run: "curl -L -o embulk.jar https://github.com/embulk/embulk/releases/download/v0.10.49/embulk-0.10.49.jar"
41+
- name: rake test
42+
run: bundle exec env RUBYOPT="-r ./embulk.jar" rake test

.github/workflows/publish.yml

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
name: Publish
2+
on:
3+
push:
4+
tags:
5+
- "v0.*"
6+
jobs:
7+
publish:
8+
runs-on: ubuntu-latest
9+
environment: maven-central-and-ruby-gems
10+
strategy:
11+
fail-fast: true
12+
steps:
13+
- uses: actions/checkout@v4
14+
- name: Set up Ruby
15+
uses: ruby/setup-ruby@v1
16+
with:
17+
ruby-version: 3.3.0
18+
# get tag variable using {{ github.ref_name }}
19+
#
20+
# References:
21+
# * https://docs.github.com/en/actions/learn-github-actions/contexts#github-context
22+
# * https://docs.github.com/en/actions/learn-github-actions/variables#default-environment-variables
23+
- name: extract gem version from tag
24+
id: vars
25+
run: echo version=${{ github.ref_name }} | sed -e 's/v0/0/' >> $GITHUB_OUTPUT
26+
#
27+
# From the `gem push` documentation:
28+
#
29+
# The push command will use ~/.gem/credentials to authenticate to a server,
30+
# but you can use the RubyGems environment variable GEM_HOST_API_KEY
31+
# to set the api key to authenticate.
32+
#
33+
# https://guides.rubygems.org/command-reference/#gem-push
34+
#
35+
- name: Publish
36+
run: |
37+
rake build
38+
gem push pkg/${EMBULK_PLUGIN_NAME}-${{ steps.vars.outputs.version }}.gem
39+
env:
40+
EMBULK_PLUGIN_NAME: embulk-output-bigquery
41+
GEM_HOST_API_KEY: "${{secrets.RUBYGEMS_API_KEY}}"

CHANGELOG.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,13 @@
1+
## 0.7.0 - 2024-02-01
2+
* [enhancement] Add support for Embulk 0.11.x
3+
4+
## 0.6.9 - 2023-03-16
5+
* [enhancement] Add SSLException to retry job (thanks to @mzumi)
6+
7+
## 0.6.8 - 2022-10-12
8+
* [enhancement] Support JSON type (thanks to @civitaspo)
9+
* [maintenance] Add an error message in order to retry (thanks to @mzumi)
10+
111
## 0.6.7 - 2021-09-10
212
* [enhancement] Add an expiration option of temporary table to clean up (thanks to @TKNGUE)
313

Gemfile

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
source 'https://rubygems.org/'
22

33
gemspec
4-
gem 'embulk', '< 0.10'
5-
gem 'liquid', '= 4.0.0' # the version included in embulk.jar
4+
gem 'embulk', '= 0.10.49'
65
gem 'embulk-parser-none'
76
gem 'embulk-parser-jsonl'
87
gem 'pry-nav'

README.md

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
# embulk-output-bigquery
22

3-
[![Build Status](https://secure.travis-ci.org/embulk/embulk-output-bigquery.png?branch=master)](http://travis-ci.org/embulk/embulk-output-bigquery)
4-
53
[Embulk](https://github.com/embulk/embulk/) output plugin to load/insert data into [Google BigQuery](https://cloud.google.com/bigquery/) using [direct insert](https://cloud.google.com/bigquery/loading-data-into-bigquery#loaddatapostrequest)
64

75
## Overview
@@ -14,6 +12,13 @@ https://developers.google.com/bigquery/loading-data-into-bigquery
1412
* **Cleanup supported**: no
1513
* **Dynamic table creating**: yes
1614

15+
### Supported Embulk
16+
17+
| gem version | Embulk version |
18+
|------------------|--------------------|
19+
| 0.7.0 and higher | v0.11.0 and higher |
20+
| 0.6.9 and lower | v0.9.X and lower |
21+
1722
### NOT IMPLEMENTED
1823
* insert data over streaming inserts
1924
* for continuous real-time insertions

embulk-output-bigquery.gemspec

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
Gem::Specification.new do |spec|
22
spec.name = "embulk-output-bigquery"
3-
spec.version = "0.6.7"
3+
spec.version = "0.7.0"
44
spec.authors = ["Satoshi Akama", "Naotoshi Seo"]
55
spec.summary = "Google BigQuery output plugin for Embulk"
66
spec.description = "Embulk plugin that insert records to Google BigQuery."
@@ -14,18 +14,13 @@ Gem::Specification.new do |spec|
1414
spec.test_files = spec.files.grep(%r{^(test|spec)/})
1515
spec.require_paths = ["lib"]
1616

17-
# TODO
18-
# signet 0.12.0 and google-api-client 0.33.0 require >= Ruby 2.4.
19-
# Embulk 0.9 use JRuby 9.1.X.Y and it's compatible with Ruby 2.3.
20-
# So, force install signet < 0.12 and google-api-client < 0.33.0
21-
# Also, representable version >= 3.1.0 requires Ruby version >= 2.4
22-
spec.add_dependency 'signet', '~> 0.7', '< 0.12.0'
23-
spec.add_dependency 'google-api-client','< 0.33.0'
17+
# the latest version
18+
spec.add_dependency 'google-api-client','= 0.53.0'
2419
spec.add_dependency 'time_with_zone'
25-
spec.add_dependency "representable", ['~> 3.0.0', '< 3.1']
26-
# faraday 1.1.0 require >= Ruby 2.4.
27-
# googleauth 0.9.0 requires faraday ~> 0.12
28-
spec.add_dependency "faraday", '~> 0.12'
20+
spec.add_dependency 'thwait'
21+
# activesupport 7.0 and later require Ruby >= 2.7.0
22+
# jruby-9.3.0.0 is MRI 2.6 compatible
23+
spec.add_dependency 'activesupport', "< 7.0"
2924

3025
spec.add_development_dependency 'bundler', ['>= 1.10.6']
3126
spec.add_development_dependency 'rake', ['>= 10.0']

lib/embulk/output/bigquery/bigquery_client.rb

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ def load_from_gcs(object_uris, table)
121121
opts = {}
122122

123123
Embulk.logger.debug { "embulk-output-bigquery: insert_job(#{@project}, #{body}, #{opts})" }
124-
response = with_network_retry { client.insert_job(@project, body, opts) }
124+
response = with_network_retry { client.insert_job(@project, body, **opts) }
125125
unless @task['is_skip_job_result_check']
126126
response = wait_load('Load', response)
127127
end
@@ -222,7 +222,7 @@ def load(path, table, write_disposition: 'WRITE_APPEND')
222222
# },
223223
}
224224
Embulk.logger.debug { "embulk-output-bigquery: insert_job(#{@project}, #{body}, #{opts})" }
225-
response = with_network_retry { client.insert_job(@project, body, opts) }
225+
response = with_network_retry { client.insert_job(@project, body, **opts) }
226226
if @task['is_skip_job_result_check']
227227
response
228228
else
@@ -278,7 +278,7 @@ def copy(source_table, destination_table, destination_dataset = nil, write_dispo
278278

279279
opts = {}
280280
Embulk.logger.debug { "embulk-output-bigquery: insert_job(#{@project}, #{body}, #{opts})" }
281-
response = with_network_retry { client.insert_job(@project, body, opts) }
281+
response = with_network_retry { client.insert_job(@project, body, **opts) }
282282
wait_load('Copy', response)
283283
rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
284284
response = {status_code: e.status_code, message: e.message, error_class: e.class}
@@ -372,7 +372,7 @@ def create_dataset(dataset = nil, reference: nil)
372372
end
373373
opts = {}
374374
Embulk.logger.debug { "embulk-output-bigquery: insert_dataset(#{@project}, #{dataset}, #{@location_for_log}, #{body}, #{opts})" }
375-
with_network_retry { client.insert_dataset(@project, body, opts) }
375+
with_network_retry { client.insert_dataset(@project, body, **opts) }
376376
rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
377377
if e.status_code == 409 && /Already Exists:/ =~ e.message
378378
# ignore 'Already Exists' error
@@ -448,7 +448,7 @@ def create_table_if_not_exists(table, dataset: nil, options: nil)
448448

449449
opts = {}
450450
Embulk.logger.debug { "embulk-output-bigquery: insert_table(#{@project}, #{dataset}, #{@location_for_log}, #{body}, #{opts})" }
451-
with_network_retry { client.insert_table(@project, dataset, body, opts) }
451+
with_network_retry { client.insert_table(@project, dataset, body, **opts) }
452452
rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
453453
if e.status_code == 409 && /Already Exists:/ =~ e.message
454454
# ignore 'Already Exists' error

lib/embulk/output/bigquery/google_client.rb

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,14 @@ def with_network_retry(&block)
4949
retries = 0
5050
begin
5151
yield
52-
rescue ::Java::Java.net.SocketException, ::Java::Java.net.ConnectException => e
53-
if ['Broken pipe', 'Connection reset', 'Connection timed out'].select { |x| e.message.include?(x) }.empty?
52+
rescue ::Java::Java.net.SocketException, ::Java::Java.net.ConnectException, ::Java::JavaxNetSsl::SSLException => e
53+
retry_messages = [
54+
'Broken pipe',
55+
'Connection reset',
56+
'Connection timed out',
57+
'Connection or outbound has closed',
58+
]
59+
if retry_messages.select { |x| e.message.include?(x) }.empty?
5460
raise e
5561
else
5662
if retries < @task['retries']

lib/embulk/output/bigquery/value_converter_factory.rb

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -288,6 +288,10 @@ def json_converter
288288
Proc.new {|val|
289289
val
290290
}
291+
when 'JSON'
292+
Proc.new {|val|
293+
val
294+
}
291295
else
292296
raise NotSupportedType, "cannot take column type #{type} for json column"
293297
end

test/helper.rb

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,14 @@
44
require 'test/unit'
55
require 'test/unit/rr'
66

7+
# Embulk 0.10.x introduced a new bootstrap mechanism.
8+
# https://github.com/embulk/embulk/blob/641f35fec064cca7b1a7314d634a4b64ef8637f1/embulk-ruby/test/vanilla/run-test.rb#L8-L13
9+
static_initializer = Java::org.embulk.EmbulkDependencyClassLoader.staticInitializer().useSelfContainedJarFiles()
10+
static_initializer.java_send :initialize
11+
12+
require 'embulk/java/bootstrap'
713
require 'embulk'
8-
begin
9-
# Embulk ~> 0.8.x
10-
Embulk.setup
11-
rescue NotImplementedError, NoMethodError, NameError
12-
# Embulk ~> 0.9.x
13-
require 'embulk/java/bootstrap'
14-
end
14+
1515
Embulk.logger = Embulk::Logger.new('/dev/null')
1616

1717
APP_ROOT = File.expand_path('../', __dir__)

test/test_transaction.rb

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ def test_replace
109109
task = Bigquery.configure(config, schema, processor_count)
110110
any_instance_of(BigqueryClient) do |obj|
111111
mock(obj).get_dataset(config['dataset'])
112-
mock(obj).create_table_if_not_exists(config['temp_table'], { :options => { "expiration_time" => nil } })
112+
mock(obj).create_table_if_not_exists(config['temp_table'], options: {"expiration_time"=>nil})
113113
mock(obj).create_table_if_not_exists(config['table'])
114114
mock(obj).copy(config['temp_table'], config['table'], write_disposition: 'WRITE_TRUNCATE')
115115
mock(obj).delete_table(config['temp_table'])
@@ -122,7 +122,7 @@ def test_replace_with_partitioning
122122
task = Bigquery.configure(config, schema, processor_count)
123123
any_instance_of(BigqueryClient) do |obj|
124124
mock(obj).get_dataset(config['dataset'])
125-
mock(obj).create_table_if_not_exists(config['temp_table'], { :options => { "expiration_time" => nil } })
125+
mock(obj).create_table_if_not_exists(config['temp_table'], options: {"expiration_time"=>nil})
126126
mock(obj).create_table_if_not_exists(config['table'])
127127
mock(obj).copy(config['temp_table'], config['table'], write_disposition: 'WRITE_TRUNCATE')
128128
mock(obj).delete_table(config['temp_table'])
@@ -138,7 +138,7 @@ def test_replace_backup
138138
any_instance_of(BigqueryClient) do |obj|
139139
mock(obj).get_dataset(config['dataset'])
140140
mock(obj).get_dataset(config['dataset_old'])
141-
mock(obj).create_table_if_not_exists(config['temp_table'], { :options => { "expiration_time" => nil } })
141+
mock(obj).create_table_if_not_exists(config['temp_table'], options: {"expiration_time"=>nil})
142142
mock(obj).create_table_if_not_exists(config['table'])
143143
mock(obj).create_table_if_not_exists(config['table_old'], dataset: config['dataset_old'])
144144

@@ -158,7 +158,7 @@ def test_replace_backup_auto_create_dataset
158158
mock(obj).create_dataset(config['dataset'])
159159
mock(obj).create_dataset(config['dataset_old'], reference: config['dataset'])
160160
mock(obj).create_table_if_not_exists(config['table'])
161-
mock(obj).create_table_if_not_exists(config['temp_table'], { :options => { "expiration_time" => nil } })
161+
mock(obj).create_table_if_not_exists(config['temp_table'], options: {"expiration_time"=>nil})
162162
mock(obj).create_table_if_not_exists(config['table_old'], dataset: config['dataset_old'])
163163

164164
mock(obj).get_table_or_partition(config['table'])
@@ -176,7 +176,7 @@ def test_replace_backup_with_partitioning
176176
any_instance_of(BigqueryClient) do |obj|
177177
mock(obj).get_dataset(config['dataset'])
178178
mock(obj).get_dataset(config['dataset_old'])
179-
mock(obj).create_table_if_not_exists(config['temp_table'], { :options => { "expiration_time" => nil } })
179+
mock(obj).create_table_if_not_exists(config['temp_table'], options: {"expiration_time"=>nil})
180180
mock(obj).create_table_if_not_exists(config['table'])
181181
mock(obj).create_table_if_not_exists(config['table_old'], dataset: config['dataset_old'])
182182

@@ -196,7 +196,7 @@ def test_append
196196
task = Bigquery.configure(config, schema, processor_count)
197197
any_instance_of(BigqueryClient) do |obj|
198198
mock(obj).get_dataset(config['dataset'])
199-
mock(obj).create_table_if_not_exists(config['temp_table'], { :options => { "expiration_time" => nil } })
199+
mock(obj).create_table_if_not_exists(config['temp_table'], options: {"expiration_time"=>nil})
200200
mock(obj).create_table_if_not_exists(config['table'])
201201
mock(obj).copy(config['temp_table'], config['table'], write_disposition: 'WRITE_APPEND')
202202
mock(obj).delete_table(config['temp_table'])
@@ -209,7 +209,7 @@ def test_append_with_partitioning
209209
task = Bigquery.configure(config, schema, processor_count)
210210
any_instance_of(BigqueryClient) do |obj|
211211
mock(obj).get_dataset(config['dataset'])
212-
mock(obj).create_table_if_not_exists(config['temp_table'], { :options => { "expiration_time" => nil } })
212+
mock(obj).create_table_if_not_exists(config['temp_table'], options: {"expiration_time"=>nil})
213213
mock(obj).create_table_if_not_exists(config['table'])
214214
mock(obj).copy(config['temp_table'], config['table'], write_disposition: 'WRITE_APPEND')
215215
mock(obj).delete_table(config['temp_table'])

test/test_value_converter_factory.rb

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -389,6 +389,12 @@ def test_record
389389
assert_equal nil, converter.call(nil)
390390
assert_equal({'foo'=>'foo'}, converter.call({'foo'=>'foo'}))
391391
end
392+
393+
def test_json
394+
converter = ValueConverterFactory.new(SCHEMA_TYPE, 'JSON').create_converter
395+
assert_equal nil, converter.call(nil)
396+
assert_equal({'foo'=>'foo'}, converter.call({'foo'=>'foo'}))
397+
end
392398
end
393399

394400
def test_strict_false

0 commit comments

Comments
 (0)