Single Worker Spanner CDC support #3371
base: main
52c4f78
to
bc31359
Compare
First pass - I like the general direction.
We should likely convert to a batch input - the performance is generally better.
internal/impl/gcp/input_spanner.go
Outdated
}

func init() {
	err := service.RegisterInput("gcp_spanner", spannerInputSpec(),
- err := service.RegisterInput("gcp_spanner", spannerInputSpec(),
+ err := service.RegisterInput("gcp_spanner_cdc", spannerInputSpec(),
Having cdc in the name is important IMO: it makes clear that this is a CDC input, and people might just search for CDC.
Renamed types, files, registration and corrected commit messages
internal/plugins/info.csv
Outdated
@@ -88,6 +88,7 @@ gcp_cloud_storage ,output ,GCP Cloud Storage ,3.43.0 ,certif
gcp_cloudtrace ,tracer ,GCP Cloud Trace ,4.2.0 ,certified ,n ,y ,y
gcp_pubsub ,input ,GCP PubSub ,0.0.0 ,certified ,n ,y ,y
gcp_pubsub ,output ,GCP PubSub ,0.0.0 ,certified ,n ,y ,y
gcp_spanner ,input ,gcp_spanner ,0.0.0 ,community ,n ,n ,n
- We should update the version here (this can wait, but it needs to be right before merging)
- This is enterprise, not community
- It should be available in cloud (both of the last two columns ticked yes)
Done
internal/impl/gcp/input_spanner.go
Outdated
// Copyright 2025 Redpanda Data, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
Needs to be our enterprise RCL license. The text is available in the licenses repo or other enterprise components.
Moved to enterprise directory, fixed file headers
Can you add color to the commit message or doc.go on why we forked this library?
Done.
@@ -1,3 +1,16 @@
// Copyright 2025 Redpanda Data, Inc.
Can we clarify with @davidfoleyrp the right approach here? Generally I haven't seen us add our license on top of an existing one.
Most likely it would be almost completely rewritten in the process.
internal/impl/gcp/input_spanner.go
Outdated
}

func (r *spannerReader) Close(ctx context.Context) error {
	r.cancel()
	r.reader.Close()
Can we wait for the background fiber to stop too?
internal/impl/gcp/input_spanner.go
Outdated
if !r.connected.Load() {
	return nil, nil, service.ErrNotConnected
}
Since these are inherently a bit racy, we generally use shutdown.Signaller to know when the connection drops and when the background fiber has stopped.
Yes, I added a task to fix it: CON-71.
	t.Skip("Skipping Spanner integration test as required flags are not set")
}

h, err := newSpannerTestHelper()
I like the test helper, but it'd be nice to take this a step further and move more of the setup into a helper; we will likely want lots of different tests here in the future.
I ported what was available in the Go implementations and recorded a task to port the Apache Beam integration tests (CON-80). I'll refactor as needed when adding more tests.
7cd5c5c
to
ba90339
Compare
I discussed batching with @Jeffail earlier, and we decided to keep it simple for now. The BatchInput will be implemented in CON-68.
ba90339
to
de6d861
Compare
LGTM, just left a few nits
2becebd
to
86231af
Compare
f4bea02
to
2f7af8a
Compare
Pull Request Overview
This PR adds initial support for Spanner CDC by introducing a new input plugin (“gcp_spanner_cdc”) based on the spanner-change-streams-tail code. The key changes include a new plugin entry in info.csv, implementation of the input reader and its configuration parsing, and comprehensive tests (integration and unit) that cover the CDC data record processing and dialect handling.
Reviewed Changes
Copilot reviewed 23 out of 23 changed files in this pull request and generated 1 comment.
Show a summary per file
| File | Description |
| --- | --- |
| internal/plugins/info.csv | Added a new entry for the gcp_spanner_cdc input plugin. |
| internal/impl/gcp/enterprise/integration_spanner_cdc_test.go | Introduced integration tests for Spanner CDC including table/stream creation, cleanup, and data change record validation. |
| internal/impl/gcp/enterprise/input_spanner_cdc.go | Implemented the CDC input spec and reader with configuration parsing, connection handling, and message emission. |
| internal/impl/gcp/enterprise/changestreams/* | Multiple files added to support change stream query, record decoding, metadata management, dialect detection, and testing. |
| docs/modules/components/pages/inputs/gcp_spanner_cdc.adoc | Generated documentation for the new CDC input. |
| Various test and helper files | Added tests and helper functions for both emulator and real Spanner scenarios. |
	return fmt.Errorf("setup Spanner change stream reader: %w", err)
}

r.resCh = make(chan *service.Message)
[nitpick] Consider using a buffered channel for r.resCh to prevent potential blocking if the subscriber produces messages faster than they are consumed. For example, using 'make(chan *service.Message, <buffer_size>)' can help improve responsiveness under high throughput.
- r.resCh = make(chan *service.Message)
+ r.resCh = make(chan *service.Message, 100)
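As a small illustration of the difference this suggestion relies on, the sketch below uses a non-blocking send to show when a send on an unbuffered versus a buffered channel can proceed. `trySend` is a hypothetical helper for the demonstration only; it is not part of the PR, and the buffer size of 2 is arbitrary.

```go
package main

import "fmt"

// trySend attempts a non-blocking send and reports whether it succeeded.
func trySend(ch chan int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

func main() {
	unbuffered := make(chan int)
	buffered := make(chan int, 2)

	// With no receiver waiting, a send on an unbuffered channel cannot proceed.
	fmt.Println(trySend(unbuffered, 1))

	// A buffered channel accepts sends until its buffer is full.
	fmt.Println(trySend(buffered, 1), trySend(buffered, 2), trySend(buffered, 3))
}
```

A buffer only absorbs bursts; once it is full the producer blocks exactly as before, and anything still sitting in the buffer has to be accounted for at shutdown.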
Adjusted commit message prefixes to project standards, fixed test.
…seDialect and add a test Fixes CON-65
The model is very pointer heavy, and it cannot be improved. This is due to the Spanner APIs: Spanner expects a slice of struct pointers, e.g. []*ChangeRecord, when decoding ARRAY[STRUCT] data.
- Extract model to separate files
- Add relevant sections from Google Spanner docs explaining the meaning of relevant fields
- Implement fmt.Stringer
Fixes CON-66
- Move projectID, instanceID, databaseID, streamID to Config
- Introduce isPostgres() helper function (like in Apache Beam)
- Introduce Handler interface
- Rename read to Start do not exit on error
- close client if constructor fails
- make parsing not lenient
The implementation is adapted from Apache Beam [1], [2].
[1] https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataAdminDao.java
[2] https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java
…ata store in Spanner Remove Reader and use new implementation based on metadata package. The integration test is passing. === RUN TestIntegrationSpannerCDCInput integration_spanner_cdc_test.go:168: Created table "rpcn_test_table_1747229402569820000" and stream "rpcn_test_stream_1747229402569820000" level=info msg="Sending inproc messages to ID: b4aaf7e6-fdc0-4976-be9b-dc58abc376bb" @service=benthos label="" path=root.output level=info msg="Connecting to Spanner CDC stream: rpcn_test_stream_1747229402569820000 (project: sandbox-rpcn, instance: rpcn-tests, database: changestreams)" @service=benthos label="" path=root.input level=debug msg="Creating partition metadata table cdc_metadata_rpcn_test_stream_1747229402569820000 if not exist" @service=benthos label="" path=root.input level=info msg="Input type gcp_spanner_cdc is now active" @service=benthos label="" path=root.input level=info msg="Starting subscriber" @service=benthos label="" path=root.input level=info msg="Detected root partition __8BAYEHAeZvPQAAAYLGOINycGNuX3Rlc3Rfc3RyZWFtXzE3NDcyMjk0MDI1Njk4MjAwMDAAAYSBAIKAgwjDZAAAAAAAAIQEHbHBFIVnMF8wAAH__4X_BjUYdYtI7Yb_BjUZNAqgH4eAwGQBAf__" @service=benthos label="" path=root.input level=info msg="Detected root partition __8BAYEHAeZvPQAAAYLGOINycGNuX3Rlc3Rfc3RyZWFtXzE3NDcyMjk0MDI1Njk4MjAwMDAAAYSBBgp5WFABAoKAgwjDZAAAAAAAAIQE8dndpoVnMjU4XzEwOTgyNzg5AAH__4X_BjUYGgpAWYb_BjUY3T6Sn4eAwGQBAf__" @service=benthos label="" path=root.input level=debug msg="Detected unfinished min watermark: 2025-05-14 13:30:31 +0000 UTC" @service=benthos label="" path=root.input level=debug msg="Detected 2 new partitions" @service=benthos label="" path=root.input level=debug msg="__8BAYEHAeZvPQAAAYLGOINycGNuX3Rlc3Rfc3RyZWFtXzE3NDcyMjk0MDI1Njk4MjAwMDAAAYSBAIKAgwjDZAAAAAAAAIQEHbHBFIVnMF8wAAH__4X_BjUYdYtI7Yb_BjUZNAqgH4eAwGQBAf__: updating partition to running" @service=benthos label="" path=root.input level=debug 
msg="__8BAYEHAeZvPQAAAYLGOINycGNuX3Rlc3Rfc3RyZWFtXzE3NDcyMjk0MDI1Njk4MjAwMDAAAYSBBgp5WFABAoKAgwjDZAAAAAAAAIQE8dndpoVnMjU4XzEwOTgyNzg5AAH__4X_BjUYGgpAWYb_BjUY3T6Sn4eAwGQBAf__: updating partition to running" @service=benthos label="" path=root.input level=debug msg="__8BAYEHAeZvPQAAAYLGOINycGNuX3Rlc3Rfc3RyZWFtXzE3NDcyMjk0MDI1Njk4MjAwMDAAAYSBAIKAgwjDZAAAAAAAAIQEHbHBFIVnMF8wAAH__4X_BjUYdYtI7Yb_BjUZNAqgH4eAwGQBAf__: querying partition change stream" @service=benthos label="" path=root.input level=debug msg="__8BAYEHAeZvPQAAAYLGOINycGNuX3Rlc3Rfc3RyZWFtXzE3NDcyMjk0MDI1Njk4MjAwMDAAAYSBBgp5WFABAoKAgwjDZAAAAAAAAIQE8dndpoVnMjU4XzEwOTgyNzg5AAH__4X_BjUYGgpAWYb_BjUY3T6Sn4eAwGQBAf__: querying partition change stream" @service=benthos label="" path=root.input level=debug msg="Detected unfinished min watermark: 2025-05-14 13:30:31 +0000 UTC" @service=benthos label="" path=root.input level=debug msg="__8BAYEHAeZvPQAAAYLGOINycGNuX3Rlc3Rfc3RyZWFtXzE3NDcyMjk0MDI1Njk4MjAwMDAAAYSBBgp5WFABAoKAgwjDZAAAAAAAAIQE8dndpoVnMjU4XzEwOTgyNzg5AAH__4X_BjUYGgpAWYb_BjUY3T6Sn4eAwGQBAf__: data change record: table: rpcn_test_table_1747229402569820000, modification type: INSERT, commit timestamp: 2025-05-14 13:30:32.186671 +0000 UTC" @service=benthos label="" path=root.input level=debug msg="__8BAYEHAeZvPQAAAYLGOINycGNuX3Rlc3Rfc3RyZWFtXzE3NDcyMjk0MDI1Njk4MjAwMDAAAYSBBgp5WFABAoKAgwjDZAAAAAAAAIQE8dndpoVnMjU4XzEwOTgyNzg5AAH__4X_BjUYGgpAWYb_BjUY3T6Sn4eAwGQBAf__: data change record: table: rpcn_test_table_1747229402569820000, modification type: DELETE, commit timestamp: 2025-05-14 13:30:32.186671 +0000 UTC" @service=benthos label="" path=root.input level=debug msg="Waiting for pending acks to resolve before shutting down." @service=benthos label="" path=root.input level=debug msg="Pending acks resolved." 
@service=benthos label="" path=root.input level=info msg="Subscriber stopped" @service=benthos label="" path=root.input level=debug msg="__8BAYEHAeZvPQAAAYLGOINycGNuX3Rlc3Rfc3RyZWFtXzE3NDcyMjk0MDI1Njk4MjAwMDAAAYSBAIKAgwjDZAAAAAAAAIQEHbHBFIVnMF8wAAH__4X_BjUYdYtI7Yb_BjUZNAqgH4eAwGQBAf__: context canceled" @service=benthos label="" path=root.input level=debug msg="__8BAYEHAeZvPQAAAYLGOINycGNuX3Rlc3Rfc3RyZWFtXzE3NDcyMjk0MDI1Njk4MjAwMDAAAYSBBgp5WFABAoKAgwjDZAAAAAAAAIQE8dndpoVnMjU4XzEwOTgyNzg5AAH__4X_BjUYGgpAWYb_BjUY3T6Sn4eAwGQBAf__: context canceled" @service=benthos label="" path=root.input integration_spanner_cdc_test.go:174: Dropped table "rpcn_test_table_1747229402569820000" and stream "rpcn_test_stream_1747229402569820000" --- PASS: TestIntegrationSpannerCDCInput (65.83s) PASS ok github.com/redpanda-data/connect/v4/internal/impl/gcp/enterprise 66.854s Fixes CON-67
…ns on start
On start, get PartitionMetadata in SCHEDULED and RUNNING state and reschedule it for execution.
Add a lightweight integration test framework; the tests are executed against the emulator and a mock querier. It's easier to simulate edge cases using the mock. Also, the emulator does not support Change Data Capture querying.
Fixes CON-89
- Add CallbackFunc type and use it in all types
- Add Subscriber.UpdatePartitionWatermark()
- Remove UpdateWatermark() calls in handler, and document that callback is responsible for updating watermark
- Add test cases
Supports two operational modes:
- Without batching period (default): a single goroutine per partition runs the Spanner row iterator directly in spannerCDCReader.
- With batching period: periodicallyFlushingSpannerCDCReader provides a channel-based callback implementation that synchronizes callbacks with periodic flushes.
Components:
- spannerPartitionBatcher: Manages batching per partition with timer-controlled periodic flushes
- periodicallyFlushingSpannerCDCReader: Coordinates callbacks with flush operations to maintain ordering
- ack.Once: Implements AckFunc with sync.OnceFunc semantics and waiting support
Fixes CON-68
Fixes CON-71
Fixes CON-92
Fixes CON-86
Fixes CON-81
- Mark some fields as Advanced
- Provide examples
- Rename waitForAck() to emit and do not wait for ack
- Add spannerPartitionBatcher.acks and helper methods to work with it
I reworked ack.Once for better clarity.
LGTM, just a few small nits. Feel free to 🐑 🚀 if ready
if rand.Intn(100) > 10 {
	return
}
Might be useful to add a comment to explain why this is needed
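For readers of the guard above: `rand.Intn(100)` returns an integer in [0, 100), so the code after the early return runs only for the values 0 through 10. The sketch below enumerates that deterministically; `proceeds` and `countProceeding` are hypothetical helpers, and the reason the PR samples at this rate is not stated in the thread.

```go
package main

import "fmt"

// proceeds mirrors the guard `if rand.Intn(100) > 10 { return }`:
// execution continues only when n is not greater than 10.
func proceeds(n int) bool {
	return n <= 10
}

// countProceeding counts how many of the 100 possible results of
// rand.Intn(100) pass the guard.
func countProceeding() int {
	count := 0
	for n := 0; n < 100; n++ {
		if proceeds(n) {
			count++
		}
	}
	return count
}

func main() {
	// Prints 11: the guarded code runs for 11 of the 100 possible values.
	fmt.Println(countProceeding())
}
```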
}

var forcePeriodicFlush = &changestreams.DataChangeRecord{
	ServerTransactionID: "just do it",
Just thought it's annoying for people to search for it in logs if it has spaces
	ChangeStreamQueryPriority spannerpb.RequestOptions_Priority
}

// Subscriber is a partition awareSpanner change stream consumer. It reads
Nit: awareSpanner -> aware Spanner
license.InjectTestService(stream.Resources())

t.Cleanup(func() {
	if err := stream.Stop(context.Background()); err != nil { //nolint:usetesting // the cleanup needs to run with fresh context
Nit: stream.StopWithin
GCP Spanner CDC support. It only supports a single Connect instance at the moment.