Skip to content

Commit 5ca73d3

Browse files
committed
Export the unique, stable, ID from datastore
1 parent 395f2c9 commit 5ca73d3

21 files changed

+148
-48
lines changed

internal/datastore/context.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,10 @@ func NewSeparatingContextDatastoreProxy(d datastore.Datastore) datastore.Datasto
3939

4040
type ctxProxy struct{ delegate datastore.Datastore }
4141

42+
func (p *ctxProxy) UniqueID(ctx context.Context) (string, error) {
43+
return p.delegate.UniqueID(SeparateContextWithTracing(ctx))
44+
}
45+
4246
func (p *ctxProxy) ReadWriteTx(
4347
ctx context.Context,
4448
f datastore.TxUserFunc,

internal/datastore/crdb/crdb.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"fmt"
77
"regexp"
88
"strconv"
9+
"sync/atomic"
910
"time"
1011

1112
"github.com/IBM/pgxpoolprometheus"
@@ -263,6 +264,8 @@ type crdbDatastore struct {
263264
pruneGroup *errgroup.Group
264265
ctx context.Context
265266
cancel context.CancelFunc
267+
268+
uniqueID atomic.Pointer[string]
266269
}
267270

268271
func (cds *crdbDatastore) SnapshotReader(rev datastore.Revision) datastore.Reader {

internal/datastore/crdb/stats.go

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,21 +34,33 @@ var (
3434
).Suffix(fmt.Sprintf("ON CONFLICT (%[1]s) DO UPDATE SET %[2]s = %[3]s.%[2]s + EXCLUDED.%[2]s RETURNING cluster_logical_timestamp()", colID, colCount, tableCounters))
3535

3636
rng = rand.NewSource(time.Now().UnixNano())
37-
38-
uniqueID string
3937
)
4038

41-
func (cds *crdbDatastore) Statistics(ctx context.Context) (datastore.Stats, error) {
42-
if len(uniqueID) == 0 {
39+
func (cds *crdbDatastore) UniqueID(ctx context.Context) (string, error) {
40+
if cds.uniqueID.Load() == nil {
4341
sql, args, err := queryReadUniqueID.ToSql()
4442
if err != nil {
45-
return datastore.Stats{}, fmt.Errorf("unable to prepare unique ID sql: %w", err)
43+
return "", fmt.Errorf("unable to prepare unique ID sql: %w", err)
4644
}
45+
46+
var uniqueID string
4747
if err := cds.readPool.QueryRowFunc(ctx, func(ctx context.Context, row pgx.Row) error {
4848
return row.Scan(&uniqueID)
4949
}, sql, args...); err != nil {
50-
return datastore.Stats{}, fmt.Errorf("unable to query unique ID: %w", err)
50+
return "", fmt.Errorf("unable to query unique ID: %w", err)
5151
}
52+
53+
cds.uniqueID.Store(&uniqueID)
54+
return uniqueID, nil
55+
}
56+
57+
return *cds.uniqueID.Load(), nil
58+
}
59+
60+
func (cds *crdbDatastore) Statistics(ctx context.Context) (datastore.Stats, error) {
61+
uniqueID, err := cds.UniqueID(ctx)
62+
if err != nil {
63+
return datastore.Stats{}, err
5264
}
5365

5466
var nsDefs []datastore.RevisionedNamespace

internal/datastore/memdb/memdb.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,10 @@ type snapshot struct {
9999
db *memdb.MemDB
100100
}
101101

102+
func (mdb *memdbDatastore) UniqueID(_ context.Context) (string, error) {
103+
return mdb.uniqueID, nil
104+
}
105+
102106
func (mdb *memdbDatastore) SnapshotReader(dr datastore.Revision) datastore.Reader {
103107
mdb.RLock()
104108
defer mdb.RUnlock()

internal/datastore/mysql/datastore.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -450,6 +450,8 @@ type Datastore struct {
450450
createTxn string
451451
createBaseTxn string
452452

453+
uniqueID atomic.Pointer[string]
454+
453455
*QueryBuilder
454456
*revisions.CachedOptimizedRevisions
455457
revisions.CommonDecoder
@@ -527,7 +529,7 @@ func (mds *Datastore) isSeeded(ctx context.Context) (bool, error) {
527529
return false, nil
528530
}
529531

530-
_, err = mds.getUniqueID(ctx)
532+
_, err = mds.UniqueID(ctx)
531533
if err != nil {
532534
return false, nil
533535
}

internal/datastore/mysql/stats.go

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ func (mds *Datastore) Statistics(ctx context.Context) (datastore.Stats, error) {
2828
}
2929
}
3030

31-
uniqueID, err := mds.getUniqueID(ctx)
31+
uniqueID, err := mds.UniqueID(ctx)
3232
if err != nil {
3333
return datastore.Stats{}, err
3434
}
@@ -81,16 +81,20 @@ func (mds *Datastore) Statistics(ctx context.Context) (datastore.Stats, error) {
8181
}, nil
8282
}
8383

84-
func (mds *Datastore) getUniqueID(ctx context.Context) (string, error) {
85-
sql, args, err := sb.Select(metadataUniqueIDColumn).From(mds.driver.Metadata()).ToSql()
86-
if err != nil {
87-
return "", fmt.Errorf("unable to generate query sql: %w", err)
88-
}
84+
func (mds *Datastore) UniqueID(ctx context.Context) (string, error) {
85+
if mds.uniqueID.Load() == nil {
86+
sql, args, err := sb.Select(metadataUniqueIDColumn).From(mds.driver.Metadata()).ToSql()
87+
if err != nil {
88+
return "", fmt.Errorf("unable to generate query sql: %w", err)
89+
}
8990

90-
var uniqueID string
91-
if err := mds.db.QueryRowContext(ctx, sql, args...).Scan(&uniqueID); err != nil {
92-
return "", fmt.Errorf("unable to query unique ID: %w", err)
91+
var uniqueID string
92+
if err := mds.db.QueryRowContext(ctx, sql, args...).Scan(&uniqueID); err != nil {
93+
return "", fmt.Errorf("unable to query unique ID: %w", err)
94+
}
95+
mds.uniqueID.Store(&uniqueID)
96+
return uniqueID, nil
9397
}
9498

95-
return uniqueID, nil
99+
return *mds.uniqueID.Load(), nil
96100
}

internal/datastore/postgres/postgres.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -300,6 +300,8 @@ type pgDatastore struct {
300300
maxRetries uint8
301301
watchEnabled bool
302302

303+
uniqueID atomic.Pointer[string]
304+
303305
gcGroup *errgroup.Group
304306
gcCtx context.Context
305307
cancelGc context.CancelFunc
@@ -548,7 +550,7 @@ func (pgd *pgDatastore) ReadyState(ctx context.Context) (datastore.ReadyState, e
548550

549551
if version == headMigration {
550552
// Ensure a datastore ID is present. This ensures the tables have not been truncated.
551-
uniqueID, err := pgd.datastoreUniqueID(ctx)
553+
uniqueID, err := pgd.UniqueID(ctx)
552554
if err != nil {
553555
return datastore.ReadyState{}, fmt.Errorf("database validation failed: %w; if you have previously run `TRUNCATE`, this database is no longer valid and must be remigrated. See: https://spicedb.dev/d/truncate-unsupported", err)
554556
}

internal/datastore/postgres/stats.go

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,16 +28,25 @@ var (
2828
Where(sq.Eq{colRelname: tableTuple})
2929
)
3030

31-
func (pgd *pgDatastore) datastoreUniqueID(ctx context.Context) (string, error) {
32-
idSQL, idArgs, err := queryUniqueID.ToSql()
33-
if err != nil {
34-
return "", fmt.Errorf("unable to generate query sql: %w", err)
31+
func (pgd *pgDatastore) UniqueID(ctx context.Context) (string, error) {
32+
if pgd.uniqueID.Load() == nil {
33+
idSQL, idArgs, err := queryUniqueID.ToSql()
34+
if err != nil {
35+
return "", fmt.Errorf("unable to generate query sql: %w", err)
36+
}
37+
38+
var uniqueID string
39+
if err := pgx.BeginTxFunc(ctx, pgd.readPool, pgd.readTxOptions, func(tx pgx.Tx) error {
40+
return tx.QueryRow(ctx, idSQL, idArgs...).Scan(&uniqueID)
41+
}); err != nil {
42+
return "", fmt.Errorf("unable to query unique ID: %w", err)
43+
}
44+
45+
pgd.uniqueID.Store(&uniqueID)
46+
return uniqueID, nil
3547
}
3648

37-
var uniqueID string
38-
return uniqueID, pgx.BeginTxFunc(ctx, pgd.readPool, pgd.readTxOptions, func(tx pgx.Tx) error {
39-
return tx.QueryRow(ctx, idSQL, idArgs...).Scan(&uniqueID)
40-
})
49+
return *pgd.uniqueID.Load(), nil
4150
}
4251

4352
func (pgd *pgDatastore) Statistics(ctx context.Context) (datastore.Stats, error) {

internal/datastore/proxy/observable.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,10 @@ func NewObservableDatastoreProxy(d datastore.Datastore) datastore.Datastore {
6767

6868
type observableProxy struct{ delegate datastore.Datastore }
6969

70+
func (p *observableProxy) UniqueID(ctx context.Context) (string, error) {
71+
return p.delegate.UniqueID(ctx)
72+
}
73+
7074
func (p *observableProxy) SnapshotReader(rev datastore.Revision) datastore.Reader {
7175
delegateReader := p.delegate.SnapshotReader(rev)
7276
return &observableReader{delegateReader}

internal/datastore/proxy/proxy_test/mock.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,10 @@ type MockDatastore struct {
1515
mock.Mock
1616
}
1717

18+
func (dm *MockDatastore) UniqueID(_ context.Context) (string, error) {
19+
return "mockds", nil
20+
}
21+
1822
func (dm *MockDatastore) SnapshotReader(rev datastore.Revision) datastore.Reader {
1923
args := dm.Called(rev)
2024
return args.Get(0).(datastore.Reader)

internal/datastore/proxy/schemacaching/watchingcache_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -337,6 +337,10 @@ type fakeDatastore struct {
337337
lock sync.RWMutex
338338
}
339339

340+
func (fds *fakeDatastore) UniqueID(_ context.Context) (string, error) {
341+
return "fakedsforwatch", nil
342+
}
343+
340344
func (fds *fakeDatastore) updateNamespace(name string, def *corev1.NamespaceDefinition, revision datastore.Revision) {
341345
fds.lock.Lock()
342346
defer fds.lock.Unlock()

internal/datastore/proxy/singleflight.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,10 @@ type singleflightProxy struct {
2525

2626
var _ datastore.Datastore = (*singleflightProxy)(nil)
2727

28+
func (p *singleflightProxy) UniqueID(ctx context.Context) (string, error) {
29+
return p.delegate.UniqueID(ctx)
30+
}
31+
2832
func (p *singleflightProxy) SnapshotReader(rev datastore.Revision) datastore.Reader {
2933
return p.delegate.SnapshotReader(rev)
3034
}

internal/datastore/spanner/revisions.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import (
1313

1414
var ParseRevisionString = revisions.RevisionParser(revisions.Timestamp)
1515

16-
func (sd spannerDatastore) headRevisionInternal(ctx context.Context) (datastore.Revision, error) {
16+
func (sd *spannerDatastore) headRevisionInternal(ctx context.Context) (datastore.Revision, error) {
1717
now, err := sd.now(ctx)
1818
if err != nil {
1919
return datastore.NoRevision, fmt.Errorf(errRevision, err)
@@ -22,11 +22,11 @@ func (sd spannerDatastore) headRevisionInternal(ctx context.Context) (datastore.
2222
return revisions.NewForTime(now), nil
2323
}
2424

25-
func (sd spannerDatastore) HeadRevision(ctx context.Context) (datastore.Revision, error) {
25+
func (sd *spannerDatastore) HeadRevision(ctx context.Context) (datastore.Revision, error) {
2626
return sd.headRevisionInternal(ctx)
2727
}
2828

29-
func (sd spannerDatastore) now(ctx context.Context) (time.Time, error) {
29+
func (sd *spannerDatastore) now(ctx context.Context) (time.Time, error) {
3030
var timestamp time.Time
3131
if err := sd.client.Single().Query(ctx, spanner.NewStatement("SELECT CURRENT_TIMESTAMP()")).Do(func(r *spanner.Row) error {
3232
return r.Columns(&timestamp)

internal/datastore/spanner/spanner.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"os"
77
"regexp"
88
"strconv"
9+
"sync/atomic"
910
"time"
1011

1112
"cloud.google.com/go/spanner"
@@ -78,6 +79,8 @@ type spannerDatastore struct {
7879
client *spanner.Client
7980
config spannerOptions
8081
database string
82+
83+
uniqueID atomic.Pointer[string]
8184
}
8285

8386
// NewSpannerDatastore returns a datastore backed by cloud spanner
@@ -142,7 +145,7 @@ func NewSpannerDatastore(ctx context.Context, database string, opts ...Option) (
142145
maxRevisionStaleness := time.Duration(float64(config.revisionQuantization.Nanoseconds())*
143146
config.maxRevisionStalenessPercent) * time.Nanosecond
144147

145-
ds := spannerDatastore{
148+
ds := &spannerDatastore{
146149
RemoteClockRevisions: revisions.NewRemoteClockRevisions(
147150
defaultChangeStreamRetention,
148151
maxRevisionStaleness,
@@ -194,7 +197,7 @@ func (t *traceableRTX) Query(ctx context.Context, statement spanner.Statement) *
194197
return t.delegate.Query(ctx, statement)
195198
}
196199

197-
func (sd spannerDatastore) SnapshotReader(revisionRaw datastore.Revision) datastore.Reader {
200+
func (sd *spannerDatastore) SnapshotReader(revisionRaw datastore.Revision) datastore.Reader {
198201
r := revisionRaw.(revisions.TimestampRevision)
199202

200203
txSource := func() readTX {
@@ -204,7 +207,7 @@ func (sd spannerDatastore) SnapshotReader(revisionRaw datastore.Revision) datast
204207
return spannerReader{executor, txSource}
205208
}
206209

207-
func (sd spannerDatastore) ReadWriteTx(ctx context.Context, fn datastore.TxUserFunc, opts ...options.RWTOptionsOption) (datastore.Revision, error) {
210+
func (sd *spannerDatastore) ReadWriteTx(ctx context.Context, fn datastore.TxUserFunc, opts ...options.RWTOptionsOption) (datastore.Revision, error) {
208211
config := options.NewRWTOptionsWithOptions(opts...)
209212

210213
ctx, span := tracer.Start(ctx, "ReadWriteTx")
@@ -247,7 +250,7 @@ func (sd spannerDatastore) ReadWriteTx(ctx context.Context, fn datastore.TxUserF
247250
return revisions.NewForTime(ts), nil
248251
}
249252

250-
func (sd spannerDatastore) ReadyState(ctx context.Context) (datastore.ReadyState, error) {
253+
func (sd *spannerDatastore) ReadyState(ctx context.Context) (datastore.ReadyState, error) {
251254
headMigration, err := migrations.SpannerMigrations.HeadRevision()
252255
if err != nil {
253256
return datastore.ReadyState{}, fmt.Errorf("invalid head migration found for spanner: %w", err)
@@ -274,11 +277,11 @@ func (sd spannerDatastore) ReadyState(ctx context.Context) (datastore.ReadyState
274277
}, nil
275278
}
276279

277-
func (sd spannerDatastore) Features(_ context.Context) (*datastore.Features, error) {
280+
func (sd *spannerDatastore) Features(_ context.Context) (*datastore.Features, error) {
278281
return &datastore.Features{Watch: datastore.Feature{Enabled: true}}, nil
279282
}
280283

281-
func (sd spannerDatastore) Close() error {
284+
func (sd *spannerDatastore) Close() error {
282285
sd.client.Close()
283286
return nil
284287
}

internal/datastore/spanner/spanner_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ import (
1818
)
1919

2020
// Implement TestableDatastore interface
21-
func (sd spannerDatastore) ExampleRetryableError() error {
21+
func (sd *spannerDatastore) ExampleRetryableError() error {
2222
return status.New(codes.Aborted, "retryable").Err()
2323
}
2424

internal/datastore/spanner/stats.go

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,29 @@ var (
2020
rng = rand.NewSource(time.Now().UnixNano())
2121
)
2222

23-
func (sd spannerDatastore) Statistics(ctx context.Context) (datastore.Stats, error) {
24-
var uniqueID string
25-
if err := sd.client.Single().Read(
26-
context.Background(),
27-
tableMetadata,
28-
spanner.AllKeys(),
29-
[]string{colUniqueID},
30-
).Do(func(r *spanner.Row) error {
31-
return r.Columns(&uniqueID)
32-
}); err != nil {
23+
func (sd *spannerDatastore) UniqueID(ctx context.Context) (string, error) {
24+
if sd.uniqueID.Load() == nil {
25+
var uniqueID string
26+
if err := sd.client.Single().Read(
27+
ctx,
28+
tableMetadata,
29+
spanner.AllKeys(),
30+
[]string{colUniqueID},
31+
).Do(func(r *spanner.Row) error {
32+
return r.Columns(&uniqueID)
33+
}); err != nil {
34+
return "", fmt.Errorf("unable to read unique ID: %w", err)
35+
}
36+
sd.uniqueID.Store(&uniqueID)
37+
return uniqueID, nil
38+
}
39+
40+
return *sd.uniqueID.Load(), nil
41+
}
42+
43+
func (sd *spannerDatastore) Statistics(ctx context.Context) (datastore.Stats, error) {
44+
uniqueID, err := sd.UniqueID(ctx)
45+
if err != nil {
3346
return datastore.Stats{}, fmt.Errorf("unable to read unique ID: %w", err)
3447
}
3548

internal/datastore/spanner/watch.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ func parseDatabaseName(db string) (project, instance, database string, err error
5151
return matches[1], matches[2], matches[3], nil
5252
}
5353

54-
func (sd spannerDatastore) Watch(ctx context.Context, afterRevision datastore.Revision, opts datastore.WatchOptions) (<-chan *datastore.RevisionChanges, <-chan error) {
54+
func (sd *spannerDatastore) Watch(ctx context.Context, afterRevision datastore.Revision, opts datastore.WatchOptions) (<-chan *datastore.RevisionChanges, <-chan error) {
5555
watchBufferLength := opts.WatchBufferLength
5656
if watchBufferLength <= 0 {
5757
watchBufferLength = sd.watchBufferLength
@@ -65,7 +65,7 @@ func (sd spannerDatastore) Watch(ctx context.Context, afterRevision datastore.Re
6565
return updates, errs
6666
}
6767

68-
func (sd spannerDatastore) watch(
68+
func (sd *spannerDatastore) watch(
6969
ctx context.Context,
7070
afterRevisionRaw datastore.Revision,
7171
opts datastore.WatchOptions,

0 commit comments

Comments
 (0)