Skip to content

Commit 2becebd

Browse files
committed
WIP Subscriber
1 parent 4012a59 commit 2becebd

File tree

2 files changed

+173
-0
lines changed

2 files changed

+173
-0
lines changed
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
// Copyright 2025 Redpanda Data, Inc.
2+
//
3+
// Licensed as a Redpanda Enterprise file under the Redpanda Community
4+
// License (the "License"); you may not use this file except in compliance with
5+
// the License. You may obtain a copy of the License at
6+
//
7+
// https://github.com/redpanda-data/connect/blob/main/licenses/rcl.md
8+
9+
package changestreams
10+
11+
import (
12+
"context"
13+
"slices"
14+
"time"
15+
16+
"github.com/redpanda-data/connect/v4/internal/impl/gcp/enterprise/changestreams/metadata"
17+
)
18+
19+
type timeRange struct {
20+
cur time.Time
21+
end *time.Time
22+
}
23+
24+
func (r *timeRange) tryClaim(t time.Time) bool {
25+
if t.Before(r.cur) {
26+
return false
27+
}
28+
if r.end != nil && r.end.Compare(t) <= 0 {
29+
return false
30+
}
31+
32+
r.cur = t
33+
return true
34+
}
35+
36+
type Subscriber struct {
37+
store *metadata.Store
38+
read timeRange
39+
}
40+
41+
func NewSubscriber(store *metadata.Store, begin time.Time, end *time.Time) *Subscriber {
42+
return &Subscriber{
43+
store: store,
44+
read: timeRange{
45+
cur: begin,
46+
end: end,
47+
},
48+
}
49+
}
50+
51+
func (s *Subscriber) DetectNewPartitions(ctx context.Context) error {
52+
partitions, err := s.store.GetAllPartitionsCreatedAfter(ctx, s.read.cur)
53+
if err != nil {
54+
return err
55+
}
56+
57+
if len(partitions) == 0 {
58+
return nil
59+
}
60+
61+
//for _, := range groupPartitionsByCreatedAt(partitions) {
62+
//
63+
//}
64+
return nil
65+
}
66+
67+
// groupPartitionsByCreatedAt works on partitions sorted by creation time.
68+
func groupPartitionsByCreatedAt(partitions []metadata.PartitionMetadata) [][]metadata.PartitionMetadata {
69+
groups := [][]metadata.PartitionMetadata{{partitions[0]}}
70+
71+
cmp := func(g []metadata.PartitionMetadata, t time.Time) int {
72+
return g[0].CreatedAt.Compare(t)
73+
}
74+
75+
for _, p := range partitions[1:] {
76+
idx, ok := slices.BinarySearchFunc(groups, p.CreatedAt, cmp)
77+
if ok {
78+
groups[idx] = append(groups[idx], p)
79+
} else {
80+
groups = append(groups, []metadata.PartitionMetadata{p})
81+
}
82+
}
83+
84+
return groups
85+
}
86+
87+
func tokensOf(partitions []metadata.PartitionMetadata) []string {
88+
s := make([]string, len(partitions))
89+
for i, p := range partitions {
90+
s[i] = p.PartitionToken
91+
}
92+
return s
93+
}
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
// Copyright 2025 Redpanda Data, Inc.
2+
//
3+
// Licensed as a Redpanda Enterprise file under the Redpanda Community
4+
// License (the "License"); you may not use this file except in compliance with
5+
// the License. You may obtain a copy of the License at
6+
//
7+
// https://github.com/redpanda-data/connect/blob/main/licenses/rcl.md
8+
9+
package changestreams
10+
11+
import (
12+
"testing"
13+
"time"
14+
15+
"github.com/google/go-cmp/cmp"
16+
17+
"github.com/redpanda-data/connect/v4/internal/impl/gcp/enterprise/changestreams/metadata"
18+
)
19+
20+
func TestTimeRange(t *testing.T) {
21+
b := time.Unix(0, 10_000)
22+
e := time.Unix(0, 20_000)
23+
r := timeRange{
24+
cur: b,
25+
end: &e,
26+
}
27+
28+
tests := []struct {
29+
time time.Time
30+
expected bool
31+
}{
32+
{time.Unix(0, 10_000), true},
33+
{time.Unix(0, 10_000), true},
34+
{time.Unix(0, 11_000), true},
35+
{time.Unix(0, 11_000), true},
36+
{time.Unix(0, 19_000), true},
37+
{time.Unix(0, 20_000), false},
38+
}
39+
40+
for _, test := range tests {
41+
if r.tryClaim(test.time) != test.expected {
42+
t.Errorf("Expected tryClaim(%v) to be %v", test.time, test.expected)
43+
}
44+
}
45+
}
46+
47+
func TestGroupPartitionsByCreatedAt(t *testing.T) {
48+
pms := []metadata.PartitionMetadata{
49+
{PartitionToken: "a", CreatedAt: time.Unix(0, 10_000)},
50+
{PartitionToken: "b", CreatedAt: time.Unix(0, 10_000)},
51+
{PartitionToken: "c", CreatedAt: time.Unix(0, 20_000)},
52+
{PartitionToken: "d", CreatedAt: time.Unix(0, 20_000)},
53+
}
54+
55+
got := groupPartitionsByCreatedAt(pms)
56+
57+
want := [][]metadata.PartitionMetadata{
58+
{{PartitionToken: "a", CreatedAt: time.Unix(0, 10_000)}, {PartitionToken: "b", CreatedAt: time.Unix(0, 10_000)}},
59+
{{PartitionToken: "c", CreatedAt: time.Unix(0, 20_000)}, {PartitionToken: "d", CreatedAt: time.Unix(0, 20_000)}},
60+
}
61+
62+
if diff := cmp.Diff(got, want); diff != "" {
63+
t.Errorf("groupPartitionsByCreatedAt() mismatch (-want +got):\n%s", diff)
64+
}
65+
}
66+
67+
func TestTokensOf(t *testing.T) {
68+
pms := []metadata.PartitionMetadata{
69+
{PartitionToken: "a"},
70+
{PartitionToken: "b"},
71+
{PartitionToken: "c"},
72+
{PartitionToken: "d"},
73+
}
74+
75+
got := tokensOf(pms)
76+
want := []string{"a", "b", "c", "d"}
77+
if diff := cmp.Diff(got, want); diff != "" {
78+
t.Errorf("tokensOf() mismatch (-want +got):\n%s", diff)
79+
}
80+
}

0 commit comments

Comments
 (0)