feat: Add range partitioning support #174

Merged
merged 4 commits into from
May 13, 2025
Changes from 2 commits
24 changes: 24 additions & 0 deletions README.md
Expand Up @@ -110,6 +110,12 @@ Following options are same as [bq command-line tools](https://cloud.google.com/b
| time_partitioning.type | string | required | nil | The only type supported is DAY, which will generate one partition per day based on data loading time. |
| time_partitioning.expiration_ms | int | optional | nil | Number of milliseconds for which to keep the storage for a partition. |
| time_partitioning.field | string | optional | nil | `DATE` or `TIMESTAMP` column used for partitioning |
| range_partitioning | hash | optional | nil | See [Range Partitioning](#range-partitioning) |
| range_partitioning.field | string | required | nil | `INT64` column used for partitioning |
| range_partitioning.range | hash | required | nil | Defines the ranges for range partitioning |
| range_partitioning.range.start | int | required | nil | The start of range partitioning, inclusive. |
| range_partitioning.range.end | int | required | nil | The end of range partitioning, exclusive. |
| range_partitioning.range.interval | int | required | nil | The width of each interval. |
| clustering | hash | optional | nil | Currently, clustering is supported for partitioned tables, so must be used with `time_partitioning` option. See [clustered tables](https://cloud.google.com/bigquery/docs/clustered-tables) |
| clustering.fields | array | required | nil | One or more fields on which data should be clustered. The order of the specified columns determines the sort order of the data. |
| schema_update_options | array | optional | nil | (Experimental) List of `ALLOW_FIELD_ADDITION` or `ALLOW_FIELD_RELAXATION` or both. See [jobs#configuration.load.schemaUpdateOptions](https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load.schemaUpdateOptions). NOTE for the current status: `schema_update_options` does not work for `copy` job, that is, is not effective for most of modes such as `append`, `replace` and `replace_backup`. `delete_in_advance` deletes origin table so does not need to update schema. Only `append_direct` can utilize schema update. |
Expand Down Expand Up @@ -448,6 +454,24 @@ MEMO: [jobs#configuration.load.schemaUpdateOptions](https://cloud.google.com/big
to update the schema of the destination table as a side effect of the load job, but it is not available for copy job.
Thus, it was not suitable for embulk-output-bigquery idempotence modes, `append`, `replace`, and `replace_backup`, sigh.

### Range Partitioning

See also [Creating and Updating Range-Partitioned Tables](https://cloud.google.com/bigquery/docs/creating-partitioned-tables).

To load into a partition, specify `range_partitioning` together with a `table` parameter that carries a partition decorator, as follows:

```yaml
out:
  type: bigquery
  table: table_name$1
  range_partitioning:
    field: customer_id
    range:
      start: 1
      end: 99999
      interval: 1
```
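As a rough illustration of how `start`, `end`, and `interval` carve up the column values, the sketch below computes the starting value of the partition a given integer would land in. `partition_start_for` is a hypothetical helper, not part of embulk-output-bigquery, and it assumes the decorator after `$` names a partition by its starting value, which is worth confirming against the BigQuery documentation.

```ruby
# Hypothetical helper (not part of the plugin): returns the start value of the
# range partition that `value` falls into, or nil when the value lies outside
# the half-open interval [range_start, range_end).
def partition_start_for(value, range_start:, range_end:, interval:)
  return nil if value < range_start || value >= range_end

  # Integer division gives the zero-based index of the partition.
  offset = (value - range_start) / interval
  range_start + offset * interval
end

# With start: 0, end: 1000, interval: 100, the value 250 belongs to the
# partition that begins at 200.
puts partition_start_for(250, range_start: 0, range_end: 1000, interval: 100)
```

With the settings from the example above (`start: 1`, `end: 99999`, `interval: 1`), every value in range is its own partition, and 99999 itself falls outside because `end` is exclusive.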

## Development

### Run example:
Expand Down
36 changes: 36 additions & 0 deletions example/config_replace_field_range_partitioned_table.yml
@@ -0,0 +1,36 @@
in:
  type: file
  path_prefix: example/example.csv
  parser:
    type: csv
    charset: UTF-8
    newline: CRLF
    null_string: 'NULL'
    skip_header_lines: 1
    comment_line_marker: '#'
    columns:
      - {name: date, type: string}
      - {name: timestamp, type: timestamp, format: "%Y-%m-%d %H:%M:%S.%N", timezone: "+09:00"}
      - {name: "null", type: string}
      - {name: long, type: long}
      - {name: string, type: string}
      - {name: double, type: double}
      - {name: boolean, type: boolean}
out:
  type: bigquery
  mode: replace
  auth_method: service_account
  json_keyfile: example/your-project-000.json
  dataset: your_dataset_name
  table: your_field_partitioned_table_name
  source_format: NEWLINE_DELIMITED_JSON
  compression: NONE
  auto_create_dataset: true
  auto_create_table: true
  schema_file: example/schema.json
  range_partitioning:
    field: 'long'
    range:
      start: 90
      end: 100
      interval: 1
36 changes: 35 additions & 1 deletion lib/embulk/output/bigquery.rb
Expand Up @@ -89,6 +89,7 @@ def self.configure(config, schema, task_count)
'ignore_unknown_values' => config.param('ignore_unknown_values', :bool, :default => false),
'allow_quoted_newlines' => config.param('allow_quoted_newlines', :bool, :default => false),
'time_partitioning' => config.param('time_partitioning', :hash, :default => nil),
'range_partitioning' => config.param('range_partitioning', :hash, :default => nil),
'clustering' => config.param('clustering', :hash, :default => nil), # google-api-ruby-client >= v0.21.0
'schema_update_options' => config.param('schema_update_options', :array, :default => nil),

Expand Down Expand Up @@ -231,10 +232,43 @@ def self.configure(config, schema, task_count)
   unless task['time_partitioning']['type']
     raise ConfigError.new "`time_partitioning` must have `type` key"
   end
-elsif Helper.has_partition_decorator?(task['table'])
+# If user specify range_partitioning, it should be used as is
+elsif Helper.has_partition_decorator?(task['table']) && task['range_partitioning'].nil?
Member

Could you elaborate on this part? IIUC, `Helper.has_partition_decorator?(task['table'])` returns true if the table name contains `$`. Why did you add `task['range_partitioning'].nil?`?

I don't understand what "If user specify range_partitioning, it should be used as is" means.

In my opinion, the following checks are preferable.

# the two partitioning configs can't be used at the same time.
if task['time_partitioning'] && task['range_partitioning']
  raise ConfigError.new ...
end

# partition decorator doesn't support range_partitioning (if needed)
if Helper.has_partition_decorator?(task['table']) && task['range_partitioning']
  raise ConfigError.new ...
end

Contributor Author

Thank you for the review. I think that the following code would be preferable.

if Helper.has_partition_decorator?(task['table'])
  task['time_partitioning'] = {'type' => 'DAY'}
end

if task['time_partitioning'] && task['range_partitioning']
  raise ConfigError.new ...
end

Contributor Author

Hmm, the error message would be confusing. I fixed it as in e9cc945. Could you check this?

Member

(This is just a double check.) By "the error message", do you mean "If user specify range_partitioning, it should be used as is"?

Contributor Author

If I use the following, users who use a partition decorator may be confused, because they do not use `time_partitioning`.

if Helper.has_partition_decorator?(task['table'])
  task['time_partitioning'] = {'type' => 'DAY'}
end

if task['time_partitioning'] && task['range_partitioning']
  raise ConfigError.new "`time_partitioning` and `range_partitioning` cannot be used at the same time"
end

Member

Oh, you mean that if the user uses the following configuration, the user may be confused by the message, correct?
If so, your proposed change seems good.

  table: part_test$200105
  range_partitioning:
    field: id
    range:
      start: 1
      end: 5001
      interval: 500
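
The behavior this thread settles on can be sketched in isolation: a partition decorator implies daily time partitioning only when the user has not configured `range_partitioning`. In the sketch below, `partition_decorator?` is a stand-in for `Helper.has_partition_decorator?` based solely on the reviewer's description of it, and `apply_partition_defaults` is a hypothetical name; neither is the plugin's actual code.

```ruby
# Stand-in for Helper.has_partition_decorator?, following the reviewer's
# description ("returns true if the table name contains $"). Assumption only.
def partition_decorator?(table)
  table.include?('$')
end

# Hypothetical helper mirroring the merged condition: the decorator triggers
# the DAY default only when neither partitioning option is set explicitly.
def apply_partition_defaults(task)
  task = task.dup
  if task['time_partitioning'].nil? &&
     partition_decorator?(task['table']) &&
     task['range_partitioning'].nil?
    task['time_partitioning'] = { 'type' => 'DAY' }
  end
  task
end

# The configuration quoted above: the decorator is kept as is and no
# time_partitioning is injected, so no time/range conflict is reported.
range_task = apply_partition_defaults({
  'table' => 'part_test$200105',
  'range_partitioning' => {
    'field' => 'id',
    'range' => { 'start' => 1, 'end' => 5001, 'interval' => 500 },
  },
})
puts range_task.key?('time_partitioning') # false
```

Injecting `time_partitioning` first and then rejecting the combination would surface an error about an option the user never wrote, which is the confusion the author wanted to avoid.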

  task['time_partitioning'] = {'type' => 'DAY'}
end

if task['range_partitioning']
  unless task['range_partitioning']['field']
    raise ConfigError.new "`range_partitioning` must have `field` key"
  end
  unless task['range_partitioning']['range']
    raise ConfigError.new "`range_partitioning` must have `range` key"
  end

  range = task['range_partitioning']['range']
  unless range['start']
    raise ConfigError.new "`range_partitioning` must have `range.start` key"
  end
  unless range['start'].is_a?(Integer)
    raise ConfigError.new "`range_partitioning.range.start` must be an integer"
  end
  unless range['end']
    raise ConfigError.new "`range_partitioning` must have `range.end` key"
  end
  unless range['end'].is_a?(Integer)
    raise ConfigError.new "`range_partitioning.range.end` must be an integer"
  end
  unless range['interval']
    raise ConfigError.new "`range_partitioning` must have `range.interval` key"
  end
  unless range['interval'].is_a?(Integer)
    raise ConfigError.new "`range_partitioning.range.interval` must be an integer"
  end
  if range['start'] + range['interval'] > range['end']
    raise ConfigError.new "`range_partitioning.range.start` + `range_partitioning.range.interval` must be less than `range_partitioning.range.end`"
  end
end

if task['clustering']
  unless task['clustering']['fields']
    raise ConfigError.new "`clustering` must have `fields` key"
Expand Down
14 changes: 13 additions & 1 deletion lib/embulk/output/bigquery/bigquery_client.rb
Expand Up @@ -411,7 +411,7 @@ def create_table_if_not_exists(table, dataset: nil, options: nil)
 dataset ||= @dataset
 options ||= {}
 options['time_partitioning'] ||= @task['time_partitioning']
-if Helper.has_partition_decorator?(table)
+if Helper.has_partition_decorator?(table) && @task['range_partitioning'].nil?
   options['time_partitioning'] ||= {'type' => 'DAY'}
   table = Helper.chomp_partition_decorator(table)
 end
Expand All @@ -435,6 +435,18 @@ def create_table_if_not_exists(table, dataset: nil, options: nil)
  }
end

options['range_partitioning'] ||= @task['range_partitioning']
if options['range_partitioning']
  body[:range_partitioning] = {
    field: options['range_partitioning']['field'],
    range: {
      start: options['range_partitioning']['range']['start'].to_s,
      end: options['range_partitioning']['range']['end'].to_s,
      interval: options['range_partitioning']['range']['interval'].to_s,
    },
  }
end

options['clustering'] ||= @task['clustering']
if options['clustering']
  body[:clustering] = {
Expand Down
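
For readers tracing how the validated config becomes the table-creation payload, here is a minimal standalone sketch of the body-building step in the hunk above. `range_partitioning_body` is a hypothetical function name. The `.to_s` conversions are copied from the diff; a plausible reading is that the BigQuery REST API represents int64 values as JSON strings, but the PR does not state its reason, so treat that as an assumption.

```ruby
# Hypothetical standalone version of the body-building step shown in the diff.
# Takes the `range_partitioning` hash from the task config and returns the
# symbol-keyed hash placed under body[:range_partitioning].
def range_partitioning_body(range_partitioning)
  range = range_partitioning['range']
  {
    field: range_partitioning['field'],
    range: {
      # Integers are stringified, as in the diff.
      start: range['start'].to_s,
      end: range['end'].to_s,
      interval: range['interval'].to_s,
    },
  }
end

# Values taken from example/config_replace_field_range_partitioned_table.yml.
body = range_partitioning_body(
  { 'field' => 'long', 'range' => { 'start' => 90, 'end' => 100, 'interval' => 1 } }
)
puts body[:range][:end] # 100 (as a String)
```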
30 changes: 30 additions & 0 deletions test/test_configure.rb
Expand Up @@ -270,6 +270,36 @@ def test_time_partitioning
  assert_equal 'DAY', task['time_partitioning']['type']
end

def test_range_partitioning
  config = least_config.merge('range_partitioning' => {'field' => 'foo', 'range' => { 'start' => 1, 'end' => 2, 'interval' => 1 }})
  assert_nothing_raised { Bigquery.configure(config, schema, processor_count) }

  # field is required
  config = least_config.merge('range_partitioning' => {'range' => { 'start' => 1, 'end' => 2, 'interval' => 1 }})
  assert_raise { Bigquery.configure(config, schema, processor_count) }

  # range is required
  config = least_config.merge('range_partitioning' => {'field' => 'foo'})
  assert_raise { Bigquery.configure(config, schema, processor_count) }

  # range.start is required
  config = least_config.merge('range_partitioning' => {'field' => 'foo', 'range' => { 'end' => 2, 'interval' => 1 }})
  assert_raise { Bigquery.configure(config, schema, processor_count) }

  # range.end is required
  config = least_config.merge('range_partitioning' => {'field' => 'foo', 'range' => { 'start' => 1, 'interval' => 1 }})
  assert_raise { Bigquery.configure(config, schema, processor_count) }

  # range.interval is required
  config = least_config.merge('range_partitioning' => {'field' => 'foo', 'range' => { 'start' => 1, 'end' => 2 }})
  assert_raise { Bigquery.configure(config, schema, processor_count) }

  # range.start + range.interval should be less than range.end
  config = least_config.merge('range_partitioning' => {'field' => 'foo', 'range' => { 'start' => 1, 'end' => 2, 'interval' => 2 }})
  assert_raise { Bigquery.configure(config, schema, processor_count) }
end

def test_clustering
  config = least_config.merge('clustering' => {'fields' => ['field_a']})
  assert_nothing_raised { Bigquery.configure(config, schema, processor_count) }
Expand Down
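
The boundary that `test_range_partitioning` exercises is easy to misread: the check uses `>`, so `start + interval == end` is accepted (the first test case, `1 + 1 == 2`) even though the error message says "less than". The sketch below reproduces only the range checks as a standalone function to make that visible. `validate_range!` and the local `ConfigError` class are stand-ins for illustration, not the plugin's code, and the final error message is reworded here to match the comparison.

```ruby
# Stand-in for Embulk's ConfigError so the sketch runs without Embulk.
class ConfigError < StandardError; end

# Hypothetical helper mirroring the range checks added in bigquery.rb:
# each key must be present and an Integer, and start + interval must not
# exceed end.
def validate_range!(range)
  %w[start end interval].each do |key|
    raise ConfigError, "`range_partitioning` must have `range.#{key}` key" unless range[key]
    raise ConfigError, "`range_partitioning.range.#{key}` must be an integer" unless range[key].is_a?(Integer)
  end
  if range['start'] + range['interval'] > range['end']
    raise ConfigError, '`range.start` + `range.interval` must not exceed `range.end`'
  end
  true
end

# 1 + 1 == 2 passes: a single partition covering [1, 2).
validate_range!({ 'start' => 1, 'end' => 2, 'interval' => 1 })

# 1 + 2 > 2 is rejected, matching the last test case.
begin
  validate_range!({ 'start' => 1, 'end' => 2, 'interval' => 2 })
rescue ConfigError => e
  puts "rejected: #{e.message}"
end
```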