Skip to content

Commit 1b65ae3

Browse files
committed
Export the unique, stable, ID from datastore
1 parent 9456ce0 commit 1b65ae3

File tree

20 files changed

+153
-39
lines changed

20 files changed

+153
-39
lines changed

internal/datastore/context.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,10 @@ func NewSeparatingContextDatastoreProxy(d datastore.Datastore) datastore.StrictR
3939

4040
type ctxProxy struct{ delegate datastore.Datastore }
4141

42+
func (p *ctxProxy) UniqueID(ctx context.Context) (string, error) {
43+
return p.delegate.UniqueID(SeparateContextWithTracing(ctx))
44+
}
45+
4246
func (p *ctxProxy) ReadWriteTx(
4347
ctx context.Context,
4448
f datastore.TxUserFunc,

internal/datastore/crdb/crdb.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"fmt"
77
"regexp"
88
"strconv"
9+
"sync/atomic"
910
"time"
1011

1112
"github.com/IBM/pgxpoolprometheus"
@@ -278,6 +279,8 @@ type crdbDatastore struct {
278279
pruneGroup *errgroup.Group
279280
ctx context.Context
280281
cancel context.CancelFunc
282+
283+
uniqueID atomic.Pointer[string]
281284
}
282285

283286
func (cds *crdbDatastore) SnapshotReader(rev datastore.Revision) datastore.Reader {

internal/datastore/crdb/stats.go

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,22 +18,33 @@ const (
1818
colUniqueID = "unique_id"
1919
)
2020

21-
var (
22-
queryReadUniqueID = psql.Select(colUniqueID).From(tableMetadata)
23-
uniqueID string
24-
)
21+
var queryReadUniqueID = psql.Select(colUniqueID).From(tableMetadata)
2522

26-
func (cds *crdbDatastore) Statistics(ctx context.Context) (datastore.Stats, error) {
27-
if len(uniqueID) == 0 {
23+
func (cds *crdbDatastore) UniqueID(ctx context.Context) (string, error) {
24+
if cds.uniqueID.Load() == nil {
2825
sql, args, err := queryReadUniqueID.ToSql()
2926
if err != nil {
30-
return datastore.Stats{}, fmt.Errorf("unable to prepare unique ID sql: %w", err)
27+
return "", fmt.Errorf("unable to prepare unique ID sql: %w", err)
3128
}
29+
30+
var uniqueID string
3231
if err := cds.readPool.QueryRowFunc(ctx, func(ctx context.Context, row pgx.Row) error {
3332
return row.Scan(&uniqueID)
3433
}, sql, args...); err != nil {
35-
return datastore.Stats{}, fmt.Errorf("unable to query unique ID: %w", err)
34+
return "", fmt.Errorf("unable to query unique ID: %w", err)
3635
}
36+
37+
cds.uniqueID.Store(&uniqueID)
38+
return uniqueID, nil
39+
}
40+
41+
return *cds.uniqueID.Load(), nil
42+
}
43+
44+
func (cds *crdbDatastore) Statistics(ctx context.Context) (datastore.Stats, error) {
45+
uniqueID, err := cds.UniqueID(ctx)
46+
if err != nil {
47+
return datastore.Stats{}, err
3748
}
3849

3950
var nsDefs []datastore.RevisionedNamespace

internal/datastore/memdb/memdb.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,10 @@ type snapshot struct {
9999
db *memdb.MemDB
100100
}
101101

102+
func (mdb *memdbDatastore) UniqueID(_ context.Context) (string, error) {
103+
return mdb.uniqueID, nil
104+
}
105+
102106
func (mdb *memdbDatastore) SnapshotReader(dr datastore.Revision) datastore.Reader {
103107
mdb.RLock()
104108
defer mdb.RUnlock()

internal/datastore/mysql/datastore.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -496,6 +496,8 @@ type Datastore struct {
496496
createTxn string
497497
createBaseTxn string
498498

499+
uniqueID atomic.Pointer[string]
500+
499501
*QueryBuilder
500502
*revisions.CachedOptimizedRevisions
501503
revisions.CommonDecoder
@@ -572,7 +574,7 @@ func (mds *Datastore) isSeeded(ctx context.Context) (bool, error) {
572574
return false, nil
573575
}
574576

575-
_, err = mds.getUniqueID(ctx)
577+
_, err = mds.UniqueID(ctx)
576578
if err != nil {
577579
return false, nil
578580
}

internal/datastore/mysql/stats.go

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ func (mds *Datastore) Statistics(ctx context.Context) (datastore.Stats, error) {
2828
}
2929
}
3030

31-
uniqueID, err := mds.getUniqueID(ctx)
31+
uniqueID, err := mds.UniqueID(ctx)
3232
if err != nil {
3333
return datastore.Stats{}, err
3434
}
@@ -81,16 +81,20 @@ func (mds *Datastore) Statistics(ctx context.Context) (datastore.Stats, error) {
8181
}, nil
8282
}
8383

84-
func (mds *Datastore) getUniqueID(ctx context.Context) (string, error) {
85-
sql, args, err := sb.Select(metadataUniqueIDColumn).From(mds.driver.Metadata()).ToSql()
86-
if err != nil {
87-
return "", fmt.Errorf("unable to generate query sql: %w", err)
88-
}
84+
func (mds *Datastore) UniqueID(ctx context.Context) (string, error) {
85+
if mds.uniqueID.Load() == nil {
86+
sql, args, err := sb.Select(metadataUniqueIDColumn).From(mds.driver.Metadata()).ToSql()
87+
if err != nil {
88+
return "", fmt.Errorf("unable to generate query sql: %w", err)
89+
}
8990

90-
var uniqueID string
91-
if err := mds.db.QueryRowContext(ctx, sql, args...).Scan(&uniqueID); err != nil {
92-
return "", fmt.Errorf("unable to query unique ID: %w", err)
91+
var uniqueID string
92+
if err := mds.db.QueryRowContext(ctx, sql, args...).Scan(&uniqueID); err != nil {
93+
return "", fmt.Errorf("unable to query unique ID: %w", err)
94+
}
95+
mds.uniqueID.Store(&uniqueID)
96+
return uniqueID, nil
9397
}
9498

95-
return uniqueID, nil
99+
return *mds.uniqueID.Load(), nil
96100
}

internal/datastore/postgres/postgres.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -384,6 +384,7 @@ type pgDatastore struct {
384384
inStrictReadMode bool
385385

386386
credentialsProvider datastore.CredentialsProvider
387+
uniqueID atomic.Pointer[string]
387388

388389
gcGroup *errgroup.Group
389390
gcCtx context.Context
@@ -648,7 +649,7 @@ func (pgd *pgDatastore) ReadyState(ctx context.Context) (datastore.ReadyState, e
648649

649650
if version == headMigration {
650651
// Ensure a datastore ID is present. This ensures the tables have not been truncated.
651-
uniqueID, err := pgd.datastoreUniqueID(ctx)
652+
uniqueID, err := pgd.UniqueID(ctx)
652653
if err != nil {
653654
return datastore.ReadyState{}, fmt.Errorf("database validation failed: %w; if you have previously run `TRUNCATE`, this database is no longer valid and must be remigrated. See: https://spicedb.dev/d/truncate-unsupported", err)
654655
}

internal/datastore/postgres/stats.go

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,16 +28,25 @@ var (
2828
Where(sq.Eq{colRelname: tableTuple})
2929
)
3030

31-
func (pgd *pgDatastore) datastoreUniqueID(ctx context.Context) (string, error) {
32-
idSQL, idArgs, err := queryUniqueID.ToSql()
33-
if err != nil {
34-
return "", fmt.Errorf("unable to generate query sql: %w", err)
31+
func (pgd *pgDatastore) UniqueID(ctx context.Context) (string, error) {
32+
if pgd.uniqueID.Load() == nil {
33+
idSQL, idArgs, err := queryUniqueID.ToSql()
34+
if err != nil {
35+
return "", fmt.Errorf("unable to generate query sql: %w", err)
36+
}
37+
38+
var uniqueID string
39+
if err := pgx.BeginTxFunc(ctx, pgd.readPool, pgd.readTxOptions, func(tx pgx.Tx) error {
40+
return tx.QueryRow(ctx, idSQL, idArgs...).Scan(&uniqueID)
41+
}); err != nil {
42+
return "", fmt.Errorf("unable to query unique ID: %w", err)
43+
}
44+
45+
pgd.uniqueID.Store(&uniqueID)
46+
return uniqueID, nil
3547
}
3648

37-
var uniqueID string
38-
return uniqueID, pgx.BeginTxFunc(ctx, pgd.readPool, pgd.readTxOptions, func(tx pgx.Tx) error {
39-
return tx.QueryRow(ctx, idSQL, idArgs...).Scan(&uniqueID)
40-
})
49+
return *pgd.uniqueID.Load(), nil
4150
}
4251

4352
func (pgd *pgDatastore) Statistics(ctx context.Context) (datastore.Stats, error) {

internal/datastore/proxy/observable.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,10 @@ func NewObservableDatastoreProxy(d datastore.Datastore) datastore.Datastore {
6767

6868
type observableProxy struct{ delegate datastore.Datastore }
6969

70+
func (p *observableProxy) UniqueID(ctx context.Context) (string, error) {
71+
return p.delegate.UniqueID(ctx)
72+
}
73+
7074
func (p *observableProxy) SnapshotReader(rev datastore.Revision) datastore.Reader {
7175
delegateReader := p.delegate.SnapshotReader(rev)
7276
return &observableReader{delegateReader}

internal/datastore/proxy/proxy_test/mock.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,10 @@ type MockDatastore struct {
1515
mock.Mock
1616
}
1717

18+
func (dm *MockDatastore) UniqueID(_ context.Context) (string, error) {
19+
return "mockds", nil
20+
}
21+
1822
func (dm *MockDatastore) SnapshotReader(rev datastore.Revision) datastore.Reader {
1923
args := dm.Called(rev)
2024
return args.Get(0).(datastore.Reader)

internal/datastore/proxy/replicated_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,10 @@ func (f fakeDatastore) Statistics(_ context.Context) (datastore.Stats, error) {
138138
return datastore.Stats{}, nil
139139
}
140140

141+
func (f fakeDatastore) UniqueID(_ context.Context) (string, error) {
142+
return "fake", nil
143+
}
144+
141145
func (f fakeDatastore) Close() error {
142146
return nil
143147
}

internal/datastore/proxy/schemacaching/watchingcache_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -337,6 +337,10 @@ type fakeDatastore struct {
337337
lock sync.RWMutex
338338
}
339339

340+
func (fds *fakeDatastore) UniqueID(_ context.Context) (string, error) {
341+
return "fakedsforwatch", nil
342+
}
343+
340344
func (fds *fakeDatastore) updateNamespace(name string, def *corev1.NamespaceDefinition, revision datastore.Revision) {
341345
fds.lock.Lock()
342346
defer fds.lock.Unlock()

internal/datastore/proxy/singleflight.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,10 @@ type singleflightProxy struct {
2424

2525
var _ datastore.Datastore = (*singleflightProxy)(nil)
2626

27+
func (p *singleflightProxy) UniqueID(ctx context.Context) (string, error) {
28+
return p.delegate.UniqueID(ctx)
29+
}
30+
2731
func (p *singleflightProxy) SnapshotReader(rev datastore.Revision) datastore.Reader {
2832
return p.delegate.SnapshotReader(rev)
2933
}

internal/datastore/spanner/revisions.go

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,27 @@ var (
1616
nowStmt = spanner.NewStatement("SELECT CURRENT_TIMESTAMP()")
1717
)
1818

19+
func (sd *spannerDatastore) headRevisionInternal(ctx context.Context) (datastore.Revision, error) {
20+
now, err := sd.now(ctx)
21+
if err != nil {
22+
return datastore.NoRevision, fmt.Errorf(errRevision, err)
23+
}
24+
25+
return revisions.NewForTime(now), nil
26+
}
27+
1928
func (sd *spannerDatastore) HeadRevision(ctx context.Context) (datastore.Revision, error) {
29+
return sd.headRevisionInternal(ctx)
30+
}
31+
32+
func (sd *spannerDatastore) now(ctx context.Context) (time.Time, error) {
2033
var timestamp time.Time
2134
if err := sd.client.Single().Query(ctx, nowStmt).Do(func(r *spanner.Row) error {
2235
return r.Columns(&timestamp)
2336
}); err != nil {
24-
return datastore.NoRevision, fmt.Errorf(errRevision, err)
37+
return timestamp, fmt.Errorf(errRevision, err)
2538
}
26-
return revisions.NewForTime(timestamp), nil
39+
return timestamp, nil
2740
}
2841

2942
func (sd *spannerDatastore) staleHeadRevision(ctx context.Context) (datastore.Revision, error) {

internal/datastore/spanner/spanner.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"regexp"
88
"strconv"
99
"sync"
10+
"sync/atomic"
1011
"time"
1112

1213
"cloud.google.com/go/spanner"
@@ -91,6 +92,7 @@ type spannerDatastore struct {
9192
cachedEstimatedBytesPerRelationship uint64
9293

9394
tableSizesStatsTable string
95+
uniqueID atomic.Pointer[string]
9496
}
9597

9698
// NewSpannerDatastore returns a datastore backed by cloud spanner

internal/datastore/spanner/stats.go

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,16 +25,29 @@ var querySomeRandomRelationships = fmt.Sprintf(`SELECT %s FROM %s LIMIT 10`,
2525

2626
const defaultEstimatedBytesPerRelationships = 20 // determined by looking at some sample clusters
2727

28+
func (sd *spannerDatastore) UniqueID(ctx context.Context) (string, error) {
29+
if sd.uniqueID.Load() == nil {
30+
var uniqueID string
31+
if err := sd.client.Single().Read(
32+
ctx,
33+
tableMetadata,
34+
spanner.AllKeys(),
35+
[]string{colUniqueID},
36+
).Do(func(r *spanner.Row) error {
37+
return r.Columns(&uniqueID)
38+
}); err != nil {
39+
return "", fmt.Errorf("unable to read unique ID: %w", err)
40+
}
41+
sd.uniqueID.Store(&uniqueID)
42+
return uniqueID, nil
43+
}
44+
45+
return *sd.uniqueID.Load(), nil
46+
}
47+
2848
func (sd *spannerDatastore) Statistics(ctx context.Context) (datastore.Stats, error) {
29-
var uniqueID string
30-
if err := sd.client.Single().Read(
31-
context.Background(),
32-
tableMetadata,
33-
spanner.AllKeys(),
34-
[]string{colUniqueID},
35-
).Do(func(r *spanner.Row) error {
36-
return r.Columns(&uniqueID)
37-
}); err != nil {
49+
uniqueID, err := sd.UniqueID(ctx)
50+
if err != nil {
3851
return datastore.Stats{}, fmt.Errorf("unable to read unique ID: %w", err)
3952
}
4053

pkg/datastore/datastore.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -543,6 +543,11 @@ func (wo WatchOptions) WithCheckpointInterval(interval time.Duration) WatchOptio
543543

544544
// ReadOnlyDatastore is an interface for reading relationships from the datastore.
545545
type ReadOnlyDatastore interface {
546+
// UniqueID returns a unique identifier for the datastore. This identifier
547+
// must be stable across restarts of the datastore if the datastore is
548+
// persistent.
549+
UniqueID(context.Context) (string, error)
550+
546551
// SnapshotReader creates a read-only handle that reads the datastore at the specified revision.
547552
// Any errors establishing the reader will be returned by subsequent calls.
548553
SnapshotReader(Revision) Reader

pkg/datastore/datastore_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -582,6 +582,10 @@ type fakeDatastore struct {
582582
delegate Datastore
583583
}
584584

585+
func (f fakeDatastore) UniqueID(_ context.Context) (string, error) {
586+
return "fake", nil
587+
}
588+
585589
func (f fakeDatastore) Unwrap() Datastore {
586590
return f.delegate
587591
}

pkg/datastore/test/basic.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,3 +22,21 @@ func UseAfterCloseTest(t *testing.T, tester DatastoreTester) {
2222
_, err = ds.HeadRevision(context.Background())
2323
require.Error(err)
2424
}
25+
26+
func UniqueIDTest(t *testing.T, tester DatastoreTester) {
27+
require := require.New(t)
28+
29+
// Create the datastore.
30+
ds, err := tester.New(0, veryLargeGCInterval, veryLargeGCWindow, 1)
31+
require.NoError(err)
32+
33+
// Ensure the unique ID is not empty.
34+
uniqueID, err := ds.UniqueID(context.Background())
35+
require.NoError(err)
36+
require.NotEmpty(uniqueID)
37+
38+
// Ensure the unique ID is stable.
39+
uniqueID2, err := ds.UniqueID(context.Background())
40+
require.NoError(err)
41+
require.Equal(uniqueID, uniqueID2)
42+
}

pkg/datastore/test/datastore.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ func WithCategories(cats ...string) Categories {
8787
// AllWithExceptions runs all generic datastore tests on a DatastoreTester, except
8888
// those specified test categories
8989
func AllWithExceptions(t *testing.T, tester DatastoreTester, except Categories) {
90+
t.Run("TestUniqueID", func(t *testing.T) { UniqueIDTest(t, tester) })
9091
t.Run("TestUseAfterClose", func(t *testing.T) { UseAfterCloseTest(t, tester) })
9192

9293
t.Run("TestNamespaceNotFound", func(t *testing.T) { NamespaceNotFoundTest(t, tester) })

0 commit comments

Comments
 (0)