Skip to content

Commit 86231af

Browse files
committed
WIP Subscriber
1 parent eb182df commit 86231af

File tree

2 files changed

+225
-0
lines changed

2 files changed

+225
-0
lines changed
Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
// Copyright 2025 Redpanda Data, Inc.
2+
//
3+
// Licensed as a Redpanda Enterprise file under the Redpanda Community
4+
// License (the "License"); you may not use this file except in compliance with
5+
// the License. You may obtain a copy of the License at
6+
//
7+
// https://github.com/redpanda-data/connect/blob/main/licenses/rcl.md
8+
9+
package changestreams
10+
11+
import (
12+
"context"
13+
"fmt"
14+
"slices"
15+
"time"
16+
17+
"cloud.google.com/go/spanner"
18+
"github.com/redpanda-data/benthos/v4/public/service"
19+
20+
"github.com/redpanda-data/connect/v4/internal/impl/gcp/enterprise/changestreams/metadata"
21+
)
22+
23+
// timeRange is a forward-moving cursor over a span of time. Claims move the
// cursor forward; a claim is rejected when it would move the cursor backwards
// or reach the end of the span.
type timeRange struct {
	// cur is the current position: the initial start until a claim succeeds,
	// then the most recently claimed timestamp.
	cur time.Time
	// end is the exclusive upper bound of the range. A nil end means the
	// range has no upper bound.
	end *time.Time
}

// tryClaim attempts to advance the range to t. It returns true and records t
// as the current position when t is not before the current position and is
// strictly before end (if end is set). Otherwise it returns false and leaves
// the range untouched.
func (r *timeRange) tryClaim(t time.Time) bool {
	switch {
	case r.cur.After(t):
		// Claims may never move the cursor backwards.
		return false
	case r.end != nil && !t.Before(*r.end):
		// The end bound is exclusive: t == end is already out of range.
		return false
	}

	r.cur = t
	return true
}

// now returns the current position of the range.
func (r *timeRange) now() time.Time {
	return r.cur
}
43+
44+
// Subscriber detects new change stream partitions using the partition
// metadata store.
type Subscriber struct {
	// clock is the time range being processed; it is advanced by claiming
	// watermarks in DetectNewPartitions.
	clock timeRange
	// store gives access to the partition metadata.
	store *metadata.Store
	// log is the logger used to report detected partition groups.
	log *service.Logger
}
49+
50+
func NewSubscriber(ctx context.Context, conf Config, log *service.Logger) (*Subscriber, error) {
51+
dbName := fmt.Sprintf("projects/%s/instances/%s/databases/%s", conf.ProjectID, conf.InstanceID, conf.DatabaseID)
52+
client, err := spanner.NewClientWithConfig(ctx, dbName, conf.SpannerClientConfig, conf.SpannerClientOptions...)
53+
if err != nil {
54+
return nil, err
55+
}
56+
57+
dialect, err := detectDialect(ctx, client)
58+
if err != nil {
59+
client.Close()
60+
return nil, fmt.Errorf("failed to detect dialect: %w", err)
61+
}
62+
63+
var tableNames metadata.TableNames
64+
if conf.MetadataTable != "" {
65+
tableNames = metadata.TableNamesFromExistingTable(conf.DatabaseID, conf.MetadataTable)
66+
} else {
67+
tableNames = metadata.RandomTableNames(conf.DatabaseID)
68+
}
69+
70+
sConf := metadata.StoreConfig{
71+
ProjectID: conf.ProjectID,
72+
InstanceID: conf.InstanceID,
73+
DatabaseID: conf.DatabaseID,
74+
Dialect: dialect,
75+
TableNames: tableNames,
76+
}
77+
78+
store := metadata.NewStore(sConf, client)
79+
80+
clock := timeRange{
81+
cur: conf.StartTimestamp,
82+
end: nil,
83+
}
84+
if !conf.EndTimestamp.IsZero() {
85+
end := conf.EndTimestamp
86+
clock.end = &end
87+
}
88+
89+
return &Subscriber{
90+
clock: clock,
91+
store: store,
92+
log: log,
93+
}, nil
94+
}
95+
96+
func (s *Subscriber) DetectNewPartitions(ctx context.Context) error {
97+
minWatermark, err := s.store.GetUnfinishedMinWatermark(ctx)
98+
if err != nil {
99+
return fmt.Errorf("get unfinished min watermark: %w", err)
100+
}
101+
if s.clock.tryClaim(minWatermark) {
102+
return nil
103+
}
104+
105+
pms, err := s.store.GetAllPartitionsCreatedAfter(ctx, s.clock.now())
106+
if err != nil {
107+
return err
108+
}
109+
if len(pms) == 0 {
110+
return nil
111+
}
112+
113+
for _, g := range groupPartitionsByCreatedAt(pms) {
114+
s.log.Infof("group: %v", g)
115+
}
116+
return nil
117+
}
118+
119+
// groupPartitionsByCreatedAt works on partitions sorted by creation time.
120+
func groupPartitionsByCreatedAt(partitions []metadata.PartitionMetadata) [][]metadata.PartitionMetadata {
121+
groups := [][]metadata.PartitionMetadata{{partitions[0]}}
122+
123+
cmp := func(g []metadata.PartitionMetadata, t time.Time) int {
124+
return g[0].CreatedAt.Compare(t)
125+
}
126+
127+
for _, p := range partitions[1:] {
128+
idx, ok := slices.BinarySearchFunc(groups, p.CreatedAt, cmp)
129+
if ok {
130+
groups[idx] = append(groups[idx], p)
131+
} else {
132+
groups = append(groups, []metadata.PartitionMetadata{p})
133+
}
134+
}
135+
136+
return groups
137+
}
138+
139+
func tokensOf(partitions []metadata.PartitionMetadata) []string {
140+
s := make([]string, len(partitions))
141+
for i, p := range partitions {
142+
s[i] = p.PartitionToken
143+
}
144+
return s
145+
}
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
// Copyright 2025 Redpanda Data, Inc.
2+
//
3+
// Licensed as a Redpanda Enterprise file under the Redpanda Community
4+
// License (the "License"); you may not use this file except in compliance with
5+
// the License. You may obtain a copy of the License at
6+
//
7+
// https://github.com/redpanda-data/connect/blob/main/licenses/rcl.md
8+
9+
package changestreams
10+
11+
import (
12+
"testing"
13+
"time"
14+
15+
"github.com/google/go-cmp/cmp"
16+
17+
"github.com/redpanda-data/connect/v4/internal/impl/gcp/enterprise/changestreams/metadata"
18+
)
19+
20+
func TestTimeRange(t *testing.T) {
21+
b := time.Unix(0, 10_000)
22+
e := time.Unix(0, 20_000)
23+
r := timeRange{
24+
cur: b,
25+
end: &e,
26+
}
27+
28+
tests := []struct {
29+
time time.Time
30+
expected bool
31+
}{
32+
{time.Unix(0, 10_000), true},
33+
{time.Unix(0, 10_000), true},
34+
{time.Unix(0, 11_000), true},
35+
{time.Unix(0, 11_000), true},
36+
{time.Unix(0, 19_000), true},
37+
{time.Unix(0, 20_000), false},
38+
}
39+
40+
for _, test := range tests {
41+
if r.tryClaim(test.time) != test.expected {
42+
t.Errorf("Expected tryClaim(%v) to be %v", test.time, test.expected)
43+
}
44+
}
45+
}
46+
47+
func TestGroupPartitionsByCreatedAt(t *testing.T) {
48+
pms := []metadata.PartitionMetadata{
49+
{PartitionToken: "a", CreatedAt: time.Unix(0, 10_000)},
50+
{PartitionToken: "b", CreatedAt: time.Unix(0, 10_000)},
51+
{PartitionToken: "c", CreatedAt: time.Unix(0, 20_000)},
52+
{PartitionToken: "d", CreatedAt: time.Unix(0, 20_000)},
53+
}
54+
55+
got := groupPartitionsByCreatedAt(pms)
56+
57+
want := [][]metadata.PartitionMetadata{
58+
{{PartitionToken: "a", CreatedAt: time.Unix(0, 10_000)}, {PartitionToken: "b", CreatedAt: time.Unix(0, 10_000)}},
59+
{{PartitionToken: "c", CreatedAt: time.Unix(0, 20_000)}, {PartitionToken: "d", CreatedAt: time.Unix(0, 20_000)}},
60+
}
61+
62+
if diff := cmp.Diff(got, want); diff != "" {
63+
t.Errorf("groupPartitionsByCreatedAt() mismatch (-want +got):\n%s", diff)
64+
}
65+
}
66+
67+
func TestTokensOf(t *testing.T) {
68+
pms := []metadata.PartitionMetadata{
69+
{PartitionToken: "a"},
70+
{PartitionToken: "b"},
71+
{PartitionToken: "c"},
72+
{PartitionToken: "d"},
73+
}
74+
75+
got := tokensOf(pms)
76+
want := []string{"a", "b", "c", "d"}
77+
if diff := cmp.Diff(got, want); diff != "" {
78+
t.Errorf("tokensOf() mismatch (-want +got):\n%s", diff)
79+
}
80+
}

0 commit comments

Comments
 (0)