Skip to content

Commit 86e891a

Browse files
committed
Export the unique, stable, ID from datastore
1 parent 493063f commit 86e891a

21 files changed

+148
-47
lines changed

internal/datastore/context.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,10 @@ func NewSeparatingContextDatastoreProxy(d datastore.Datastore) datastore.Datasto
3939

4040
type ctxProxy struct{ delegate datastore.Datastore }
4141

42+
func (p *ctxProxy) UniqueID(ctx context.Context) (string, error) {
43+
return p.delegate.UniqueID(SeparateContextWithTracing(ctx))
44+
}
45+
4246
func (p *ctxProxy) ReadWriteTx(
4347
ctx context.Context,
4448
f datastore.TxUserFunc,

internal/datastore/crdb/crdb.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"fmt"
77
"regexp"
88
"strconv"
9+
"sync/atomic"
910
"time"
1011

1112
"github.com/IBM/pgxpoolprometheus"
@@ -273,6 +274,8 @@ type crdbDatastore struct {
273274
pruneGroup *errgroup.Group
274275
ctx context.Context
275276
cancel context.CancelFunc
277+
278+
uniqueID atomic.Pointer[string]
276279
}
277280

278281
func (cds *crdbDatastore) SnapshotReader(rev datastore.Revision) datastore.Reader {

internal/datastore/crdb/stats.go

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,20 +20,33 @@ const (
2020

2121
var (
2222
queryReadUniqueID = psql.Select(colUniqueID).From(tableMetadata)
23-
uniqueID string
2423
)
2524

26-
func (cds *crdbDatastore) Statistics(ctx context.Context) (datastore.Stats, error) {
27-
if len(uniqueID) == 0 {
25+
func (cds *crdbDatastore) UniqueID(ctx context.Context) (string, error) {
26+
if cds.uniqueID.Load() == nil {
2827
sql, args, err := queryReadUniqueID.ToSql()
2928
if err != nil {
30-
return datastore.Stats{}, fmt.Errorf("unable to prepare unique ID sql: %w", err)
29+
return "", fmt.Errorf("unable to prepare unique ID sql: %w", err)
3130
}
31+
32+
var uniqueID string
3233
if err := cds.readPool.QueryRowFunc(ctx, func(ctx context.Context, row pgx.Row) error {
3334
return row.Scan(&uniqueID)
3435
}, sql, args...); err != nil {
35-
return datastore.Stats{}, fmt.Errorf("unable to query unique ID: %w", err)
36+
return "", fmt.Errorf("unable to query unique ID: %w", err)
3637
}
38+
39+
cds.uniqueID.Store(&uniqueID)
40+
return uniqueID, nil
41+
}
42+
43+
return *cds.uniqueID.Load(), nil
44+
}
45+
46+
func (cds *crdbDatastore) Statistics(ctx context.Context) (datastore.Stats, error) {
47+
uniqueID, err := cds.UniqueID(ctx)
48+
if err != nil {
49+
return datastore.Stats{}, err
3750
}
3851

3952
var nsDefs []datastore.RevisionedNamespace

internal/datastore/memdb/memdb.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,10 @@ type snapshot struct {
9999
db *memdb.MemDB
100100
}
101101

102+
func (mdb *memdbDatastore) UniqueID(_ context.Context) (string, error) {
103+
return mdb.uniqueID, nil
104+
}
105+
102106
func (mdb *memdbDatastore) SnapshotReader(dr datastore.Revision) datastore.Reader {
103107
mdb.RLock()
104108
defer mdb.RUnlock()

internal/datastore/mysql/datastore.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -448,6 +448,8 @@ type Datastore struct {
448448
createTxn string
449449
createBaseTxn string
450450

451+
uniqueID atomic.Pointer[string]
452+
451453
*QueryBuilder
452454
*revisions.CachedOptimizedRevisions
453455
revisions.CommonDecoder
@@ -524,7 +526,7 @@ func (mds *Datastore) isSeeded(ctx context.Context) (bool, error) {
524526
return false, nil
525527
}
526528

527-
_, err = mds.getUniqueID(ctx)
529+
_, err = mds.UniqueID(ctx)
528530
if err != nil {
529531
return false, nil
530532
}

internal/datastore/mysql/stats.go

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ func (mds *Datastore) Statistics(ctx context.Context) (datastore.Stats, error) {
2828
}
2929
}
3030

31-
uniqueID, err := mds.getUniqueID(ctx)
31+
uniqueID, err := mds.UniqueID(ctx)
3232
if err != nil {
3333
return datastore.Stats{}, err
3434
}
@@ -81,16 +81,20 @@ func (mds *Datastore) Statistics(ctx context.Context) (datastore.Stats, error) {
8181
}, nil
8282
}
8383

84-
func (mds *Datastore) getUniqueID(ctx context.Context) (string, error) {
85-
sql, args, err := sb.Select(metadataUniqueIDColumn).From(mds.driver.Metadata()).ToSql()
86-
if err != nil {
87-
return "", fmt.Errorf("unable to generate query sql: %w", err)
88-
}
84+
func (mds *Datastore) UniqueID(ctx context.Context) (string, error) {
85+
if mds.uniqueID.Load() == nil {
86+
sql, args, err := sb.Select(metadataUniqueIDColumn).From(mds.driver.Metadata()).ToSql()
87+
if err != nil {
88+
return "", fmt.Errorf("unable to generate query sql: %w", err)
89+
}
8990

90-
var uniqueID string
91-
if err := mds.db.QueryRowContext(ctx, sql, args...).Scan(&uniqueID); err != nil {
92-
return "", fmt.Errorf("unable to query unique ID: %w", err)
91+
var uniqueID string
92+
if err := mds.db.QueryRowContext(ctx, sql, args...).Scan(&uniqueID); err != nil {
93+
return "", fmt.Errorf("unable to query unique ID: %w", err)
94+
}
95+
mds.uniqueID.Store(&uniqueID)
96+
return uniqueID, nil
9397
}
9498

95-
return uniqueID, nil
99+
return *mds.uniqueID.Load(), nil
96100
}

internal/datastore/postgres/postgres.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -300,6 +300,8 @@ type pgDatastore struct {
300300
maxRetries uint8
301301
watchEnabled bool
302302

303+
uniqueID atomic.Pointer[string]
304+
303305
gcGroup *errgroup.Group
304306
gcCtx context.Context
305307
cancelGc context.CancelFunc
@@ -547,7 +549,7 @@ func (pgd *pgDatastore) ReadyState(ctx context.Context) (datastore.ReadyState, e
547549

548550
if version == headMigration {
549551
// Ensure a datastore ID is present. This ensures the tables have not been truncated.
550-
uniqueID, err := pgd.datastoreUniqueID(ctx)
552+
uniqueID, err := pgd.UniqueID(ctx)
551553
if err != nil {
552554
return datastore.ReadyState{}, fmt.Errorf("database validation failed: %w; if you have previously run `TRUNCATE`, this database is no longer valid and must be remigrated. See: https://spicedb.dev/d/truncate-unsupported", err)
553555
}

internal/datastore/postgres/stats.go

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,16 +28,25 @@ var (
2828
Where(sq.Eq{colRelname: tableTuple})
2929
)
3030

31-
func (pgd *pgDatastore) datastoreUniqueID(ctx context.Context) (string, error) {
32-
idSQL, idArgs, err := queryUniqueID.ToSql()
33-
if err != nil {
34-
return "", fmt.Errorf("unable to generate query sql: %w", err)
31+
func (pgd *pgDatastore) UniqueID(ctx context.Context) (string, error) {
32+
if pgd.uniqueID.Load() == nil {
33+
idSQL, idArgs, err := queryUniqueID.ToSql()
34+
if err != nil {
35+
return "", fmt.Errorf("unable to generate query sql: %w", err)
36+
}
37+
38+
var uniqueID string
39+
if err := pgx.BeginTxFunc(ctx, pgd.readPool, pgd.readTxOptions, func(tx pgx.Tx) error {
40+
return tx.QueryRow(ctx, idSQL, idArgs...).Scan(&uniqueID)
41+
}); err != nil {
42+
return "", fmt.Errorf("unable to query unique ID: %w", err)
43+
}
44+
45+
pgd.uniqueID.Store(&uniqueID)
46+
return uniqueID, nil
3547
}
3648

37-
var uniqueID string
38-
return uniqueID, pgx.BeginTxFunc(ctx, pgd.readPool, pgd.readTxOptions, func(tx pgx.Tx) error {
39-
return tx.QueryRow(ctx, idSQL, idArgs...).Scan(&uniqueID)
40-
})
49+
return *pgd.uniqueID.Load(), nil
4150
}
4251

4352
func (pgd *pgDatastore) Statistics(ctx context.Context) (datastore.Stats, error) {

internal/datastore/proxy/observable.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,10 @@ func NewObservableDatastoreProxy(d datastore.Datastore) datastore.Datastore {
6767

6868
type observableProxy struct{ delegate datastore.Datastore }
6969

70+
func (p *observableProxy) UniqueID(ctx context.Context) (string, error) {
71+
return p.delegate.UniqueID(ctx)
72+
}
73+
7074
func (p *observableProxy) SnapshotReader(rev datastore.Revision) datastore.Reader {
7175
delegateReader := p.delegate.SnapshotReader(rev)
7276
return &observableReader{delegateReader}

internal/datastore/proxy/proxy_test/mock.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,10 @@ type MockDatastore struct {
1515
mock.Mock
1616
}
1717

18+
func (dm *MockDatastore) UniqueID(_ context.Context) (string, error) {
19+
return "mockds", nil
20+
}
21+
1822
func (dm *MockDatastore) SnapshotReader(rev datastore.Revision) datastore.Reader {
1923
args := dm.Called(rev)
2024
return args.Get(0).(datastore.Reader)

internal/datastore/proxy/schemacaching/watchingcache_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -337,6 +337,10 @@ type fakeDatastore struct {
337337
lock sync.RWMutex
338338
}
339339

340+
func (fds *fakeDatastore) UniqueID(_ context.Context) (string, error) {
341+
return "fakedsforwatch", nil
342+
}
343+
340344
func (fds *fakeDatastore) updateNamespace(name string, def *corev1.NamespaceDefinition, revision datastore.Revision) {
341345
fds.lock.Lock()
342346
defer fds.lock.Unlock()

internal/datastore/proxy/singleflight.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,10 @@ type singleflightProxy struct {
2525

2626
var _ datastore.Datastore = (*singleflightProxy)(nil)
2727

28+
func (p *singleflightProxy) UniqueID(ctx context.Context) (string, error) {
29+
return p.delegate.UniqueID(ctx)
30+
}
31+
2832
func (p *singleflightProxy) SnapshotReader(rev datastore.Revision) datastore.Reader {
2933
return p.delegate.SnapshotReader(rev)
3034
}

internal/datastore/spanner/revisions.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import (
1313

1414
var ParseRevisionString = revisions.RevisionParser(revisions.Timestamp)
1515

16-
func (sd spannerDatastore) headRevisionInternal(ctx context.Context) (datastore.Revision, error) {
16+
func (sd *spannerDatastore) headRevisionInternal(ctx context.Context) (datastore.Revision, error) {
1717
now, err := sd.now(ctx)
1818
if err != nil {
1919
return datastore.NoRevision, fmt.Errorf(errRevision, err)
@@ -22,11 +22,11 @@ func (sd spannerDatastore) headRevisionInternal(ctx context.Context) (datastore.
2222
return revisions.NewForTime(now), nil
2323
}
2424

25-
func (sd spannerDatastore) HeadRevision(ctx context.Context) (datastore.Revision, error) {
25+
func (sd *spannerDatastore) HeadRevision(ctx context.Context) (datastore.Revision, error) {
2626
return sd.headRevisionInternal(ctx)
2727
}
2828

29-
func (sd spannerDatastore) now(ctx context.Context) (time.Time, error) {
29+
func (sd *spannerDatastore) now(ctx context.Context) (time.Time, error) {
3030
var timestamp time.Time
3131
if err := sd.client.Single().Query(ctx, spanner.NewStatement("SELECT CURRENT_TIMESTAMP()")).Do(func(r *spanner.Row) error {
3232
return r.Columns(&timestamp)

internal/datastore/spanner/spanner.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"os"
77
"regexp"
88
"strconv"
9+
"sync/atomic"
910
"time"
1011

1112
"cloud.google.com/go/spanner"
@@ -78,6 +79,8 @@ type spannerDatastore struct {
7879
client *spanner.Client
7980
config spannerOptions
8081
database string
82+
83+
uniqueID atomic.Pointer[string]
8184
}
8285

8386
// NewSpannerDatastore returns a datastore backed by cloud spanner
@@ -143,7 +146,7 @@ func NewSpannerDatastore(ctx context.Context, database string, opts ...Option) (
143146
maxRevisionStaleness := time.Duration(float64(config.revisionQuantization.Nanoseconds())*
144147
config.maxRevisionStalenessPercent) * time.Nanosecond
145148

146-
ds := spannerDatastore{
149+
ds := &spannerDatastore{
147150
RemoteClockRevisions: revisions.NewRemoteClockRevisions(
148151
defaultChangeStreamRetention,
149152
maxRevisionStaleness,
@@ -195,7 +198,7 @@ func (t *traceableRTX) Query(ctx context.Context, statement spanner.Statement) *
195198
return t.delegate.Query(ctx, statement)
196199
}
197200

198-
func (sd spannerDatastore) SnapshotReader(revisionRaw datastore.Revision) datastore.Reader {
201+
func (sd *spannerDatastore) SnapshotReader(revisionRaw datastore.Revision) datastore.Reader {
199202
r := revisionRaw.(revisions.TimestampRevision)
200203

201204
txSource := func() readTX {
@@ -205,7 +208,7 @@ func (sd spannerDatastore) SnapshotReader(revisionRaw datastore.Revision) datast
205208
return spannerReader{executor, txSource}
206209
}
207210

208-
func (sd spannerDatastore) ReadWriteTx(ctx context.Context, fn datastore.TxUserFunc, opts ...options.RWTOptionsOption) (datastore.Revision, error) {
211+
func (sd *spannerDatastore) ReadWriteTx(ctx context.Context, fn datastore.TxUserFunc, opts ...options.RWTOptionsOption) (datastore.Revision, error) {
209212
config := options.NewRWTOptionsWithOptions(opts...)
210213

211214
ctx, span := tracer.Start(ctx, "ReadWriteTx")
@@ -248,7 +251,7 @@ func (sd spannerDatastore) ReadWriteTx(ctx context.Context, fn datastore.TxUserF
248251
return revisions.NewForTime(ts), nil
249252
}
250253

251-
func (sd spannerDatastore) ReadyState(ctx context.Context) (datastore.ReadyState, error) {
254+
func (sd *spannerDatastore) ReadyState(ctx context.Context) (datastore.ReadyState, error) {
252255
headMigration, err := migrations.SpannerMigrations.HeadRevision()
253256
if err != nil {
254257
return datastore.ReadyState{}, fmt.Errorf("invalid head migration found for spanner: %w", err)
@@ -275,11 +278,11 @@ func (sd spannerDatastore) ReadyState(ctx context.Context) (datastore.ReadyState
275278
}, nil
276279
}
277280

278-
func (sd spannerDatastore) Features(_ context.Context) (*datastore.Features, error) {
281+
func (sd *spannerDatastore) Features(_ context.Context) (*datastore.Features, error) {
279282
return &datastore.Features{Watch: datastore.Feature{Enabled: true}}, nil
280283
}
281284

282-
func (sd spannerDatastore) Close() error {
285+
func (sd *spannerDatastore) Close() error {
283286
sd.client.Close()
284287
return nil
285288
}

internal/datastore/spanner/spanner_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ import (
1818
)
1919

2020
// Implement TestableDatastore interface
21-
func (sd spannerDatastore) ExampleRetryableError() error {
21+
func (sd *spannerDatastore) ExampleRetryableError() error {
2222
return status.New(codes.Aborted, "retryable").Err()
2323
}
2424

internal/datastore/spanner/stats.go

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,29 @@ var (
2020
rng = rand.NewSource(time.Now().UnixNano())
2121
)
2222

23-
func (sd spannerDatastore) Statistics(ctx context.Context) (datastore.Stats, error) {
24-
var uniqueID string
25-
if err := sd.client.Single().Read(
26-
context.Background(),
27-
tableMetadata,
28-
spanner.AllKeys(),
29-
[]string{colUniqueID},
30-
).Do(func(r *spanner.Row) error {
31-
return r.Columns(&uniqueID)
32-
}); err != nil {
23+
func (sd *spannerDatastore) UniqueID(ctx context.Context) (string, error) {
24+
if sd.uniqueID.Load() == nil {
25+
var uniqueID string
26+
if err := sd.client.Single().Read(
27+
ctx,
28+
tableMetadata,
29+
spanner.AllKeys(),
30+
[]string{colUniqueID},
31+
).Do(func(r *spanner.Row) error {
32+
return r.Columns(&uniqueID)
33+
}); err != nil {
34+
return "", fmt.Errorf("unable to read unique ID: %w", err)
35+
}
36+
sd.uniqueID.Store(&uniqueID)
37+
return uniqueID, nil
38+
}
39+
40+
return *sd.uniqueID.Load(), nil
41+
}
42+
43+
func (sd *spannerDatastore) Statistics(ctx context.Context) (datastore.Stats, error) {
44+
uniqueID, err := sd.UniqueID(ctx)
45+
if err != nil {
3346
return datastore.Stats{}, fmt.Errorf("unable to read unique ID: %w", err)
3447
}
3548

internal/datastore/spanner/watch.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ func parseDatabaseName(db string) (project, instance, database string, err error
5151
return matches[1], matches[2], matches[3], nil
5252
}
5353

54-
func (sd spannerDatastore) Watch(ctx context.Context, afterRevision datastore.Revision, opts datastore.WatchOptions) (<-chan *datastore.RevisionChanges, <-chan error) {
54+
func (sd *spannerDatastore) Watch(ctx context.Context, afterRevision datastore.Revision, opts datastore.WatchOptions) (<-chan *datastore.RevisionChanges, <-chan error) {
5555
watchBufferLength := opts.WatchBufferLength
5656
if watchBufferLength <= 0 {
5757
watchBufferLength = sd.watchBufferLength
@@ -65,7 +65,7 @@ func (sd spannerDatastore) Watch(ctx context.Context, afterRevision datastore.Re
6565
return updates, errs
6666
}
6767

68-
func (sd spannerDatastore) watch(
68+
func (sd *spannerDatastore) watch(
6969
ctx context.Context,
7070
afterRevisionRaw datastore.Revision,
7171
opts datastore.WatchOptions,

pkg/datastore/datastore.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -459,6 +459,11 @@ func (wo WatchOptions) WithCheckpointInterval(interval time.Duration) WatchOptio
459459

460460
// Datastore represents tuple access for a single namespace.
461461
type Datastore interface {
462+
// UniqueID returns a unique identifier for the datastore. This identifier
463+
// must be stable across restarts of the datastore if the datastore is
464+
// persistent.
465+
UniqueID(context.Context) (string, error)
466+
462467
// SnapshotReader creates a read-only handle that reads the datastore at the specified revision.
463468
// Any errors establishing the reader will be returned by subsequent calls.
464469
SnapshotReader(Revision) Reader

0 commit comments

Comments
 (0)