Skip to content

Commit 9c8829f

Browse files
committed
WIP message per mod
1 parent 656895a commit 9c8829f

File tree

5 files changed

+157
-86
lines changed

5 files changed

+157
-86
lines changed

internal/impl/gcp/enterprise/changestreams/callback.go

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ package changestreams
1010

1111
import (
1212
"context"
13+
"time"
1314
)
1415

1516
// CallbackFunc is a function that is called for each change record.
@@ -24,7 +25,18 @@ import (
2425
type CallbackFunc func(ctx context.Context, partitionToken string, dcr *DataChangeRecord) error
2526

2627
// UpdatePartitionWatermark updates the watermark for a partition. It's intended
27-
// for use by Callback function to update progress.
28-
func (s *Subscriber) UpdatePartitionWatermark(ctx context.Context, partitionToken string, dcr *DataChangeRecord) error {
29-
return s.store.UpdateWatermark(ctx, partitionToken, dcr.CommitTimestamp)
28+
// for use by the Callback function to update progress. If commitTimestamp is the zero
29+
// value, the watermark is not updated.
30+
func (s *Subscriber) UpdatePartitionWatermark(
31+
ctx context.Context,
32+
partitionToken string,
33+
commitTimestamp time.Time,
34+
) error {
35+
if commitTimestamp.IsZero() {
36+
return nil
37+
}
38+
39+
s.log.Debugf("%s: updating watermark to %s", partitionToken, commitTimestamp)
40+
41+
return s.store.UpdateWatermark(ctx, partitionToken, commitTimestamp)
3042
}

internal/impl/gcp/enterprise/changestreams/subscriber_integration_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -362,7 +362,7 @@ func TestIntegrationSubscriberCallbackUpdatePartitionWatermark(t *testing.T) {
362362
assert.Equal(t, testStartTimestamp, pm.Watermark)
363363

364364
// When UpdatePartitionWatermark is called
365-
if err := s.UpdatePartitionWatermark(ctx, partitionToken, dcr); err != nil {
365+
if err := s.UpdatePartitionWatermark(ctx, partitionToken, dcr.CommitTimestamp); err != nil {
366366
require.NoError(t, err)
367367
}
368368
case 3:

internal/impl/gcp/enterprise/input_spanner_cdc.go

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -220,16 +220,16 @@ func newSpannerCDCReader(conf spannerCDCInputConfig, batching service.BatchPolic
220220
func (r *spannerCDCReader) emit(
221221
ctx context.Context,
222222
partitionToken string,
223-
dcr *changestreams.DataChangeRecord,
224223
msg service.MessageBatch,
224+
commitTimestamp time.Time,
225225
) (*ack.Once, error) {
226226
if len(msg) == 0 {
227227
return nil, nil
228228
}
229229
ackOnce := ack.NewOnce(func(ctx context.Context) error {
230230
// If we processed the message and failed to update the watermark, we
231231
// will retry the update on the next message, so there is no need to return an error here.
232-
if err := r.subscriber.UpdatePartitionWatermark(ctx, partitionToken, dcr); err != nil {
232+
if err := r.subscriber.UpdatePartitionWatermark(ctx, partitionToken, commitTimestamp); err != nil {
233233
r.log.Errorf("%s: failed to update watermark: %v", partitionToken, err)
234234
}
235235
return nil
@@ -259,11 +259,11 @@ func (r *spannerCDCReader) onDataChangeRecord(ctx context.Context, partitionToke
259259
// On partition end, flush the remaining messages and wait for all messages
260260
// to be acked before returning and marking the partition as finished.
261261
if dcr == nil {
262-
msg, last, err := batcher.Flush(ctx)
262+
msg, ts, err := batcher.Flush(ctx)
263263
if err != nil {
264264
return err
265265
}
266-
ack, err := r.emit(ctx, partitionToken, last, msg)
266+
ack, err := r.emit(ctx, partitionToken, msg, ts)
267267
if err != nil {
268268
return err
269269
}
@@ -280,11 +280,11 @@ func (r *spannerCDCReader) onDataChangeRecord(ctx context.Context, partitionToke
280280
}
281281

282282
if dcr == forcePeriodicFlush {
283-
msg, last, err := batcher.Flush(ctx)
283+
msg, ts, err := batcher.Flush(ctx)
284284
if err != nil {
285285
return err
286286
}
287-
ack, err := r.emit(ctx, partitionToken, last, msg)
287+
ack, err := r.emit(ctx, partitionToken, msg, ts)
288288
if err != nil {
289289
return err
290290
}
@@ -293,15 +293,17 @@ func (r *spannerCDCReader) onDataChangeRecord(ctx context.Context, partitionToke
293293
return nil
294294
}
295295

296-
msg, err := batcher.MaybeFlushWith(ctx, dcr)
297-
if err != nil {
298-
return err
296+
iter := batcher.MaybeFlushWith(dcr)
297+
for mb, ts := range iter.Iter(ctx) {
298+
ack, err := r.emit(ctx, partitionToken, mb, ts)
299+
if err != nil {
300+
return err
301+
}
302+
batcher.AddAck(ack)
299303
}
300-
ack, err := r.emit(ctx, partitionToken, dcr, msg)
301-
if err != nil {
304+
if err := iter.Err(); err != nil {
302305
return err
303306
}
304-
batcher.AddAck(ack)
305307

306308
return nil
307309
}

internal/impl/gcp/enterprise/input_spanner_partition_batcher.go

Lines changed: 94 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@ import (
1212
"context"
1313
"encoding/json"
1414
"errors"
15-
"fmt"
15+
"iter"
16+
"strconv"
1617
"sync"
1718
"time"
1819

@@ -21,6 +22,93 @@ import (
2122
"github.com/redpanda-data/connect/v4/internal/impl/gcp/enterprise/changestreams"
2223
)
2324

25+
// spannerPartitionBatchIter goes over changestreams.DataChangeRecord.Mods.
26+
// For every mod it creates a message and adds it to the batch. If the batch is
27+
// full, it yields the batch and creates a new one.
28+
//
29+
// Iff a batch is yielded with a non-zero time, the partition watermark should
30+
// be updated to that time once the batch is acked.
31+
type spannerPartitionBatchIter struct {
32+
*spannerPartitionBatcher
33+
dcr *changestreams.DataChangeRecord
34+
err error
35+
}
36+
37+
type spannerMod struct {
38+
TableName string
39+
ColumnTypes []*changestreams.ColumnType
40+
Mod *changestreams.Mod
41+
ModType string
42+
}
43+
44+
func (s *spannerPartitionBatchIter) Iter(ctx context.Context) iter.Seq2[service.MessageBatch, time.Time] {
45+
return func(yield func(service.MessageBatch, time.Time) bool) {
46+
defer func() {
47+
s.last = s.dcr
48+
}()
49+
if s.err != nil {
50+
return
51+
}
52+
53+
first := true
54+
for i, m := range s.dcr.Mods {
55+
modData := spannerMod{
56+
TableName: s.dcr.TableName,
57+
ColumnTypes: s.dcr.ColumnTypes,
58+
Mod: m,
59+
ModType: s.dcr.ModType,
60+
}
61+
62+
b, err := json.Marshal(modData)
63+
if err != nil {
64+
s.err = err
65+
return
66+
}
67+
68+
msg := service.NewMessage(b)
69+
msg.MetaSet("commit_timestamp", s.dcr.CommitTimestamp.Format(time.RFC3339Nano))
70+
msg.MetaSet("record_sequence", s.dcr.RecordSequence)
71+
msg.MetaSet("server_transaction_id", s.dcr.ServerTransactionID)
72+
msg.MetaSet("is_last_record_in_transaction_in_partition", strconv.FormatBool(s.dcr.IsLastRecordInTransactionInPartition))
73+
msg.MetaSet("value_capture_type", s.dcr.ValueCaptureType)
74+
msg.MetaSet("number_of_records_in_transaction", strconv.FormatInt(s.dcr.NumberOfRecordsInTransaction, 10))
75+
msg.MetaSet("number_of_partitions_in_transaction", strconv.FormatInt(s.dcr.NumberOfPartitionsInTransaction, 10))
76+
msg.MetaSet("transaction_tag", s.dcr.TransactionTag)
77+
msg.MetaSet("is_system_transaction", strconv.FormatBool(s.dcr.IsSystemTransaction))
78+
79+
if !s.batcher.Add(msg) {
80+
continue
81+
}
82+
83+
mb, err := s.flush(ctx)
84+
if err != nil {
85+
s.err = err
86+
return
87+
}
88+
89+
// Return the watermark to be updated after processing the batch.
90+
// Not every batch should update the watermark; we update the watermark
91+
// only after processing the whole DataChangeRecord.
92+
var watermark time.Time
93+
if first && s.last != nil {
94+
watermark = s.last.CommitTimestamp
95+
first = false
96+
}
97+
if i == len(s.dcr.Mods)-1 {
98+
watermark = s.dcr.CommitTimestamp
99+
}
100+
if !yield(mb, watermark) {
101+
return
102+
}
103+
}
104+
}
105+
}
106+
107+
// Err returns any error that occurred during iteration.
108+
func (s *spannerPartitionBatchIter) Err() error {
109+
return s.err
110+
}
111+
24112
type spannerPartitionBatcher struct {
25113
batcher *service.Batcher
26114
last *changestreams.DataChangeRecord
@@ -29,29 +117,17 @@ type spannerPartitionBatcher struct {
29117
rm func()
30118
}
31119

32-
func (s *spannerPartitionBatcher) MaybeFlushWith(ctx context.Context, dcr *changestreams.DataChangeRecord) (service.MessageBatch, error) {
33-
b, err := json.Marshal(dcr)
34-
if err != nil {
35-
return nil, fmt.Errorf("marshal data change record as JSON: %w", err)
36-
}
37-
38-
s.last = dcr
39-
40-
if !s.batcher.Add(service.NewMessage(b)) {
41-
return nil, nil
42-
}
43-
44-
return s.flush(ctx)
120+
func (s *spannerPartitionBatcher) MaybeFlushWith(dcr *changestreams.DataChangeRecord) *spannerPartitionBatchIter {
121+
return &spannerPartitionBatchIter{spannerPartitionBatcher: s, dcr: dcr}
45122
}
46123

47-
func (s *spannerPartitionBatcher) Flush(ctx context.Context) (service.MessageBatch, *changestreams.DataChangeRecord, error) {
124+
func (s *spannerPartitionBatcher) Flush(ctx context.Context) (service.MessageBatch, time.Time, error) {
48125
if s.last == nil {
49-
return nil, nil, nil
126+
return nil, time.Time{}, nil
50127
}
51128

52-
last := s.last
53129
msg, err := s.flush(ctx)
54-
return msg, last, err
130+
return msg, s.last.CommitTimestamp, err
55131
}
56132

57133
func (s *spannerPartitionBatcher) flush(ctx context.Context) (service.MessageBatch, error) {

internal/impl/gcp/enterprise/integration_spanner_cdc_test.go

Lines changed: 33 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -97,11 +97,9 @@ gcp_spanner_cdc:
9797
})
9898
require.NoError(t, err)
9999

100-
expected := []changestreams.DataChangeRecord{
100+
expected := []spannerMod{
101101
{
102-
RecordSequence: "00000000",
103-
IsLastRecordInTransactionInPartition: false,
104-
TableName: h.Table(),
102+
TableName: h.Table(),
105103
ColumnTypes: []*changestreams.ColumnType{
106104
{
107105
Name: "id",
@@ -122,33 +120,24 @@ gcp_spanner_cdc:
122120
OrdinalPosition: 2,
123121
},
124122
},
125-
Mods: []*changestreams.Mod{
126-
{
127-
Keys: spanner.NullJSON{
128-
Value: map[string]interface{}{"id": "1"},
129-
Valid: true,
130-
},
131-
NewValues: spanner.NullJSON{
132-
Value: map[string]interface{}{"active": true},
133-
Valid: true,
134-
},
135-
OldValues: spanner.NullJSON{
136-
Value: map[string]interface{}{},
137-
Valid: true,
138-
},
123+
Mod: &changestreams.Mod{
124+
Keys: spanner.NullJSON{
125+
Value: map[string]interface{}{"id": "1"},
126+
Valid: true,
127+
},
128+
NewValues: spanner.NullJSON{
129+
Value: map[string]interface{}{"active": true},
130+
Valid: true,
131+
},
132+
OldValues: spanner.NullJSON{
133+
Value: map[string]interface{}{},
134+
Valid: true,
139135
},
140136
},
141-
ModType: "INSERT",
142-
ValueCaptureType: "OLD_AND_NEW_VALUES",
143-
NumberOfRecordsInTransaction: 2,
144-
NumberOfPartitionsInTransaction: 1,
145-
TransactionTag: "",
146-
IsSystemTransaction: false,
137+
ModType: "INSERT",
147138
},
148139
{
149-
RecordSequence: "00000001",
150-
IsLastRecordInTransactionInPartition: true,
151-
TableName: h.Table(),
140+
TableName: h.Table(),
152141
ColumnTypes: []*changestreams.ColumnType{
153142
{
154143
Name: "id",
@@ -169,40 +158,32 @@ gcp_spanner_cdc:
169158
OrdinalPosition: 2,
170159
},
171160
},
172-
Mods: []*changestreams.Mod{
173-
{
174-
Keys: spanner.NullJSON{
175-
Value: map[string]interface{}{"id": "1"},
176-
Valid: true,
177-
},
178-
NewValues: spanner.NullJSON{
179-
Value: map[string]interface{}{},
180-
Valid: true,
181-
},
182-
OldValues: spanner.NullJSON{
183-
Value: map[string]interface{}{"active": true},
184-
Valid: true,
185-
},
161+
Mod: &changestreams.Mod{
162+
Keys: spanner.NullJSON{
163+
Value: map[string]interface{}{"id": "1"},
164+
Valid: true,
165+
},
166+
NewValues: spanner.NullJSON{
167+
Value: map[string]interface{}{},
168+
Valid: true,
169+
},
170+
OldValues: spanner.NullJSON{
171+
Value: map[string]interface{}{"active": true},
172+
Valid: true,
186173
},
187174
},
188-
ModType: "DELETE",
189-
ValueCaptureType: "OLD_AND_NEW_VALUES",
190-
NumberOfRecordsInTransaction: 2,
191-
NumberOfPartitionsInTransaction: 1,
192-
TransactionTag: "",
193-
IsSystemTransaction: false,
175+
ModType: "DELETE",
194176
},
195177
}
196178

197-
var got []changestreams.DataChangeRecord
179+
var got []spannerMod
198180
for msg := range messages {
199181
b, err := msg.AsBytes()
200182
require.NoError(t, err)
201183

202-
var dcr changestreams.DataChangeRecord
203-
require.NoError(t, json.Unmarshal(b, &dcr))
204-
205-
got = append(got, dcr)
184+
var mod spannerMod
185+
require.NoError(t, json.Unmarshal(b, &mod))
186+
got = append(got, mod)
206187
if len(got) == len(expected) {
207188
break
208189
}

0 commit comments

Comments
 (0)