Skip to content

Commit f0e3173

Browse files
committed
spanner_cdc(changestreams): new implementation using persistent metadata store in Spanner
Remove Reader and use new implementation based on metadata package. The integration test is passing. === RUN TestIntegrationSpannerCDCInput integration_spanner_cdc_test.go:168: Created table "rpcn_test_table_1747229402569820000" and stream "rpcn_test_stream_1747229402569820000" level=info msg="Sending inproc messages to ID: b4aaf7e6-fdc0-4976-be9b-dc58abc376bb" @service=benthos label="" path=root.output level=info msg="Connecting to Spanner CDC stream: rpcn_test_stream_1747229402569820000 (project: sandbox-rpcn, instance: rpcn-tests, database: changestreams)" @service=benthos label="" path=root.input level=debug msg="Creating partition metadata table cdc_metadata_rpcn_test_stream_1747229402569820000 if not exist" @service=benthos label="" path=root.input level=info msg="Input type gcp_spanner_cdc is now active" @service=benthos label="" path=root.input level=info msg="Starting subscriber" @service=benthos label="" path=root.input level=info msg="Detected root partition __8BAYEHAeZvPQAAAYLGOINycGNuX3Rlc3Rfc3RyZWFtXzE3NDcyMjk0MDI1Njk4MjAwMDAAAYSBAIKAgwjDZAAAAAAAAIQEHbHBFIVnMF8wAAH__4X_BjUYdYtI7Yb_BjUZNAqgH4eAwGQBAf__" @service=benthos label="" path=root.input level=info msg="Detected root partition __8BAYEHAeZvPQAAAYLGOINycGNuX3Rlc3Rfc3RyZWFtXzE3NDcyMjk0MDI1Njk4MjAwMDAAAYSBBgp5WFABAoKAgwjDZAAAAAAAAIQE8dndpoVnMjU4XzEwOTgyNzg5AAH__4X_BjUYGgpAWYb_BjUY3T6Sn4eAwGQBAf__" @service=benthos label="" path=root.input level=debug msg="Detected unfinished min watermark: 2025-05-14 13:30:31 +0000 UTC" @service=benthos label="" path=root.input level=debug msg="Detected 2 new partitions" @service=benthos label="" path=root.input level=debug msg="__8BAYEHAeZvPQAAAYLGOINycGNuX3Rlc3Rfc3RyZWFtXzE3NDcyMjk0MDI1Njk4MjAwMDAAAYSBAIKAgwjDZAAAAAAAAIQEHbHBFIVnMF8wAAH__4X_BjUYdYtI7Yb_BjUZNAqgH4eAwGQBAf__: updating partition to running" @service=benthos label="" path=root.input level=debug 
msg="__8BAYEHAeZvPQAAAYLGOINycGNuX3Rlc3Rfc3RyZWFtXzE3NDcyMjk0MDI1Njk4MjAwMDAAAYSBBgp5WFABAoKAgwjDZAAAAAAAAIQE8dndpoVnMjU4XzEwOTgyNzg5AAH__4X_BjUYGgpAWYb_BjUY3T6Sn4eAwGQBAf__: updating partition to running" @service=benthos label="" path=root.input level=debug msg="__8BAYEHAeZvPQAAAYLGOINycGNuX3Rlc3Rfc3RyZWFtXzE3NDcyMjk0MDI1Njk4MjAwMDAAAYSBAIKAgwjDZAAAAAAAAIQEHbHBFIVnMF8wAAH__4X_BjUYdYtI7Yb_BjUZNAqgH4eAwGQBAf__: querying partition change stream" @service=benthos label="" path=root.input level=debug msg="__8BAYEHAeZvPQAAAYLGOINycGNuX3Rlc3Rfc3RyZWFtXzE3NDcyMjk0MDI1Njk4MjAwMDAAAYSBBgp5WFABAoKAgwjDZAAAAAAAAIQE8dndpoVnMjU4XzEwOTgyNzg5AAH__4X_BjUYGgpAWYb_BjUY3T6Sn4eAwGQBAf__: querying partition change stream" @service=benthos label="" path=root.input level=debug msg="Detected unfinished min watermark: 2025-05-14 13:30:31 +0000 UTC" @service=benthos label="" path=root.input level=debug msg="__8BAYEHAeZvPQAAAYLGOINycGNuX3Rlc3Rfc3RyZWFtXzE3NDcyMjk0MDI1Njk4MjAwMDAAAYSBBgp5WFABAoKAgwjDZAAAAAAAAIQE8dndpoVnMjU4XzEwOTgyNzg5AAH__4X_BjUYGgpAWYb_BjUY3T6Sn4eAwGQBAf__: data change record: table: rpcn_test_table_1747229402569820000, modification type: INSERT, commit timestamp: 2025-05-14 13:30:32.186671 +0000 UTC" @service=benthos label="" path=root.input level=debug msg="__8BAYEHAeZvPQAAAYLGOINycGNuX3Rlc3Rfc3RyZWFtXzE3NDcyMjk0MDI1Njk4MjAwMDAAAYSBBgp5WFABAoKAgwjDZAAAAAAAAIQE8dndpoVnMjU4XzEwOTgyNzg5AAH__4X_BjUYGgpAWYb_BjUY3T6Sn4eAwGQBAf__: data change record: table: rpcn_test_table_1747229402569820000, modification type: DELETE, commit timestamp: 2025-05-14 13:30:32.186671 +0000 UTC" @service=benthos label="" path=root.input level=debug msg="Waiting for pending acks to resolve before shutting down." @service=benthos label="" path=root.input level=debug msg="Pending acks resolved." 
@service=benthos label="" path=root.input level=info msg="Subscriber stopped" @service=benthos label="" path=root.input level=debug msg="__8BAYEHAeZvPQAAAYLGOINycGNuX3Rlc3Rfc3RyZWFtXzE3NDcyMjk0MDI1Njk4MjAwMDAAAYSBAIKAgwjDZAAAAAAAAIQEHbHBFIVnMF8wAAH__4X_BjUYdYtI7Yb_BjUZNAqgH4eAwGQBAf__: context canceled" @service=benthos label="" path=root.input level=debug msg="__8BAYEHAeZvPQAAAYLGOINycGNuX3Rlc3Rfc3RyZWFtXzE3NDcyMjk0MDI1Njk4MjAwMDAAAYSBBgp5WFABAoKAgwjDZAAAAAAAAIQE8dndpoVnMjU4XzEwOTgyNzg5AAH__4X_BjUYGgpAWYb_BjUY3T6Sn4eAwGQBAf__: context canceled" @service=benthos label="" path=root.input integration_spanner_cdc_test.go:174: Dropped table "rpcn_test_table_1747229402569820000" and stream "rpcn_test_stream_1747229402569820000" --- PASS: TestIntegrationSpannerCDCInput (65.83s) PASS ok github.com/redpanda-data/connect/v4/internal/impl/gcp/enterprise 66.854s Fixes CON-67
1 parent ed36364 commit f0e3173

File tree

10 files changed

+770
-315
lines changed

10 files changed

+770
-315
lines changed

internal/impl/gcp/enterprise/changestreams/metadata/metadata.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,11 @@ func NewStore(conf StoreConfig, client *spanner.Client) *Store {
252252
}
253253
}
254254

255+
// Config returns the store configuration.
256+
func (s *Store) Config() StoreConfig {
257+
return s.conf
258+
}
259+
255260
// GetPartition fetches the partition metadata row data for the given partition token.
256261
func (s *Store) GetPartition(ctx context.Context, partitionToken string) (PartitionMetadata, error) {
257262
var stmt spanner.Statement

internal/impl/gcp/enterprise/changestreams/model.go

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ import (
1414
"time"
1515

1616
"cloud.google.com/go/spanner"
17+
18+
"github.com/redpanda-data/connect/v4/internal/impl/gcp/enterprise/changestreams/metadata"
1719
)
1820

1921
// ChangeRecord is the single unit of the records from the change stream.
@@ -189,7 +191,27 @@ func (cp *ChildPartition) String() string {
189191
cp.Token, cp.ParentPartitionTokens)
190192
}
191193

192-
// TODO(mmt): add splits to metrics
194+
// toPartitionMetadata converts a ChildPartition to a PartitionMetadata.
195+
// The startTimestamp is taken from the ChildPartitionsRecord.StartTimestamp,
196+
// and represents the earliest timestamp that the child partitions contain
197+
// change records for. The endTimestamp and heartbeatMillis are inherited
198+
// from the parent partition.
199+
func (cp *ChildPartition) toPartitionMetadata(
200+
startTimestamp,
201+
endTimestamp time.Time,
202+
heartbeatMillis int64,
203+
) metadata.PartitionMetadata {
204+
return metadata.PartitionMetadata{
205+
PartitionToken: cp.Token,
206+
ParentTokens: cp.ParentPartitionTokens,
207+
StartTimestamp: startTimestamp,
208+
EndTimestamp: endTimestamp,
209+
HeartbeatMillis: heartbeatMillis,
210+
State: metadata.StateCreated,
211+
Watermark: startTimestamp,
212+
}
213+
}
214+
193215
// isSplit reports whether the child partition has exactly one parent
// partition token.
func (cp *ChildPartition) isSplit() bool {
	parents := len(cp.ParentPartitionTokens)
	return parents == 1
}
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
// Copyright 2025 Redpanda Data, Inc.
2+
//
3+
// Licensed as a Redpanda Enterprise file under the Redpanda Community
4+
// License (the "License"); you may not use this file except in compliance with
5+
// the License. You may obtain a copy of the License at
6+
//
7+
// https://github.com/redpanda-data/connect/blob/main/licenses/rcl.md
8+
9+
package changestreams
10+
11+
import (
12+
"context"
13+
"errors"
14+
"fmt"
15+
16+
"cloud.google.com/go/spanner"
17+
"cloud.google.com/go/spanner/apiv1/spannerpb"
18+
"google.golang.org/api/iterator"
19+
20+
"github.com/redpanda-data/connect/v4/internal/impl/gcp/enterprise/changestreams/metadata"
21+
)
22+
23+
type readResult struct {
24+
ChangeRecords []*ChangeRecord `spanner:"ChangeRecord" json:"change_record"`
25+
}
26+
27+
type querier struct {
28+
client *spanner.Client
29+
dialect dialect
30+
streamName string
31+
priority spannerpb.RequestOptions_Priority
32+
}
33+
34+
// query executes a change stream query for the specified stream and partition.
35+
// It processes each record from the change stream and calls the callback function.
36+
func (q *querier) query(
37+
ctx context.Context,
38+
pm metadata.PartitionMetadata,
39+
cb func(ctx context.Context, pm metadata.PartitionMetadata, cr ChangeRecord) error,
40+
) error {
41+
var stmt spanner.Statement
42+
if q.isPostgres() {
43+
stmt = spanner.Statement{
44+
SQL: fmt.Sprintf(`SELECT * FROM spanner.read_json_%s($1, $2, $3, $4, null)`, q.streamName),
45+
Params: map[string]any{
46+
"p1": pm.Watermark,
47+
"p2": pm.EndTimestamp,
48+
"p3": pm.PartitionToken,
49+
"p4": pm.HeartbeatMillis,
50+
},
51+
}
52+
// Convert to NULL
53+
if pm.EndTimestamp.IsZero() {
54+
stmt.Params["p2"] = nil
55+
}
56+
if pm.PartitionToken == "" {
57+
stmt.Params["p3"] = nil
58+
}
59+
} else {
60+
stmt = spanner.Statement{
61+
SQL: fmt.Sprintf(`SELECT ChangeRecord FROM READ_%s(@start_timestamp, @end_timestamp, @partition_token, @heartbeat_millis)`, q.streamName),
62+
Params: map[string]any{
63+
"start_timestamp": pm.Watermark,
64+
"end_timestamp": pm.EndTimestamp,
65+
"partition_token": pm.PartitionToken,
66+
"heartbeat_millis": pm.HeartbeatMillis,
67+
},
68+
}
69+
// Convert to NULL
70+
if pm.EndTimestamp.IsZero() {
71+
stmt.Params["end_timestamp"] = nil
72+
}
73+
if pm.PartitionToken == "" {
74+
stmt.Params["partition_token"] = nil
75+
}
76+
}
77+
78+
iter := q.client.Single().QueryWithOptions(ctx, stmt, spanner.QueryOptions{Priority: q.priority})
79+
defer iter.Stop()
80+
81+
for {
82+
row, err := iter.Next()
83+
if err != nil {
84+
if errors.Is(err, iterator.Done) {
85+
break
86+
}
87+
return fmt.Errorf("get change stream results: %w", err)
88+
}
89+
90+
if q.isPostgres() {
91+
cr, err := decodePostgresRow(row)
92+
if err != nil {
93+
return fmt.Errorf("decode postgres row: %w", err)
94+
}
95+
if err := cb(ctx, pm, cr); err != nil {
96+
return err
97+
}
98+
} else {
99+
var rr readResult
100+
if err := row.ToStruct(&rr); err != nil {
101+
return fmt.Errorf("row to struct: %w", err)
102+
}
103+
for _, cr := range rr.ChangeRecords {
104+
if err := cb(ctx, pm, *cr); err != nil {
105+
return err
106+
}
107+
}
108+
}
109+
}
110+
111+
return nil
112+
}
113+
114+
func (q *querier) isPostgres() bool {
115+
return q.dialect == dialectPostgreSQL
116+
}

0 commit comments

Comments
 (0)