Skip to content

Commit de6d861

Browse files
committed
gcp/enterprise/changestreams: extract model to separate files
The model is very pointer heavy, this is due to the spanner APIs. Spanner expects a slice of struct pointers ex. []*ChangeRecord when decoding ARRAY[STRUCT] data. Fixes CON-66
1 parent caae8f2 commit de6d861

File tree

4 files changed

+125
-106
lines changed

4 files changed

+125
-106
lines changed
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
// Copyright 2025 Redpanda Data, Inc.
2+
//
3+
// Licensed as a Redpanda Enterprise file under the Redpanda Community
4+
// License (the "License"); you may not use this file except in compliance with
5+
// the License. You may obtain a copy of the License at
6+
//
7+
// https://github.com/redpanda-data/connect/blob/main/licenses/rcl.md
8+
9+
package changestreams
10+
11+
import (
12+
"time"
13+
14+
"cloud.google.com/go/spanner"
15+
)
16+
17+
// ChangeRecord is the single unit of the records from the change stream.
18+
// See https://cloud.google.com/spanner/docs/change-streams/details#change_streams_record_format
19+
type ChangeRecord struct {
20+
DataChangeRecords []*DataChangeRecord `spanner:"data_change_record" json:"data_change_record"`
21+
HeartbeatRecords []*HeartbeatRecord `spanner:"heartbeat_record" json:"heartbeat_record"`
22+
ChildPartitionsRecords []*ChildPartitionsRecord `spanner:"child_partitions_record" json:"child_partitions_record"`
23+
}
24+
25+
// DataChangeRecord contains a set of changes to the table.
26+
// See https://cloud.google.com/spanner/docs/change-streams/details#data-change-records
27+
type DataChangeRecord struct {
28+
CommitTimestamp time.Time `spanner:"commit_timestamp" json:"commit_timestamp"`
29+
RecordSequence string `spanner:"record_sequence" json:"record_sequence"`
30+
ServerTransactionID string `spanner:"server_transaction_id" json:"server_transaction_id"`
31+
IsLastRecordInTransactionInPartition bool `spanner:"is_last_record_in_transaction_in_partition" json:"is_last_record_in_transaction_in_partition"`
32+
TableName string `spanner:"table_name" json:"table_name"`
33+
ColumnTypes []*ColumnType `spanner:"column_types" json:"column_types"`
34+
Mods []*Mod `spanner:"mods" json:"mods"`
35+
ModType string `spanner:"mod_type" json:"mod_type"`
36+
ValueCaptureType string `spanner:"value_capture_type" json:"value_capture_type"`
37+
NumberOfRecordsInTransaction int64 `spanner:"number_of_records_in_transaction" json:"number_of_records_in_transaction"`
38+
NumberOfPartitionsInTransaction int64 `spanner:"number_of_partitions_in_transaction" json:"number_of_partitions_in_transaction"`
39+
TransactionTag string `spanner:"transaction_tag" json:"transaction_tag"`
40+
IsSystemTransaction bool `spanner:"is_system_transaction" json:"is_system_transaction"`
41+
}
42+
43+
// ColumnType is the metadata of the column.
44+
type ColumnType struct {
45+
Name string `spanner:"name" json:"name"`
46+
Type spanner.NullJSON `spanner:"type" json:"type"`
47+
IsPrimaryKey bool `spanner:"is_primary_key" json:"is_primary_key"`
48+
OrdinalPosition int64 `spanner:"ordinal_position" json:"ordinal_position"`
49+
}
50+
51+
// Mod is the changes that were made on the table.
52+
// See https://cloud.google.com/spanner/docs/change-streams/details#heartbeat-records
53+
type Mod struct {
54+
Keys spanner.NullJSON `spanner:"keys" json:"keys"`
55+
NewValues spanner.NullJSON `spanner:"new_values" json:"new_values"`
56+
OldValues spanner.NullJSON `spanner:"old_values" json:"old_values"`
57+
}
58+
59+
// HeartbeatRecord is the heartbeat record returned from Cloud Spanner.
60+
type HeartbeatRecord struct {
61+
Timestamp time.Time `spanner:"timestamp" json:"timestamp"`
62+
}
63+
64+
// ChildPartitionsRecord contains the child partitions of the stream.
65+
// See https://cloud.google.com/spanner/docs/change-streams/details#child-partitions-records
66+
type ChildPartitionsRecord struct {
67+
StartTimestamp time.Time `spanner:"start_timestamp" json:"start_timestamp"`
68+
RecordSequence string `spanner:"record_sequence" json:"record_sequence"`
69+
ChildPartitions []*ChildPartition `spanner:"child_partitions" json:"child_partitions"`
70+
}
71+
72+
// ChildPartition contains the child partition token.
73+
type ChildPartition struct {
74+
Token string `spanner:"token" json:"token"`
75+
ParentPartitionTokens []string `spanner:"parent_partition_tokens" json:"parent_partition_tokens"`
76+
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
// Copyright 2025 Redpanda Data, Inc.
2+
//
3+
// Licensed as a Redpanda Enterprise file under the Redpanda Community
4+
// License (the "License"); you may not use this file except in compliance with
5+
// the License. You may obtain a copy of the License at
6+
//
7+
// https://github.com/redpanda-data/connect/blob/main/licenses/rcl.md
8+
9+
package changestreams
10+
11+
import (
12+
"encoding/json"
13+
"fmt"
14+
15+
"cloud.google.com/go/spanner"
16+
)
17+
18+
func decodePostgresRow(row *spanner.Row) (*ChangeRecord, error) {
19+
var col spanner.NullJSON
20+
if err := row.Column(0, &col); err != nil {
21+
return nil, fmt.Errorf("extract column from row: %w", err)
22+
}
23+
24+
b, err := col.MarshalJSON()
25+
if err != nil {
26+
return nil, fmt.Errorf("marshal JSON column: %w", err)
27+
}
28+
29+
var pgcr struct {
30+
DataChangeRecord *DataChangeRecord `json:"data_change_record"`
31+
HeartbeatRecord *HeartbeatRecord `json:"heartbeat_record"`
32+
ChildPartitionsRecord *ChildPartitionsRecord `json:"child_partitions_record"`
33+
}
34+
if err := json.Unmarshal(b, &pgcr); err != nil {
35+
return nil, fmt.Errorf("unmarshal JSON data: %w", err)
36+
}
37+
38+
var cr ChangeRecord
39+
if pgcr.DataChangeRecord != nil {
40+
cr.DataChangeRecords = []*DataChangeRecord{pgcr.DataChangeRecord}
41+
}
42+
if pgcr.HeartbeatRecord != nil {
43+
cr.HeartbeatRecords = []*HeartbeatRecord{pgcr.HeartbeatRecord}
44+
}
45+
if pgcr.ChildPartitionsRecord != nil {
46+
cr.ChildPartitionsRecords = []*ChildPartitionsRecord{pgcr.ChildPartitionsRecord}
47+
}
48+
return &cr, nil
49+
}

internal/impl/gcp/enterprise/changestreams/reader_test.go renamed to internal/impl/gcp/enterprise/changestreams/model_pg_test.go

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,6 @@ func TestDecodePostgresRow(t *testing.T) {
5555
}
5656
}`,
5757
want: &ChangeRecord{
58-
DataChangeRecords: []*DataChangeRecord{},
5958
ChildPartitionsRecords: []*ChildPartitionsRecord{
6059
{
6160
StartTimestamp: mustParseTime("2023-02-24T01:06:48.000000-08:00"),
@@ -68,7 +67,6 @@ func TestDecodePostgresRow(t *testing.T) {
6867
},
6968
},
7069
},
71-
HeartbeatRecords: []*HeartbeatRecord{},
7270
},
7371
},
7472
{
@@ -170,8 +168,6 @@ func TestDecodePostgresRow(t *testing.T) {
170168
},
171169
},
172170
},
173-
ChildPartitionsRecords: []*ChildPartitionsRecord{},
174-
HeartbeatRecords: []*HeartbeatRecord{},
175171
},
176172
},
177173
{
@@ -183,8 +179,6 @@ func TestDecodePostgresRow(t *testing.T) {
183179
}
184180
}`,
185181
want: &ChangeRecord{
186-
DataChangeRecords: []*DataChangeRecord{},
187-
ChildPartitionsRecords: []*ChildPartitionsRecord{},
188182
HeartbeatRecords: []*HeartbeatRecord{
189183
{
190184
Timestamp: mustParseTime("2023-02-24T17:16:43.811345-08:00"),

internal/impl/gcp/enterprise/changestreams/reader.go

Lines changed: 0 additions & 100 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ package changestreams
2525

2626
import (
2727
"context"
28-
"encoding/json"
2928
"errors"
3029
"fmt"
3130
"sync"
@@ -42,70 +41,6 @@ type ReadResult struct {
4241
ChangeRecords []*ChangeRecord `spanner:"ChangeRecord" json:"change_record"`
4342
}
4443

45-
// ChangeRecord is the single unit of the records from the change stream.
46-
type ChangeRecord struct {
47-
DataChangeRecords []*DataChangeRecord `spanner:"data_change_record" json:"data_change_record"`
48-
HeartbeatRecords []*HeartbeatRecord `spanner:"heartbeat_record" json:"heartbeat_record"`
49-
ChildPartitionsRecords []*ChildPartitionsRecord `spanner:"child_partitions_record" json:"child_partitions_record"`
50-
}
51-
52-
// DataChangeRecord contains a set of changes to the table.
53-
type DataChangeRecord struct {
54-
CommitTimestamp time.Time `spanner:"commit_timestamp" json:"commit_timestamp"`
55-
RecordSequence string `spanner:"record_sequence" json:"record_sequence"`
56-
ServerTransactionID string `spanner:"server_transaction_id" json:"server_transaction_id"`
57-
IsLastRecordInTransactionInPartition bool `spanner:"is_last_record_in_transaction_in_partition" json:"is_last_record_in_transaction_in_partition"`
58-
TableName string `spanner:"table_name" json:"table_name"`
59-
ColumnTypes []*ColumnType `spanner:"column_types" json:"column_types"`
60-
Mods []*Mod `spanner:"mods" json:"mods"`
61-
ModType string `spanner:"mod_type" json:"mod_type"`
62-
ValueCaptureType string `spanner:"value_capture_type" json:"value_capture_type"`
63-
NumberOfRecordsInTransaction int64 `spanner:"number_of_records_in_transaction" json:"number_of_records_in_transaction"`
64-
NumberOfPartitionsInTransaction int64 `spanner:"number_of_partitions_in_transaction" json:"number_of_partitions_in_transaction"`
65-
TransactionTag string `spanner:"transaction_tag" json:"transaction_tag"`
66-
IsSystemTransaction bool `spanner:"is_system_transaction" json:"is_system_transaction"`
67-
}
68-
69-
// ColumnType is the metadata of the column.
70-
type ColumnType struct {
71-
Name string `spanner:"name" json:"name"`
72-
Type spanner.NullJSON `spanner:"type" json:"type"`
73-
IsPrimaryKey bool `spanner:"is_primary_key" json:"is_primary_key"`
74-
OrdinalPosition int64 `spanner:"ordinal_position" json:"ordinal_position"`
75-
}
76-
77-
// Mod is the changes that were made on the table.
78-
type Mod struct {
79-
Keys spanner.NullJSON `spanner:"keys" json:"keys"`
80-
NewValues spanner.NullJSON `spanner:"new_values" json:"new_values"`
81-
OldValues spanner.NullJSON `spanner:"old_values" json:"old_values"`
82-
}
83-
84-
// HeartbeatRecord is the heartbeat record returned from Cloud Spanner.
85-
type HeartbeatRecord struct {
86-
Timestamp time.Time `spanner:"timestamp" json:"timestamp"`
87-
}
88-
89-
// ChildPartitionsRecord contains the child partitions of the stream.
90-
type ChildPartitionsRecord struct {
91-
StartTimestamp time.Time `spanner:"start_timestamp" json:"start_timestamp"`
92-
RecordSequence string `spanner:"record_sequence" json:"record_sequence"`
93-
ChildPartitions []*ChildPartition `spanner:"child_partitions" json:"child_partitions"`
94-
}
95-
96-
// ChildPartition contains the child partition token.
97-
type ChildPartition struct {
98-
Token string `spanner:"token" json:"token"`
99-
ParentPartitionTokens []string `spanner:"parent_partition_tokens" json:"parent_partition_tokens"`
100-
}
101-
102-
// changeRecordPostgres is an interim struct to decode change stream result for PostgreSQL.
103-
type changeRecordPostgres struct {
104-
DataChangeRecord *DataChangeRecord `spanner:"data_change_record" json:"data_change_record"`
105-
HeartbeatRecord *HeartbeatRecord `spanner:"heartbeat_record" json:"heartbeat_record"`
106-
ChildPartitionsRecord *ChildPartitionsRecord `spanner:"child_partitions_record" json:"child_partitions_record"`
107-
}
108-
10944
type partitionState int
11045

11146
const (
@@ -330,38 +265,3 @@ func (r *Reader) canReadChild(partition *ChildPartition) bool {
330265
}
331266
return true
332267
}
333-
334-
func decodePostgresRow(row *spanner.Row) (*ChangeRecord, error) {
335-
// Retrieve JSON bytes.
336-
var col spanner.NullJSON
337-
if err := row.Column(0, &col); err != nil {
338-
return nil, err
339-
}
340-
jsonBytes, err := col.MarshalJSON()
341-
if err != nil {
342-
return nil, err
343-
}
344-
345-
var changeRecordPG changeRecordPostgres
346-
if err := json.Unmarshal(jsonBytes, &changeRecordPG); err != nil {
347-
return nil, err
348-
}
349-
350-
// Convert to ChangeRecord type.
351-
changeRecord := ChangeRecord{
352-
DataChangeRecords: []*DataChangeRecord{},
353-
HeartbeatRecords: []*HeartbeatRecord{},
354-
ChildPartitionsRecords: []*ChildPartitionsRecord{},
355-
}
356-
if changeRecordPG.DataChangeRecord != nil {
357-
changeRecord.DataChangeRecords = []*DataChangeRecord{changeRecordPG.DataChangeRecord}
358-
}
359-
if changeRecordPG.HeartbeatRecord != nil {
360-
changeRecord.HeartbeatRecords = []*HeartbeatRecord{changeRecordPG.HeartbeatRecord}
361-
}
362-
if changeRecordPG.ChildPartitionsRecord != nil {
363-
changeRecord.ChildPartitionsRecords = []*ChildPartitionsRecord{changeRecordPG.ChildPartitionsRecord}
364-
}
365-
366-
return &changeRecord, nil
367-
}

0 commit comments

Comments
 (0)