Skip to content

Commit f74f9aa

Browse files
committed
gcp/changestreams: add emulator test harness
1 parent 90f379f commit f74f9aa

File tree

1 file changed

+204
-0
lines changed

1 file changed

+204
-0
lines changed
Lines changed: 204 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,204 @@
1+
// Copyright 2025 Redpanda Data, Inc.
2+
//
3+
// Licensed as a Redpanda Enterprise file under the Redpanda Community
4+
// License (the "License"); you may not use this file except in compliance with
5+
// the License. You may obtain a copy of the License at
6+
//
7+
// https://github.com/redpanda-data/connect/blob/main/licenses/rcl.md
8+
9+
package changestreams
10+
11+
import (
12+
"context"
13+
"errors"
14+
"fmt"
15+
"testing"
16+
17+
"cloud.google.com/go/spanner"
18+
adminapi "cloud.google.com/go/spanner/admin/database/apiv1"
19+
adminpb "cloud.google.com/go/spanner/admin/database/apiv1/databasepb"
20+
instance "cloud.google.com/go/spanner/admin/instance/apiv1"
21+
"cloud.google.com/go/spanner/admin/instance/apiv1/instancepb"
22+
"github.com/ory/dockertest/v3"
23+
"github.com/ory/dockertest/v3/docker"
24+
"google.golang.org/api/option"
25+
"google.golang.org/grpc"
26+
"google.golang.org/grpc/credentials/insecure"
27+
)
28+
29+
func startSpannerEmulator(t *testing.T) (addr string) {
30+
pool, err := dockertest.NewPool("")
31+
if err != nil {
32+
t.Fatal(err)
33+
}
34+
35+
t.Log("Starting emulator")
36+
res, err := pool.RunWithOptions(&dockertest.RunOptions{
37+
Repository: "gcr.io/cloud-spanner-emulator/emulator",
38+
Tag: "latest",
39+
Env: []string{
40+
"SPANNER_EMULATOR_HOST=0.0.0.0:9010",
41+
},
42+
ExposedPorts: []string{"9010/tcp"},
43+
PortBindings: map[docker.Port][]docker.PortBinding{
44+
"9010/tcp": {
45+
{HostIP: "0.0.0.0", HostPort: "9010"},
46+
},
47+
},
48+
}, func(cfg *docker.HostConfig) {
49+
cfg.AutoRemove = true
50+
cfg.RestartPolicy = docker.RestartPolicy{
51+
Name: "no",
52+
}
53+
})
54+
if err != nil {
55+
t.Fatal(err)
56+
}
57+
58+
closeFn := func() {
59+
if err := pool.Purge(res); err != nil {
60+
t.Errorf("Failed to purge resource: %v", err)
61+
}
62+
t.Log("Emulator stopped")
63+
}
64+
65+
addr = fmt.Sprintf("localhost:%s", res.GetPort("9010/tcp"))
66+
67+
if err := pool.Retry(func() error {
68+
t.Logf("Waiting for emulator to be ready at %s", addr)
69+
conn, err := grpc.NewClient(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
70+
if err != nil {
71+
return err
72+
}
73+
conn.Close()
74+
return nil
75+
}); err != nil {
76+
closeFn()
77+
t.Fatal(err)
78+
}
79+
80+
t.Cleanup(closeFn)
81+
return
82+
}
83+
84+
func createInstance(ctx context.Context, conn *grpc.ClientConn) (string, error) {
85+
const (
86+
emulatorProjectID = "test-project"
87+
emulatorInstanceID = "test-instance"
88+
)
89+
90+
adm, err := instance.NewInstanceAdminClient(ctx,
91+
option.WithGRPCConn(conn),
92+
option.WithoutAuthentication(),
93+
)
94+
if err != nil {
95+
return "", err
96+
}
97+
// Do not close as it will close the grpc connection
98+
99+
op, err := adm.CreateInstance(ctx, &instancepb.CreateInstanceRequest{
100+
Parent: "projects/" + emulatorProjectID,
101+
InstanceId: emulatorInstanceID,
102+
Instance: &instancepb.Instance{
103+
Config: "projects/" + emulatorProjectID + "/instanceConfigs/regional-europe-central2",
104+
DisplayName: emulatorInstanceID,
105+
ProcessingUnits: 100,
106+
},
107+
})
108+
if err != nil {
109+
return "", err
110+
}
111+
112+
resp, err := op.Wait(ctx)
113+
if err != nil {
114+
return "", err
115+
}
116+
117+
return resp.Name, nil
118+
}
119+
120+
type emulatorAdminClient struct {
121+
*adminapi.DatabaseAdminClient
122+
instanceName string
123+
124+
t *testing.T
125+
conn *grpc.ClientConn
126+
}
127+
128+
func newEmulatorHelper(t *testing.T) *emulatorAdminClient {
129+
t.Helper()
130+
131+
// Create a gRPC connection to the emulator
132+
conn, err := grpc.NewClient(startSpannerEmulator(t),
133+
grpc.WithTransportCredentials(insecure.NewCredentials()))
134+
if err != nil {
135+
t.Fatal(err)
136+
}
137+
138+
ctx := t.Context()
139+
140+
// Create an instance
141+
instanceName, err := createInstance(ctx, conn)
142+
if err != nil {
143+
t.Fatal(err)
144+
}
145+
146+
// Create the database admin client with the gRPC connection
147+
adm, err := adminapi.NewDatabaseAdminClient(ctx,
148+
option.WithGRPCConn(conn),
149+
option.WithoutAuthentication())
150+
if err != nil {
151+
t.Fatal(err)
152+
}
153+
154+
return &emulatorAdminClient{
155+
DatabaseAdminClient: adm,
156+
instanceName: instanceName,
157+
158+
t: t,
159+
conn: conn,
160+
}
161+
}
162+
163+
func (a *emulatorAdminClient) fullDatabaseName(dbName string) string {
164+
return fmt.Sprintf("%s/databases/%s", a.instanceName, dbName)
165+
}
166+
167+
func (a *emulatorAdminClient) createTestDatabase(dbName string, opts ...func(*adminpb.CreateDatabaseRequest)) (*spanner.Client, error) {
168+
req := &adminpb.CreateDatabaseRequest{
169+
Parent: a.instanceName,
170+
CreateStatement: fmt.Sprintf("CREATE DATABASE %s", dbName),
171+
}
172+
for _, o := range opts {
173+
o(req)
174+
}
175+
176+
a.t.Logf("Creating test database %q", dbName)
177+
ctx := a.t.Context()
178+
op, err := a.CreateDatabase(ctx, req)
179+
if err != nil {
180+
return nil, err
181+
}
182+
if _, err := op.Wait(ctx); err != nil {
183+
return nil, err
184+
}
185+
c, err := spanner.NewClient(ctx, a.fullDatabaseName(dbName), option.WithGRPCConn(a.conn))
186+
if err != nil {
187+
return nil, err
188+
}
189+
a.t.Cleanup(c.Close)
190+
191+
return c, nil
192+
}
193+
194+
func (a *emulatorAdminClient) CreateTestDatabase(dbName string, opts ...func(*adminpb.CreateDatabaseRequest)) *spanner.Client {
195+
c, err := a.createTestDatabase(dbName, opts...)
196+
if err != nil {
197+
a.t.Fatal(err)
198+
}
199+
return c
200+
}
201+
202+
func (a *emulatorAdminClient) Close() error {
203+
return errors.Join(a.DatabaseAdminClient.Close(), a.conn.Close())
204+
}

0 commit comments

Comments
 (0)