Skip to content

Commit f4bea02

Browse files
committed
gcp/enterprise/changestreams: Subscriber initial version
=== RUN TestIntegrationSpannerCDCInput integration_spanner_cdc_test.go:168: Created table "rpcn_test_table_1747147812659865000" and stream "rpcn_test_stream_1747147812659865000" level=info msg="Connecting to Spanner CDC stream: rpcn_test_stream_1747147812659865000 (project: sandbox-rpcn, instance: rpcn-tests, database: changestreams)" @service=benthos label="" path=root.input level=info msg="Sending inproc messages to ID: ed8d8deb-90ed-4b0a-b7a3-6f4b4cd00c26" @service=benthos label="" path=root.output level=debug msg="creating partition metadata table cdc_metadata_rpcn_test_stream_1747147812659865000 if not exist" @service=benthos label="" path=root.input level=info msg="Input type gcp_spanner_cdc is now active" @service=benthos label="" path=root.input level=info msg="starting subscriber" @service=benthos label="" path=root.input level=debug msg="detected unfinished min watermark: 2025-05-13 14:51:21 +0000 UTC" @service=benthos label="" path=root.input level=debug msg="detected 1 new partitions" @service=benthos label="" path=root.input level=debug msg="Parent0: updating partition to running" @service=benthos label="" path=root.input level=debug msg="Parent0: querying partition change stream" @service=benthos label="" path=root.input level=debug msg="Parent0: child partition: token: __8BAYEHAeZvPQAAAYLDT4NycGNuX3Rlc3Rfc3RyZWFtXzE3NDcxNDc4MTI2NTk4NjUwMDAAAYSBAIKAgwjDZAAAAAAAAIQEHbHBFIVnMF8wAAH__4X_BjUFgQPhaYb_BjUF4njkpYeAwGQBAf__, parent partition tokens: []" @service=benthos label="" path=root.input level=debug msg="Parent0: child partition: token: __8BAYEHAeZvPQAAAYLDT4NycGNuX3Rlc3Rfc3RyZWFtXzE3NDcxNDc4MTI2NTk4NjUwMDAAAYSBBgp5WFABAoKAgwjDZAAAAAAAAIQE8dndpoVnMjU4XzEwOTgyNzg5AAH__4X_BjUFdG6nhob_BjUGOVgT74eAwGQBAf__, parent partition tokens: []" @service=benthos label="" path=root.input level=debug msg="Parent0: done querying partition change stream" @service=benthos label="" path=root.input level=debug msg="Parent0: updating partition to finished" @service=benthos label="" path=root.input level=debug msg="detected unfinished min watermark: 2025-05-13 14:51:21 +0000 UTC" @service=benthos label="" path=root.input level=debug msg="detected 2 new partitions" @service=benthos label="" path=root.input level=debug msg="__8BAYEHAeZvPQAAAYLDT4NycGNuX3Rlc3Rfc3RyZWFtXzE3NDcxNDc4MTI2NTk4NjUwMDAAAYSBAIKAgwjDZAAAAAAAAIQEHbHBFIVnMF8wAAH__4X_BjUFgQPhaYb_BjUF4njkpYeAwGQBAf__: updating partition to running" @service=benthos label="" path=root.input level=debug msg="__8BAYEHAeZvPQAAAYLDT4NycGNuX3Rlc3Rfc3RyZWFtXzE3NDcxNDc4MTI2NTk4NjUwMDAAAYSBBgp5WFABAoKAgwjDZAAAAAAAAIQE8dndpoVnMjU4XzEwOTgyNzg5AAH__4X_BjUFdG6nhob_BjUGOVgT74eAwGQBAf__: updating partition to running" @service=benthos label="" path=root.input level=debug msg="__8BAYEHAeZvPQAAAYLDT4NycGNuX3Rlc3Rfc3RyZWFtXzE3NDcxNDc4MTI2NTk4NjUwMDAAAYSBAIKAgwjDZAAAAAAAAIQEHbHBFIVnMF8wAAH__4X_BjUFgQPhaYb_BjUF4njkpYeAwGQBAf__: querying partition change stream" @service=benthos label="" path=root.input level=debug msg="__8BAYEHAeZvPQAAAYLDT4NycGNuX3Rlc3Rfc3RyZWFtXzE3NDcxNDc4MTI2NTk4NjUwMDAAAYSBBgp5WFABAoKAgwjDZAAAAAAAAIQE8dndpoVnMjU4XzEwOTgyNzg5AAH__4X_BjUFdG6nhob_BjUGOVgT74eAwGQBAf__: querying partition change stream" @service=benthos label="" path=root.input level=debug msg="__8BAYEHAeZvPQAAAYLDT4NycGNuX3Rlc3Rfc3RyZWFtXzE3NDcxNDc4MTI2NTk4NjUwMDAAAYSBBgp5WFABAoKAgwjDZAAAAAAAAIQE8dndpoVnMjU4XzEwOTgyNzg5AAH__4X_BjUFdG6nhob_BjUGOVgT74eAwGQBAf__: data change record: table: rpcn_test_table_1747147812659865000, modification type: INSERT, commit timestamp: 2025-05-13 14:51:21.867471 +0000 UTC" @service=benthos label="" path=root.input level=debug msg="__8BAYEHAeZvPQAAAYLDT4NycGNuX3Rlc3Rfc3RyZWFtXzE3NDcxNDc4MTI2NTk4NjUwMDAAAYSBBgp5WFABAoKAgwjDZAAAAAAAAIQE8dndpoVnMjU4XzEwOTgyNzg5AAH__4X_BjUFdG6nhob_BjUGOVgT74eAwGQBAf__: data change record: table: rpcn_test_table_1747147812659865000, modification type: DELETE, commit timestamp: 2025-05-13 14:51:21.867471 +0000 UTC" @service=benthos label="" path=root.input level=debug msg="Waiting for pending acks to resolve before shutting down." @service=benthos label="" path=root.input level=debug msg="Pending acks resolved." @service=benthos label="" path=root.input level=info msg="subscriber stopped" @service=benthos label="" path=root.input level=error msg="error while processing partition __8BAYEHAeZvPQAAAYLDT4NycGNuX3Rlc3Rfc3RyZWFtXzE3NDcxNDc4MTI2NTk4NjUwMDAAAYSBAIKAgwjDZAAAAAAAAIQEHbHBFIVnMF8wAAH__4X_BjUFgQPhaYb_BjUF4njkpYeAwGQBAf__: process partition change stream: get change stream results: spanner: code = \"Canceled\", desc = \"context canceled\", requestID = \"1.708bda666e1f4e83.2.1.47.1\"" @service=benthos label="" path=root.input level=error msg="error while processing partition __8BAYEHAeZvPQAAAYLDT4NycGNuX3Rlc3Rfc3RyZWFtXzE3NDcxNDc4MTI2NTk4NjUwMDAAAYSBBgp5WFABAoKAgwjDZAAAAAAAAIQE8dndpoVnMjU4XzEwOTgyNzg5AAH__4X_BjUFdG6nhob_BjUGOVgT74eAwGQBAf__: process partition change stream: update watermark: updateWatermark: spanner: code = \"Canceled\", desc = \"context canceled\", requestID = \"1.708bda666e1f4e83.2.1.56.1\"" @service=benthos label="" path=root.input integration_spanner_cdc_test.go:174: Dropped table "rpcn_test_table_1747147812659865000" and stream "rpcn_test_stream_1747147812659865000" --- PASS: TestIntegrationSpannerCDCInput (120.99s) PASS ok github.com/redpanda-data/connect/v4/internal/impl/gcp/enterprise 121.598s
1 parent 069c4e8 commit f4bea02

File tree

10 files changed

+686
-313
lines changed

10 files changed

+686
-313
lines changed

internal/impl/gcp/enterprise/changestreams/metadata/metadata.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,11 @@ func NewStore(conf StoreConfig, client *spanner.Client) *Store {
235235
}
236236
}
237237

238+
// Config returns the store configuration.
239+
func (s *Store) Config() StoreConfig {
240+
return s.conf
241+
}
242+
238243
// GetPartition fetches the partition metadata row data for the given partition token.
239244
func (s *Store) GetPartition(ctx context.Context, partitionToken string) (PartitionMetadata, error) {
240245
var stmt spanner.Statement

internal/impl/gcp/enterprise/changestreams/model.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ import (
1414
"time"
1515

1616
"cloud.google.com/go/spanner"
17+
18+
"github.com/redpanda-data/connect/v4/internal/impl/gcp/enterprise/changestreams/metadata"
1719
)
1820

1921
// ChangeRecord is the single unit of the records from the change stream.
@@ -189,7 +191,30 @@ func (cp *ChildPartition) String() string {
189191
cp.Token, cp.ParentPartitionTokens)
190192
}
191193

194+
// toPartitionMetadata converts a ChildPartition to a PartitionMetadata.
195+
// The startTimestamp is taken from the ChildPartitionsRecord.StartTimestamp,
196+
// and represents the earliest timestamp that the child partitions contain
197+
// change records for. The endTimestamp and heartbeatMillis are inherited
198+
// from the parent partition.
199+
func (cp *ChildPartition) toPartitionMetadata(
200+
startTimestamp,
201+
endTimestamp time.Time,
202+
heartbeatMillis int64,
203+
) metadata.PartitionMetadata {
204+
return metadata.PartitionMetadata{
205+
PartitionToken: cp.Token,
206+
ParentTokens: cp.ParentPartitionTokens,
207+
StartTimestamp: startTimestamp,
208+
EndTimestamp: endTimestamp,
209+
HeartbeatMillis: heartbeatMillis,
210+
State: metadata.StateCreated,
211+
Watermark: startTimestamp,
212+
}
213+
}
214+
192215
// TODO(mmt): add splits to metrics
193216
func (cp *ChildPartition) isSplit() bool {
194217
return len(cp.ParentPartitionTokens) == 1
195218
}
219+
220+
const rootPartitionToken = "Parent0"
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
// Copyright 2025 Redpanda Data, Inc.
2+
//
3+
// Licensed as a Redpanda Enterprise file under the Redpanda Community
4+
// License (the "License"); you may not use this file except in compliance with
5+
// the License. You may obtain a copy of the License at
6+
//
7+
// https://github.com/redpanda-data/connect/blob/main/licenses/rcl.md
8+
9+
package changestreams
10+
11+
import (
12+
"context"
13+
"errors"
14+
"fmt"
15+
16+
"cloud.google.com/go/spanner"
17+
"cloud.google.com/go/spanner/apiv1/spannerpb"
18+
"google.golang.org/api/iterator"
19+
20+
"github.com/redpanda-data/connect/v4/internal/impl/gcp/enterprise/changestreams/metadata"
21+
)
22+
23+
type readResult struct {
24+
ChangeRecords []*ChangeRecord `spanner:"ChangeRecord" json:"change_record"`
25+
}
26+
27+
type querier struct {
28+
client *spanner.Client
29+
dialect dialect
30+
streamName string
31+
priority spannerpb.RequestOptions_Priority
32+
}
33+
34+
// query executes a change stream query for the specified stream and partition.
35+
// It processes each record from the change stream and calls the callback function.
36+
func (q *querier) query(
37+
ctx context.Context,
38+
pm metadata.PartitionMetadata,
39+
fn func(ctx context.Context, pm metadata.PartitionMetadata, cr ChangeRecord) error,
40+
) error {
41+
var stmt spanner.Statement
42+
if q.isPostgres() {
43+
stmt = spanner.Statement{
44+
SQL: fmt.Sprintf(`SELECT * FROM spanner.read_json_%s($1, $2, $3, $4, null)`, q.streamName),
45+
Params: map[string]any{
46+
"p1": pm.Watermark,
47+
"p2": pm.EndTimestamp,
48+
"p3": pm.PartitionToken,
49+
"p4": pm.HeartbeatMillis,
50+
},
51+
}
52+
// Convert to NULL
53+
if pm.EndTimestamp.IsZero() {
54+
stmt.Params["p2"] = nil
55+
}
56+
if pm.PartitionToken == "" || pm.PartitionToken == rootPartitionToken {
57+
stmt.Params["p3"] = nil
58+
}
59+
} else {
60+
stmt = spanner.Statement{
61+
SQL: fmt.Sprintf(`SELECT ChangeRecord FROM READ_%s(@start_timestamp, @end_timestamp, @partition_token, @heartbeat_millis)`, q.streamName),
62+
Params: map[string]any{
63+
"start_timestamp": pm.Watermark,
64+
"end_timestamp": pm.EndTimestamp,
65+
"partition_token": pm.PartitionToken,
66+
"heartbeat_millis": pm.HeartbeatMillis,
67+
},
68+
}
69+
// Convert to NULL
70+
if pm.EndTimestamp.IsZero() {
71+
stmt.Params["end_timestamp"] = nil
72+
}
73+
if pm.PartitionToken == "" || pm.PartitionToken == rootPartitionToken {
74+
stmt.Params["partition_token"] = nil
75+
}
76+
}
77+
78+
iter := q.client.Single().QueryWithOptions(ctx, stmt, spanner.QueryOptions{Priority: q.priority})
79+
defer iter.Stop()
80+
81+
for {
82+
row, err := iter.Next()
83+
if err != nil {
84+
if errors.Is(err, iterator.Done) {
85+
break
86+
}
87+
return fmt.Errorf("get change stream results: %w", err)
88+
}
89+
90+
if q.isPostgres() {
91+
cr, err := decodePostgresRow(row)
92+
if err != nil {
93+
return fmt.Errorf("decode postgres row: %w", err)
94+
}
95+
if err := fn(ctx, pm, cr); err != nil {
96+
return err
97+
}
98+
} else {
99+
var rr readResult
100+
if err := row.ToStruct(&rr); err != nil {
101+
return fmt.Errorf("row to struct: %w", err)
102+
}
103+
for _, cr := range rr.ChangeRecords {
104+
if err := fn(ctx, pm, *cr); err != nil {
105+
return err
106+
}
107+
}
108+
}
109+
}
110+
111+
return nil
112+
}
113+
114+
func (q *querier) isPostgres() bool {
115+
return q.dialect == dialectPostgreSQL
116+
}

0 commit comments

Comments
 (0)