Skip to content

Single Worker Spanner CDC support #3371

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 29 commits into
base: main
Choose a base branch
from
Open

Single Worker Spanner CDC support #3371

wants to merge 29 commits into from

Conversation

mmatczuk
Copy link
Collaborator

@mmatczuk mmatczuk commented Apr 28, 2025

GCP Spanner CDS support. It only supports single connect instance at the moment.

@mmatczuk mmatczuk marked this pull request as draft April 28, 2025 16:56
@mmatczuk mmatczuk force-pushed the mmt/gcp_spanner branch 5 times, most recently from 52c4f78 to bc31359 Compare April 29, 2025 14:52
@mmatczuk mmatczuk changed the title WIP: Spanner support WIP: Spanner CDC support Apr 29, 2025
Copy link
Contributor

@rockwotj rockwotj left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

First pass - I like the general direction.

We should likely convert to a batch input - the performance is generally better.

}

func init() {
err := service.RegisterInput("gcp_spanner", spannerInputSpec(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
err := service.RegisterInput("gcp_spanner", spannerInputSpec(),
err := service.RegisterInput("gcp_spanner_cdc", spannerInputSpec(),

Having cdc in the name is important IMO to distinguish that it is CDC because people might just search for CDC

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Renamed types, files, registration and corrected commit messages

@@ -88,6 +88,7 @@ gcp_cloud_storage ,output ,GCP Cloud Storage ,3.43.0 ,certif
gcp_cloudtrace ,tracer ,GCP Cloud Trace ,4.2.0 ,certified ,n ,y ,y
gcp_pubsub ,input ,GCP PubSub ,0.0.0 ,certified ,n ,y ,y
gcp_pubsub ,output ,GCP PubSub ,0.0.0 ,certified ,n ,y ,y
gcp_spanner ,input ,gcp_spanner ,0.0.0 ,community ,n ,n ,n
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • We should update the version here (can wait but needs to be right before merged)
  • This is enterprise not community
  • It should be available in cloud (both last two columns ticked yes

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Comment on lines 1 to 13
// Copyright 2025 Redpanda Data, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Needs to be our enterprise RCL license. The text is available in the licenses repo or other enterprise components.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved to enterprise directory, fixed file headers

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add color to the commit message or doc.go on why we forked this library?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

@@ -1,3 +1,16 @@
// Copyright 2025 Redpanda Data, Inc.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we clarify with @davidfoleyrp the right approach here? Generally I haven't seen us add our license on top of an existing one.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Most likely it would be almost completely rewritten in the process.

}

func (r *spannerReader) Close(ctx context.Context) error {
r.cancel()
r.reader.Close()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we wait for the background fiber to stop too?

Comment on lines 245 to 247
if !r.connected.Load() {
return nil, nil, service.ErrNotConnected
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since these are inherently a bit racy, we generally use shutdown.Signaller to know when the connection drops and when the background fiber has stopped.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I added task to fix it CON-71

t.Skip("Skipping Spanner integration test as required flags are not set")
}

h, err := newSpannerTestHelper()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the test helper, but it'd be nice to take this a step further and move more setup into a helper, it will be nice to have lots of different tests in the future here.

Copy link
Collaborator Author

@mmatczuk mmatczuk Apr 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I ported what was available in Go implementations. I recorded a task to port Apache Beam integration tests CON-80. I'd refactor as needed when adding more tests.

@mmatczuk mmatczuk force-pushed the mmt/gcp_spanner branch 2 times, most recently from 7cd5c5c to ba90339 Compare April 30, 2025 12:11
@mmatczuk
Copy link
Collaborator Author

I discussed batching with @Jeffail before, we decided to keep it simple for now. The BatchInput will be implemented in CON-68.

Copy link
Collaborator

@mihaitodor mihaitodor left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, just left a few nits

@mmatczuk mmatczuk force-pushed the mmt/gcp_spanner branch 7 times, most recently from 2becebd to 86231af Compare May 9, 2025 14:02
@mmatczuk mmatczuk force-pushed the mmt/gcp_spanner branch 5 times, most recently from f4bea02 to 2f7af8a Compare May 14, 2025 14:41
@mmatczuk mmatczuk requested a review from Copilot May 15, 2025 10:36
Copy link
Contributor

@Copilot Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR adds initial support for Spanner CDC by introducing a new input plugin (“gcp_spanner_cdc”) based on the spanner-change-streams-tail code. The key changes include a new plugin entry in info.csv, implementation of the input reader and its configuration parsing, and comprehensive tests (integration and unit) that cover the CDC data record processing and dialect handling.

Reviewed Changes

Copilot reviewed 23 out of 23 changed files in this pull request and generated 1 comment.

Show a summary per file
File Description
internal/plugins/info.csv Added a new entry for the gcp_spanner_cdc input plugin.
internal/impl/gcp/enterprise/integration_spanner_cdc_test.go Introduced integration tests for Spanner CDC including table/stream creation, cleanup, and data change record validation.
internal/impl/gcp/enterprise/input_spanner_cdc.go Implemented the CDC input spec and reader with configuration parsing, connection handling, and message emission.
internal/impl/gcp/enterprise/changestreams/* Multiple files added to support change stream query, record decoding, metadata management, dialect detection, and testing.
docs/modules/components/pages/inputs/gcp_spanner_cdc.adoc Generated documentation for the new CDC input.
Various test and helper files Added tests and helper functions for both emulator and real Spanner scenarios.

return fmt.Errorf("setup Spanner change stream reader: %w", err)
}

r.resCh = make(chan *service.Message)
Copy link
Preview

Copilot AI May 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] Consider using a buffered channel for r.resCh to prevent potential blocking if the subscriber produces messages faster than they are consumed. For example, using 'make(chan *service.Message, <buffer_size>)' can help improve responsiveness under high throughput.

Suggested change
r.resCh = make(chan *service.Message)
r.resCh = make(chan *service.Message, 100)

Copilot uses AI. Check for mistakes.

@mmatczuk
Copy link
Collaborator Author

Adjusted commit messages prefix to project standards, fixed test.

mmatczuk added 25 commits May 28, 2025 13:08
The model is very pointer heavy, and it cannot be improved.
This is due to the spanner APIs. Spanner expects a slice of struct pointers
ex. []*ChangeRecord when decoding ARRAY[STRUCT] data.

- Extract model to sparate files
- Add relevant sections from Google Spanner docs explaining meaning of relevant fields
- Implement fmt.Stringer

Fixes CON-66
- Move projectID, instanceID, databaseID, streamID to Config
- Introduce isPostgres() helper function (like in Apache Beam)
- Introduce Handler interface
- Rename read to Start do not exit on error
- close client if constructor fails
- make parsing not lenient
…ata store in Spanner

Remove Reader and use new implementation based on metadata package.
The integration test is passing.

=== RUN   TestIntegrationSpannerCDCInput
    integration_spanner_cdc_test.go:168: Created table "rpcn_test_table_1747229402569820000" and stream "rpcn_test_stream_1747229402569820000"
level=info msg="Sending inproc messages to ID: b4aaf7e6-fdc0-4976-be9b-dc58abc376bb" @service=benthos label="" path=root.output
level=info msg="Connecting to Spanner CDC stream: rpcn_test_stream_1747229402569820000 (project: sandbox-rpcn, instance: rpcn-tests, database: changestreams)" @service=benthos label="" path=root.input
level=debug msg="Creating partition metadata table cdc_metadata_rpcn_test_stream_1747229402569820000 if not exist" @service=benthos label="" path=root.input
level=info msg="Input type gcp_spanner_cdc is now active" @service=benthos label="" path=root.input
level=info msg="Starting subscriber" @service=benthos label="" path=root.input
level=info msg="Detected root partition __8BAYEHAeZvPQAAAYLGOINycGNuX3Rlc3Rfc3RyZWFtXzE3NDcyMjk0MDI1Njk4MjAwMDAAAYSBAIKAgwjDZAAAAAAAAIQEHbHBFIVnMF8wAAH__4X_BjUYdYtI7Yb_BjUZNAqgH4eAwGQBAf__" @service=benthos label="" path=root.input
level=info msg="Detected root partition __8BAYEHAeZvPQAAAYLGOINycGNuX3Rlc3Rfc3RyZWFtXzE3NDcyMjk0MDI1Njk4MjAwMDAAAYSBBgp5WFABAoKAgwjDZAAAAAAAAIQE8dndpoVnMjU4XzEwOTgyNzg5AAH__4X_BjUYGgpAWYb_BjUY3T6Sn4eAwGQBAf__" @service=benthos label="" path=root.input
level=debug msg="Detected unfinished min watermark: 2025-05-14 13:30:31 +0000 UTC" @service=benthos label="" path=root.input
level=debug msg="Detected 2 new partitions" @service=benthos label="" path=root.input
level=debug msg="__8BAYEHAeZvPQAAAYLGOINycGNuX3Rlc3Rfc3RyZWFtXzE3NDcyMjk0MDI1Njk4MjAwMDAAAYSBAIKAgwjDZAAAAAAAAIQEHbHBFIVnMF8wAAH__4X_BjUYdYtI7Yb_BjUZNAqgH4eAwGQBAf__: updating partition to running" @service=benthos label="" path=root.input
level=debug msg="__8BAYEHAeZvPQAAAYLGOINycGNuX3Rlc3Rfc3RyZWFtXzE3NDcyMjk0MDI1Njk4MjAwMDAAAYSBBgp5WFABAoKAgwjDZAAAAAAAAIQE8dndpoVnMjU4XzEwOTgyNzg5AAH__4X_BjUYGgpAWYb_BjUY3T6Sn4eAwGQBAf__: updating partition to running" @service=benthos label="" path=root.input
level=debug msg="__8BAYEHAeZvPQAAAYLGOINycGNuX3Rlc3Rfc3RyZWFtXzE3NDcyMjk0MDI1Njk4MjAwMDAAAYSBAIKAgwjDZAAAAAAAAIQEHbHBFIVnMF8wAAH__4X_BjUYdYtI7Yb_BjUZNAqgH4eAwGQBAf__: querying partition change stream" @service=benthos label="" path=root.input
level=debug msg="__8BAYEHAeZvPQAAAYLGOINycGNuX3Rlc3Rfc3RyZWFtXzE3NDcyMjk0MDI1Njk4MjAwMDAAAYSBBgp5WFABAoKAgwjDZAAAAAAAAIQE8dndpoVnMjU4XzEwOTgyNzg5AAH__4X_BjUYGgpAWYb_BjUY3T6Sn4eAwGQBAf__: querying partition change stream" @service=benthos label="" path=root.input
level=debug msg="Detected unfinished min watermark: 2025-05-14 13:30:31 +0000 UTC" @service=benthos label="" path=root.input
level=debug msg="__8BAYEHAeZvPQAAAYLGOINycGNuX3Rlc3Rfc3RyZWFtXzE3NDcyMjk0MDI1Njk4MjAwMDAAAYSBBgp5WFABAoKAgwjDZAAAAAAAAIQE8dndpoVnMjU4XzEwOTgyNzg5AAH__4X_BjUYGgpAWYb_BjUY3T6Sn4eAwGQBAf__: data change record: table: rpcn_test_table_1747229402569820000, modification type: INSERT, commit timestamp: 2025-05-14 13:30:32.186671 +0000 UTC" @service=benthos label="" path=root.input
level=debug msg="__8BAYEHAeZvPQAAAYLGOINycGNuX3Rlc3Rfc3RyZWFtXzE3NDcyMjk0MDI1Njk4MjAwMDAAAYSBBgp5WFABAoKAgwjDZAAAAAAAAIQE8dndpoVnMjU4XzEwOTgyNzg5AAH__4X_BjUYGgpAWYb_BjUY3T6Sn4eAwGQBAf__: data change record: table: rpcn_test_table_1747229402569820000, modification type: DELETE, commit timestamp: 2025-05-14 13:30:32.186671 +0000 UTC" @service=benthos label="" path=root.input
level=debug msg="Waiting for pending acks to resolve before shutting down." @service=benthos label="" path=root.input
level=debug msg="Pending acks resolved." @service=benthos label="" path=root.input
level=info msg="Subscriber stopped" @service=benthos label="" path=root.input
level=debug msg="__8BAYEHAeZvPQAAAYLGOINycGNuX3Rlc3Rfc3RyZWFtXzE3NDcyMjk0MDI1Njk4MjAwMDAAAYSBAIKAgwjDZAAAAAAAAIQEHbHBFIVnMF8wAAH__4X_BjUYdYtI7Yb_BjUZNAqgH4eAwGQBAf__: context canceled" @service=benthos label="" path=root.input
level=debug msg="__8BAYEHAeZvPQAAAYLGOINycGNuX3Rlc3Rfc3RyZWFtXzE3NDcyMjk0MDI1Njk4MjAwMDAAAYSBBgp5WFABAoKAgwjDZAAAAAAAAIQE8dndpoVnMjU4XzEwOTgyNzg5AAH__4X_BjUYGgpAWYb_BjUY3T6Sn4eAwGQBAf__: context canceled" @service=benthos label="" path=root.input
    integration_spanner_cdc_test.go:174: Dropped table "rpcn_test_table_1747229402569820000" and stream "rpcn_test_stream_1747229402569820000"
--- PASS: TestIntegrationSpannerCDCInput (65.83s)
PASS
ok      github.com/redpanda-data/connect/v4/internal/impl/gcp/enterprise        66.854s

Fixes CON-67
…ns on start

On start get PartitionMetadata in SCHEDULED and RUNNING state and reschedule it for execution.
Add lightweight integration test framework, the tests are executed against emulator and mock querier.
It's easier to simulate edge cases using mock.
Also, emulator does not support Change Data Capture querying.

Fixes CON-89
- Add CallbackFunc type and use it in all types
- Add Subscriber.UpdatePartitionWatermark()
- Remove UpdateWatermark() calls in handler, and document that callback is responsible for updating watermark
- Add test cases
Supports two operational modes:
- Without batching period (default): Single goroutine per partition runs Spanner row iterator directly in spannerCDCReader.
- With batching period: periodicallyFlushingSpannerCDCReader provides channel-based callback implementation that synchronizes callbacks with periodic flushes.

Components:
- spannerPartitionBatcher: Manages batching per partition with timer-controlled periodic flushes
- periodicallyFlushingSpannerCDCReader: Coordinates callbacks with flush operations to maintain ordering
- ack.Once: Implements AckFunc with sync.OnceFunc semantics and waiting support

Fixes CON-68
- Mark some fields as Advanced
- Provide examples
- Rename waitForAck() to emit and do not wait for ack
- Add spannerPartitionBatcher.acks and helper methods to work with it
@mmatczuk
Copy link
Collaborator Author

I reworked ack.Once for better clarity.

Copy link
Collaborator

@mihaitodor mihaitodor left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, just a few small nits. Feel free to 🐑 🚀 if ready

Comment on lines +115 to +117
if rand.Intn(100) > 10 {
return
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be useful to add a comment to explain why this is needed

}

var forcePeriodicFlush = &changestreams.DataChangeRecord{
ServerTransactionID: "just do it",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just thought it's annoying for people to search for it in logs if it has spaces

ChangeStreamQueryPriority spannerpb.RequestOptions_Priority
}

// Subscriber is a partition awareSpanner change stream consumer. It reads
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: awareSpanner -> aware Spanner

license.InjectTestService(stream.Resources())

t.Cleanup(func() {
if err := stream.Stop(context.Background()); err != nil { //nolint:usetesting // the cleanup needs to run with fresh context
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: stream.StopWithin

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants