Skip to content

feat(spanner_cdc): single worker Spanner CDC support #3371

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 30 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
503a41d
spanner_cdc: initial version
mmatczuk Apr 25, 2025
8630b4c
spanner_cdc: clean streams and tables not properly cleaned by tests l…
mmatczuk May 5, 2025
62b6d8c
spanner_cdc(changestreams): add emulator test harness
mmatczuk Apr 29, 2025
4123e6b
spanner_cdc(changestreams): refactor dialect to use databasepb.Databa…
mmatczuk Apr 29, 2025
1a42777
spanner_cdc(changestreams): improve model
mmatczuk Apr 30, 2025
a135b09
spanner_cdc(changestreams): refactor Reader for flexibility
mmatczuk May 5, 2025
e087204
spanner_cdc(changestreams): add metadata store
mmatczuk May 6, 2025
30a680c
spanner_cdc: add metadata_table config option
mmatczuk May 9, 2025
ede43d5
spanner_cdc(changestreams): new implementation using persistent metad…
mmatczuk May 7, 2025
2ad37c6
spanner_cdc(changestreams): reschedule running and scheduled partitio…
mmatczuk May 19, 2025
8d95883
spanner_cdc(changestreams): extend callback mechanics to allow batching
mmatczuk May 21, 2025
6270ff1
internal/ack: add service.AckFunc util that enforces that ack is call…
mmatczuk May 20, 2025
8057c23
spanner_cdc: switch to service.BatchInput
mmatczuk May 22, 2025
cb0388c
spanner_cdc: use shutdown.Signaller
mmatczuk May 23, 2025
22ba376
spanner_cdc(changestreams): add min watermark cache
mmatczuk May 23, 2025
2d649e3
spanner_cdc: expose min_watermark_cache_ttl
mmatczuk May 23, 2025
f9b72de
spanncr_cdc: expose credentials_json
mmatczuk May 23, 2025
c282ea0
spanncr_cdc(changestreams): modification type filtering
mmatczuk May 23, 2025
42eb3b5
spanncr_cdc: expose allowed_mod_types
mmatczuk May 23, 2025
7a61cb2
spanncr_cdc: fix field declarations and handling
mmatczuk May 26, 2025
2d2625a
chore(spanner_cdc): rewrite tests to testify
mmatczuk May 26, 2025
d99f21f
spanncr_cdc(changestreams): refactor tests for reusability
mmatczuk May 27, 2025
88049bd
spanncr_cdc(changestreams): return error from Start when partition pr…
mmatczuk May 27, 2025
0a5909b
spanncr_cdc(changestreams): add processing order test based on docs
mmatczuk May 27, 2025
7096fcf
chore(spanner_cdc): rename Subscriber.Start() to Run
mmatczuk May 28, 2025
d2ac4e8
spanner_cdc: defer waiting for acks till the end of partition
mmatczuk May 28, 2025
c07c295
spanner_cdc: emit message per DataChangeRecord.Mod item
mmatczuk May 28, 2025
6304c04
spanner_cdc: move all test helper code to changestreamstest and refac…
mmatczuk May 28, 2025
fddd0fb
chore(spanner_cdc): fix linting errors
mmatczuk May 29, 2025
7deab05
spanner_cdc: do not emit column types and embed Mod
mmatczuk May 29, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,12 @@ linters:
- bodyclose
- perfsprint
path: _test.go
- linters:
- perfsprint
path: internal/impl/gcp/enterprise/changestreams/changestreamstest
- linters:
- perfsprint
path: internal/impl/gcp/enterprise/changestreams/metadata
- linters:
- revive
text: "exported method .*\\.(Close|Connect|Read|ReadBatch|Write|WriteBatch|Process|ProcessBatch|NextBatch|Create|EndOfInput) should have comment or be unexported"
Expand Down
332 changes: 332 additions & 0 deletions docs/modules/components/pages/inputs/gcp_spanner_cdc.adoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,332 @@
= gcp_spanner_cdc
:type: input
:status: beta
:categories: ["Services","GCP"]



////
THIS FILE IS AUTOGENERATED!

To make changes, edit the corresponding source file under:

https://github.com/redpanda-data/connect/tree/main/internal/impl/<provider>.

And:

https://github.com/redpanda-data/connect/tree/main/cmd/tools/docs_gen/templates/plugin.adoc.tmpl
////

// © 2024 Redpanda Data Inc.


component_type_dropdown::[]


Creates an input that consumes from a spanner change stream.

Introduced in version TODO.


[tabs]
======
Common::
+
--

```yml
# Common config fields, showing default values
input:
label: ""
gcp_spanner_cdc:
credentials_json: ""
project_id: "" # No default (required)
instance_id: "" # No default (required)
database_id: "" # No default (required)
stream_id: "" # No default (required)
start_timestamp: ""
end_timestamp: ""
batching:
count: 0
byte_size: 0
period: ""
check: ""
auto_replay_nacks: true
```

--
Advanced::
+
--

```yml
# All config fields, showing default values
input:
label: ""
gcp_spanner_cdc:
credentials_json: ""
project_id: "" # No default (required)
instance_id: "" # No default (required)
database_id: "" # No default (required)
stream_id: "" # No default (required)
start_timestamp: ""
end_timestamp: ""
heartbeat_interval: 10s
metadata_table: ""
min_watermark_cache_ttl: 5s
allowed_mod_types: [] # No default (optional)
batching:
count: 0
byte_size: 0
period: ""
check: ""
processors: [] # No default (optional)
auto_replay_nacks: true
```

--
======

Consumes change records from a Google Cloud Spanner change stream. This input allows
you to track and process database changes in real-time, making it useful for data
replication, event-driven architectures, and maintaining derived data stores.

The input reads from a specified change stream within a Spanner database and converts
each change record into a message. The message payload contains the change records in
JSON format, and metadata is added with details about the Spanner instance, database,
and stream.

Change streams provide a way to track mutations to your Spanner database tables. For
more information about Spanner change streams, refer to the Google Cloud documentation:
https://cloud.google.com/spanner/docs/change-streams


== Fields

=== `credentials_json`

Base64 encoded GCP service account JSON credentials file for authentication. If not provided, Application Default Credentials (ADC) will be used.


*Type*: `string`

*Default*: `""`

=== `project_id`

GCP project ID containing the Spanner instance


*Type*: `string`


=== `instance_id`

Spanner instance ID


*Type*: `string`


=== `database_id`

Spanner database ID


*Type*: `string`


=== `stream_id`

The name of the change stream to track, the stream must exist in the database. To create a change stream, see https://cloud.google.com/spanner/docs/change-streams/manage.


*Type*: `string`


=== `start_timestamp`

RFC3339 formatted timestamp to start reading from (default: current time)


*Type*: `string`

*Default*: `""`

```yml
# Examples

start_timestamp: "2022-01-01T00:00:00Z"
```

=== `end_timestamp`

RFC3339 formatted timestamp to stop reading at (default: no end time)


*Type*: `string`

*Default*: `""`

```yml
# Examples

end_timestamp: "2022-01-01T00:00:00Z"
```

=== `heartbeat_interval`

Duration string for heartbeat interval


*Type*: `string`

*Default*: `"10s"`

=== `metadata_table`

The table to store metadata in (default: cdc_metadata_<stream_id>)


*Type*: `string`

*Default*: `""`

=== `min_watermark_cache_ttl`

Duration string for frequency of querying Spanner for minimum watermark.


*Type*: `string`

*Default*: `"5s"`

=== `allowed_mod_types`

List of modification types to process. If not specified, all modification types are processed. Allowed values: INSERT, UPDATE, DELETE


*Type*: `array`


```yml
# Examples

allowed_mod_types:
- INSERT
- UPDATE
- DELETE
```

=== `batching`

Allows you to configure a xref:configuration:batching.adoc[batching policy].


*Type*: `object`


```yml
# Examples

batching:
byte_size: 5000
count: 0
period: 1s

batching:
count: 10
period: 1s

batching:
check: this.contains("END BATCH")
count: 0
period: 1m
```

=== `batching.count`

A number of messages at which the batch should be flushed. If `0` disables count based batching.


*Type*: `int`

*Default*: `0`

=== `batching.byte_size`

An amount of bytes at which the batch should be flushed. If `0` disables size based batching.


*Type*: `int`

*Default*: `0`

=== `batching.period`

A period in which an incomplete batch should be flushed regardless of its size.


*Type*: `string`

*Default*: `""`

```yml
# Examples

period: 1s

period: 1m

period: 500ms
```

=== `batching.check`

A xref:guides:bloblang/about.adoc[Bloblang query] that should return a boolean value indicating whether a message should end a batch.


*Type*: `string`

*Default*: `""`

```yml
# Examples

check: this.type == "end_of_transaction"
```

=== `batching.processors`

A list of xref:components:processors/about.adoc[processors] to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. Please note that all resulting messages are flushed as a single batch, therefore splitting the batch into smaller batches using these processors is a no-op.


*Type*: `array`


```yml
# Examples

processors:
- archive:
format: concatenate

processors:
- archive:
format: lines

processors:
- archive:
format: json_array
```

=== `auto_replay_nacks`

Whether messages that are rejected (nacked) at the output level should be automatically replayed indefinitely, eventually resulting in back pressure if the cause of the rejections is persistent. If set to `false` these messages will instead be deleted. Disabling auto replays can greatly improve memory efficiency of high throughput streams as the original shape of the data can be discarded immediately upon consumption and mutation.


*Type*: `bool`

*Default*: `true`


2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ require (
cloud.google.com/go/aiplatform v1.74.0
cloud.google.com/go/bigquery v1.66.2
cloud.google.com/go/pubsub v1.47.0
cloud.google.com/go/spanner v1.76.1
cloud.google.com/go/storage v1.51.0
cloud.google.com/go/vertexai v0.12.0
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.18.0
Expand Down Expand Up @@ -172,7 +173,6 @@ require (
cloud.google.com/go/longrunning v0.6.5 // indirect
cloud.google.com/go/monitoring v1.24.1 // indirect
cloud.google.com/go/secretmanager v1.14.5 // indirect
cloud.google.com/go/spanner v1.76.1 // indirect
github.com/Azure/azure-sdk-for-go/sdk/keyvault/azsecrets v0.12.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/keyvault/internal v0.7.1 // indirect
github.com/BurntSushi/toml v1.5.0 // indirect
Expand Down
Loading