@@ -217,25 +217,29 @@ func newSpannerCDCReader(conf spannerCDCInputConfig, batching service.BatchPolic
217
217
}
218
218
}
219
219
220
- func (r * spannerCDCReader ) waitForAck (
220
+ func (r * spannerCDCReader ) emit (
221
221
ctx context.Context ,
222
222
partitionToken string ,
223
223
dcr * changestreams.DataChangeRecord ,
224
224
msg service.MessageBatch ,
225
- ) error {
225
+ ) ( * ack. Once , error ) {
226
226
if len (msg ) == 0 {
227
- return nil
227
+ return nil , nil
228
228
}
229
229
ackOnce := ack .NewOnce (func (ctx context.Context ) error {
230
- return r .subscriber .UpdatePartitionWatermark (ctx , partitionToken , dcr )
230
+ // If we processed the message and failed to update the watermark, we
231
+ // would try to update it on the next message, no need to return an error here.
232
+ if err := r .subscriber .UpdatePartitionWatermark (ctx , partitionToken , dcr ); err != nil {
233
+ r .log .Errorf ("%s: failed to update watermark: %v" , partitionToken , err )
234
+ }
235
+ return nil
231
236
})
232
237
select {
233
238
case <- ctx .Done ():
234
- return ctx .Err ()
239
+ return nil , ctx .Err ()
235
240
case r .resCh <- asyncMessage {msg : msg , ackFn : ackOnce .Ack }:
236
- // ok
241
+ return ackOnce , nil
237
242
}
238
- return ackOnce .Wait (ctx )
239
243
}
240
244
241
245
var forcePeriodicFlush = & changestreams.DataChangeRecord {
@@ -248,30 +252,58 @@ func (r *spannerCDCReader) onDataChangeRecord(ctx context.Context, partitionToke
248
252
return err
249
253
}
250
254
255
+ if err := batcher .AckError (); err != nil {
256
+ return fmt .Errorf ("ack error: %v" , err )
257
+ }
258
+
259
+ // On partition end, flush the remaining messages and wait for all messages
260
+ // to be acked before returning and marking the partition as finished.
251
261
if dcr == nil {
252
262
msg , last , err := batcher .Flush (ctx )
253
263
if err != nil {
254
264
return err
255
265
}
266
+ ack , err := r .emit (ctx , partitionToken , last , msg )
267
+ if err != nil {
268
+ return err
269
+ }
270
+ batcher .AddAck (ack )
271
+
272
+ if err := batcher .WaitAcks (ctx ); err != nil {
273
+ return fmt .Errorf ("ack error: %v" , err )
274
+ }
256
275
if err := batcher .Close (ctx ); err != nil {
257
276
return err
258
277
}
259
- return r .waitForAck (ctx , partitionToken , last , msg )
278
+
279
+ return nil
260
280
}
261
281
262
282
if dcr == forcePeriodicFlush {
263
283
msg , last , err := batcher .Flush (ctx )
264
284
if err != nil {
265
285
return err
266
286
}
267
- return r .waitForAck (ctx , partitionToken , last , msg )
287
+ ack , err := r .emit (ctx , partitionToken , last , msg )
288
+ if err != nil {
289
+ return err
290
+ }
291
+ batcher .AddAck (ack )
292
+
293
+ return nil
268
294
}
269
295
270
296
msg , err := batcher .MaybeFlushWith (ctx , dcr )
271
297
if err != nil {
272
298
return err
273
299
}
274
- return r .waitForAck (ctx , partitionToken , dcr , msg )
300
+ ack , err := r .emit (ctx , partitionToken , dcr , msg )
301
+ if err != nil {
302
+ return err
303
+ }
304
+ batcher .AddAck (ack )
305
+
306
+ return nil
275
307
}
276
308
277
309
func (r * spannerCDCReader ) Connect (ctx context.Context ) error {
0 commit comments