Skip to content

Commit a04dd54

Browse files
committed
Export the unique, stable, ID from datastore
1 parent ec216dc commit a04dd54

File tree

20 files changed

+154
-39
lines changed

20 files changed

+154
-39
lines changed

internal/datastore/context.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,10 @@ func NewSeparatingContextDatastoreProxy(d datastore.Datastore) datastore.StrictR
3939

4040
type ctxProxy struct{ delegate datastore.Datastore }
4141

42+
func (p *ctxProxy) UniqueID(ctx context.Context) (string, error) {
43+
return p.delegate.UniqueID(SeparateContextWithTracing(ctx))
44+
}
45+
4246
func (p *ctxProxy) ReadWriteTx(
4347
ctx context.Context,
4448
f datastore.TxUserFunc,

internal/datastore/crdb/crdb.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"fmt"
77
"regexp"
88
"strconv"
9+
"sync/atomic"
910
"time"
1011

1112
"github.com/IBM/pgxpoolprometheus"
@@ -345,6 +346,8 @@ type crdbDatastore struct {
345346
cancel context.CancelFunc
346347
filterMaximumIDCount uint16
347348
supportsIntegrity bool
349+
350+
uniqueID atomic.Pointer[string]
348351
}
349352

350353
func (cds *crdbDatastore) SnapshotReader(rev datastore.Revision) datastore.Reader {

internal/datastore/crdb/stats.go

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,22 +20,33 @@ const (
2020
colUniqueID = "unique_id"
2121
)
2222

23-
var (
24-
queryReadUniqueID = psql.Select(colUniqueID).From(tableMetadata)
25-
uniqueID string
26-
)
23+
var queryReadUniqueID = psql.Select(colUniqueID).From(tableMetadata)
2724

28-
func (cds *crdbDatastore) Statistics(ctx context.Context) (datastore.Stats, error) {
29-
if len(uniqueID) == 0 {
25+
func (cds *crdbDatastore) UniqueID(ctx context.Context) (string, error) {
26+
if cds.uniqueID.Load() == nil {
3027
sql, args, err := queryReadUniqueID.ToSql()
3128
if err != nil {
32-
return datastore.Stats{}, fmt.Errorf("unable to prepare unique ID sql: %w", err)
29+
return "", fmt.Errorf("unable to prepare unique ID sql: %w", err)
3330
}
31+
32+
var uniqueID string
3433
if err := cds.readPool.QueryRowFunc(ctx, func(ctx context.Context, row pgx.Row) error {
3534
return row.Scan(&uniqueID)
3635
}, sql, args...); err != nil {
37-
return datastore.Stats{}, fmt.Errorf("unable to query unique ID: %w", err)
36+
return "", fmt.Errorf("unable to query unique ID: %w", err)
3837
}
38+
39+
cds.uniqueID.Store(&uniqueID)
40+
return uniqueID, nil
41+
}
42+
43+
return *cds.uniqueID.Load(), nil
44+
}
45+
46+
func (cds *crdbDatastore) Statistics(ctx context.Context) (datastore.Stats, error) {
47+
uniqueID, err := cds.UniqueID(ctx)
48+
if err != nil {
49+
return datastore.Stats{}, err
3950
}
4051

4152
var nsDefs []datastore.RevisionedNamespace

internal/datastore/memdb/memdb.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,10 @@ type snapshot struct {
100100
db *memdb.MemDB
101101
}
102102

103+
func (mdb *memdbDatastore) UniqueID(_ context.Context) (string, error) {
104+
return mdb.uniqueID, nil
105+
}
106+
103107
func (mdb *memdbDatastore) SnapshotReader(dr datastore.Revision) datastore.Reader {
104108
mdb.RLock()
105109
defer mdb.RUnlock()

internal/datastore/mysql/datastore.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -503,6 +503,8 @@ type Datastore struct {
503503
createTxn sq.InsertBuilder
504504
createBaseTxn string
505505

506+
uniqueID atomic.Pointer[string]
507+
506508
*QueryBuilder
507509
*revisions.CachedOptimizedRevisions
508510
revisions.CommonDecoder
@@ -586,7 +588,7 @@ func (mds *Datastore) isSeeded(ctx context.Context) (bool, error) {
586588
return false, nil
587589
}
588590

589-
_, err = mds.getUniqueID(ctx)
591+
_, err = mds.UniqueID(ctx)
590592
if err != nil {
591593
return false, nil
592594
}

internal/datastore/mysql/stats.go

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ func (mds *Datastore) Statistics(ctx context.Context) (datastore.Stats, error) {
3030
}
3131
}
3232

33-
uniqueID, err := mds.getUniqueID(ctx)
33+
uniqueID, err := mds.UniqueID(ctx)
3434
if err != nil {
3535
return datastore.Stats{}, err
3636
}
@@ -88,16 +88,20 @@ func (mds *Datastore) Statistics(ctx context.Context) (datastore.Stats, error) {
8888
}, nil
8989
}
9090

91-
func (mds *Datastore) getUniqueID(ctx context.Context) (string, error) {
92-
sql, args, err := sb.Select(metadataUniqueIDColumn).From(mds.driver.Metadata()).ToSql()
93-
if err != nil {
94-
return "", fmt.Errorf("unable to generate query sql: %w", err)
95-
}
91+
func (mds *Datastore) UniqueID(ctx context.Context) (string, error) {
92+
if mds.uniqueID.Load() == nil {
93+
sql, args, err := sb.Select(metadataUniqueIDColumn).From(mds.driver.Metadata()).ToSql()
94+
if err != nil {
95+
return "", fmt.Errorf("unable to generate query sql: %w", err)
96+
}
9697

97-
var uniqueID string
98-
if err := mds.db.QueryRowContext(ctx, sql, args...).Scan(&uniqueID); err != nil {
99-
return "", fmt.Errorf("unable to query unique ID: %w", err)
98+
var uniqueID string
99+
if err := mds.db.QueryRowContext(ctx, sql, args...).Scan(&uniqueID); err != nil {
100+
return "", fmt.Errorf("unable to query unique ID: %w", err)
101+
}
102+
mds.uniqueID.Store(&uniqueID)
103+
return uniqueID, nil
100104
}
101105

102-
return uniqueID, nil
106+
return *mds.uniqueID.Load(), nil
103107
}

internal/datastore/postgres/postgres.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -416,6 +416,7 @@ type pgDatastore struct {
416416
includeQueryParametersInTraces bool
417417

418418
credentialsProvider datastore.CredentialsProvider
419+
uniqueID atomic.Pointer[string]
419420

420421
gcGroup *errgroup.Group
421422
gcCtx context.Context
@@ -691,8 +692,9 @@ func (pgd *pgDatastore) ReadyState(ctx context.Context) (datastore.ReadyState, e
691692
if !state.IsReady {
692693
return state, nil
693694
}
695+
694696
// Ensure a datastore ID is present. This ensures the tables have not been truncated.
695-
uniqueID, err := pgd.datastoreUniqueID(ctx)
697+
uniqueID, err := pgd.UniqueID(ctx)
696698
if err != nil {
697699
return datastore.ReadyState{}, fmt.Errorf("database validation failed: %w; if you have previously run `TRUNCATE`, this database is no longer valid and must be remigrated. See: https://spicedb.dev/d/truncate-unsupported", err)
698700
}

internal/datastore/postgres/stats.go

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,16 +28,25 @@ var (
2828
Where(sq.Eq{colRelname: tableTuple})
2929
)
3030

31-
func (pgd *pgDatastore) datastoreUniqueID(ctx context.Context) (string, error) {
32-
idSQL, idArgs, err := queryUniqueID.ToSql()
33-
if err != nil {
34-
return "", fmt.Errorf("unable to generate query sql: %w", err)
31+
func (pgd *pgDatastore) UniqueID(ctx context.Context) (string, error) {
32+
if pgd.uniqueID.Load() == nil {
33+
idSQL, idArgs, err := queryUniqueID.ToSql()
34+
if err != nil {
35+
return "", fmt.Errorf("unable to generate query sql: %w", err)
36+
}
37+
38+
var uniqueID string
39+
if err := pgx.BeginTxFunc(ctx, pgd.readPool, pgd.readTxOptions, func(tx pgx.Tx) error {
40+
return tx.QueryRow(ctx, idSQL, idArgs...).Scan(&uniqueID)
41+
}); err != nil {
42+
return "", fmt.Errorf("unable to query unique ID: %w", err)
43+
}
44+
45+
pgd.uniqueID.Store(&uniqueID)
46+
return uniqueID, nil
3547
}
3648

37-
var uniqueID string
38-
return uniqueID, pgx.BeginTxFunc(ctx, pgd.readPool, pgd.readTxOptions, func(tx pgx.Tx) error {
39-
return tx.QueryRow(ctx, idSQL, idArgs...).Scan(&uniqueID)
40-
})
49+
return *pgd.uniqueID.Load(), nil
4150
}
4251

4352
func (pgd *pgDatastore) Statistics(ctx context.Context) (datastore.Stats, error) {

internal/datastore/proxy/observable.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,10 @@ func NewObservableDatastoreProxy(d datastore.Datastore) datastore.Datastore {
6868

6969
type observableProxy struct{ delegate datastore.Datastore }
7070

71+
func (p *observableProxy) UniqueID(ctx context.Context) (string, error) {
72+
return p.delegate.UniqueID(ctx)
73+
}
74+
7175
func (p *observableProxy) SnapshotReader(rev datastore.Revision) datastore.Reader {
7276
delegateReader := p.delegate.SnapshotReader(rev)
7377
return &observableReader{delegateReader}

internal/datastore/proxy/proxy_test/mock.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,10 @@ type MockDatastore struct {
1717
mock.Mock
1818
}
1919

20+
func (dm *MockDatastore) UniqueID(_ context.Context) (string, error) {
21+
return "mockds", nil
22+
}
23+
2024
func (dm *MockDatastore) SnapshotReader(rev datastore.Revision) datastore.Reader {
2125
args := dm.Called(rev)
2226
return args.Get(0).(datastore.Reader)

internal/datastore/proxy/replicated_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,10 @@ func (f fakeDatastore) Statistics(_ context.Context) (datastore.Stats, error) {
142142
return datastore.Stats{}, nil
143143
}
144144

145+
func (f fakeDatastore) UniqueID(_ context.Context) (string, error) {
146+
return "fake", nil
147+
}
148+
145149
func (f fakeDatastore) Close() error {
146150
return nil
147151
}

internal/datastore/proxy/schemacaching/watchingcache_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -333,6 +333,10 @@ type fakeDatastore struct {
333333
lock sync.RWMutex
334334
}
335335

336+
func (fds *fakeDatastore) UniqueID(_ context.Context) (string, error) {
337+
return "fakedsforwatch", nil
338+
}
339+
336340
func (fds *fakeDatastore) updateNamespace(name string, def *corev1.NamespaceDefinition, revision datastore.Revision) {
337341
fds.lock.Lock()
338342
defer fds.lock.Unlock()

internal/datastore/proxy/singleflight.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,10 @@ type singleflightProxy struct {
2424

2525
var _ datastore.Datastore = (*singleflightProxy)(nil)
2626

27+
func (p *singleflightProxy) UniqueID(ctx context.Context) (string, error) {
28+
return p.delegate.UniqueID(ctx)
29+
}
30+
2731
func (p *singleflightProxy) SnapshotReader(rev datastore.Revision) datastore.Reader {
2832
return p.delegate.SnapshotReader(rev)
2933
}

internal/datastore/spanner/revisions.go

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,27 @@ var (
1616
nowStmt = spanner.NewStatement("SELECT CURRENT_TIMESTAMP()")
1717
)
1818

19+
func (sd *spannerDatastore) headRevisionInternal(ctx context.Context) (datastore.Revision, error) {
20+
now, err := sd.now(ctx)
21+
if err != nil {
22+
return datastore.NoRevision, fmt.Errorf(errRevision, err)
23+
}
24+
25+
return revisions.NewForTime(now), nil
26+
}
27+
1928
func (sd *spannerDatastore) HeadRevision(ctx context.Context) (datastore.Revision, error) {
29+
return sd.headRevisionInternal(ctx)
30+
}
31+
32+
func (sd *spannerDatastore) now(ctx context.Context) (time.Time, error) {
2033
var timestamp time.Time
2134
if err := sd.client.Single().Query(ctx, nowStmt).Do(func(r *spanner.Row) error {
2235
return r.Columns(&timestamp)
2336
}); err != nil {
24-
return datastore.NoRevision, fmt.Errorf(errRevision, err)
37+
return timestamp, fmt.Errorf(errRevision, err)
2538
}
26-
return revisions.NewForTime(timestamp), nil
39+
return timestamp, nil
2740
}
2841

2942
func (sd *spannerDatastore) staleHeadRevision(ctx context.Context) (datastore.Revision, error) {

internal/datastore/spanner/spanner.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"regexp"
99
"strconv"
1010
"sync"
11+
"sync/atomic"
1112
"time"
1213

1314
"cloud.google.com/go/spanner"
@@ -97,6 +98,7 @@ type spannerDatastore struct {
9798

9899
tableSizesStatsTable string
99100
filterMaximumIDCount uint16
101+
uniqueID atomic.Pointer[string]
100102
}
101103

102104
// NewSpannerDatastore returns a datastore backed by cloud spanner

internal/datastore/spanner/stats.go

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -28,16 +28,29 @@ var querySomeRandomRelationships = fmt.Sprintf(`SELECT %s FROM %s LIMIT 10`,
2828

2929
const defaultEstimatedBytesPerRelationships = 20 // determined by looking at some sample clusters
3030

31+
func (sd *spannerDatastore) UniqueID(ctx context.Context) (string, error) {
32+
if sd.uniqueID.Load() == nil {
33+
var uniqueID string
34+
if err := sd.client.Single().Read(
35+
ctx,
36+
tableMetadata,
37+
spanner.AllKeys(),
38+
[]string{colUniqueID},
39+
).Do(func(r *spanner.Row) error {
40+
return r.Columns(&uniqueID)
41+
}); err != nil {
42+
return "", fmt.Errorf("unable to read unique ID: %w", err)
43+
}
44+
sd.uniqueID.Store(&uniqueID)
45+
return uniqueID, nil
46+
}
47+
48+
return *sd.uniqueID.Load(), nil
49+
}
50+
3151
func (sd *spannerDatastore) Statistics(ctx context.Context) (datastore.Stats, error) {
32-
var uniqueID string
33-
if err := sd.client.Single().Read(
34-
context.Background(),
35-
tableMetadata,
36-
spanner.AllKeys(),
37-
[]string{colUniqueID},
38-
).Do(func(r *spanner.Row) error {
39-
return r.Columns(&uniqueID)
40-
}); err != nil {
52+
uniqueID, err := sd.UniqueID(ctx)
53+
if err != nil {
4154
return datastore.Stats{}, fmt.Errorf("unable to read unique ID: %w", err)
4255
}
4356

pkg/datastore/datastore.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -628,6 +628,11 @@ func (wo WatchOptions) WithCheckpointInterval(interval time.Duration) WatchOptio
628628

629629
// ReadOnlyDatastore is an interface for reading relationships from the datastore.
630630
type ReadOnlyDatastore interface {
631+
// UniqueID returns a unique identifier for the datastore. This identifier
632+
// must be stable across restarts of the datastore if the datastore is
633+
// persistent.
634+
UniqueID(context.Context) (string, error)
635+
631636
// SnapshotReader creates a read-only handle that reads the datastore at the specified revision.
632637
// Any errors establishing the reader will be returned by subsequent calls.
633638
SnapshotReader(Revision) Reader

pkg/datastore/datastore_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -582,6 +582,10 @@ type fakeDatastore struct {
582582
delegate Datastore
583583
}
584584

585+
func (f fakeDatastore) UniqueID(_ context.Context) (string, error) {
586+
return "fake", nil
587+
}
588+
585589
func (f fakeDatastore) Unwrap() Datastore {
586590
return f.delegate
587591
}

pkg/datastore/test/basic.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,3 +73,21 @@ func DeleteAllDataTest(t *testing.T, tester DatastoreTester) {
7373
}
7474
}
7575
}
76+
77+
func UniqueIDTest(t *testing.T, tester DatastoreTester) {
78+
require := require.New(t)
79+
80+
// Create the datastore.
81+
ds, err := tester.New(0, veryLargeGCInterval, veryLargeGCWindow, 1)
82+
require.NoError(err)
83+
84+
// Ensure the unique ID is not empty.
85+
uniqueID, err := ds.UniqueID(context.Background())
86+
require.NoError(err)
87+
require.NotEmpty(uniqueID)
88+
89+
// Ensure the unique ID is stable.
90+
uniqueID2, err := ds.UniqueID(context.Background())
91+
require.NoError(err)
92+
require.Equal(uniqueID, uniqueID2)
93+
}

pkg/datastore/test/datastore.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ func AllWithExceptions(t *testing.T, tester DatastoreTester, except Categories,
105105
runner = parallel
106106
}
107107

108+
t.Run("TestUniqueID", func(t *testing.T) { UniqueIDTest(t, tester) })
108109
t.Run("TestUseAfterClose", runner(tester, UseAfterCloseTest))
109110

110111
t.Run("TestNamespaceNotFound", runner(tester, NamespaceNotFoundTest))

0 commit comments

Comments
 (0)