Skip to content

Commit 8db642a

Browse files
authored
Merge pull request authzed#1720 from authzed/memdb-schemawatch-support
implements schema watch support for MemDB
2 parents bdcc2a6 + 54a86d6 commit 8db642a

File tree

4 files changed

+54
-24
lines changed

4 files changed

+54
-24
lines changed

internal/datastore/memdb/memdb.go

Lines changed: 52 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@ import (
99
"sync"
1010
"time"
1111

12+
"github.com/authzed/spicedb/internal/datastore/common"
13+
"github.com/authzed/spicedb/pkg/spiceerrors"
14+
1215
"github.com/google/uuid"
1316
"github.com/hashicorp/go-memdb"
1417

@@ -195,40 +198,72 @@ func (mdb *memdbDatastore) ReadWriteTx(
195198
mdb.Lock()
196199
defer mdb.Unlock()
197200

198-
// Record the changes that were made
199-
newChanges := datastore.RevisionChanges{
200-
Revision: newRevision,
201-
RelationshipChanges: nil,
202-
}
201+
tracked := common.NewChanges(revisions.TimestampIDKeyFunc, datastore.WatchRelationships|datastore.WatchSchema)
203202
if tx != nil {
204203
for _, change := range tx.Changes() {
205-
if change.Table == tableRelationship {
204+
switch change.Table {
205+
case tableRelationship:
206206
if change.After != nil {
207207
rt, err := change.After.(*relationship).RelationTuple()
208208
if err != nil {
209209
return datastore.NoRevision, err
210210
}
211-
newChanges.RelationshipChanges = append(newChanges.RelationshipChanges, &corev1.RelationTupleUpdate{
212-
Operation: corev1.RelationTupleUpdate_TOUCH,
213-
Tuple: rt,
214-
})
215-
}
216-
if change.After == nil && change.Before != nil {
211+
212+
if err := tracked.AddRelationshipChange(ctx, newRevision, rt, corev1.RelationTupleUpdate_TOUCH); err != nil {
213+
return datastore.NoRevision, err
214+
}
215+
} else if change.After == nil && change.Before != nil {
217216
rt, err := change.Before.(*relationship).RelationTuple()
218217
if err != nil {
219218
return datastore.NoRevision, err
220219
}
221-
newChanges.RelationshipChanges = append(newChanges.RelationshipChanges, &corev1.RelationTupleUpdate{
222-
Operation: corev1.RelationTupleUpdate_DELETE,
223-
Tuple: rt,
224-
})
220+
221+
if err := tracked.AddRelationshipChange(ctx, newRevision, rt, corev1.RelationTupleUpdate_DELETE); err != nil {
222+
return datastore.NoRevision, err
223+
}
224+
} else {
225+
return datastore.NoRevision, spiceerrors.MustBugf("unexpected relationship change")
226+
}
227+
case tableNamespace:
228+
if change.After != nil {
229+
loaded := &corev1.NamespaceDefinition{}
230+
if err := loaded.UnmarshalVT(change.After.(*namespace).configBytes); err != nil {
231+
return datastore.NoRevision, err
232+
}
233+
234+
tracked.AddChangedDefinition(ctx, newRevision, loaded)
235+
} else if change.After == nil && change.Before != nil {
236+
tracked.AddDeletedNamespace(ctx, newRevision, change.Before.(*namespace).name)
237+
} else {
238+
return datastore.NoRevision, spiceerrors.MustBugf("unexpected namespace change")
239+
}
240+
case tableCaveats:
241+
if change.After != nil {
242+
loaded := &corev1.CaveatDefinition{}
243+
if err := loaded.UnmarshalVT(change.After.(*caveat).definition); err != nil {
244+
return datastore.NoRevision, err
245+
}
246+
247+
tracked.AddChangedDefinition(ctx, newRevision, loaded)
248+
} else if change.After == nil && change.Before != nil {
249+
tracked.AddDeletedCaveat(ctx, newRevision, change.Before.(*caveat).name)
250+
} else {
251+
return datastore.NoRevision, spiceerrors.MustBugf("unexpected namespace change")
225252
}
226253
}
227254
}
228255

256+
var rc datastore.RevisionChanges
257+
changes := tracked.AsRevisionChanges(revisions.TimestampIDKeyLessThanFunc)
258+
if len(changes) > 1 {
259+
return datastore.NoRevision, spiceerrors.MustBugf("unexpected MemDB transaction with multiple revision changes")
260+
} else if len(changes) == 1 {
261+
rc = changes[0]
262+
}
263+
229264
change := &changelog{
230265
revisionNanos: newRevision.TimestampNanoSec(),
231-
changes: newChanges,
266+
changes: rc,
232267
}
233268
if err := tx.Insert(tableChangelog, change); err != nil {
234269
return datastore.NoRevision, fmt.Errorf("error writing changelog: %w", err)

internal/datastore/memdb/memdb_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ func (mdbt memDBTest) New(revisionQuantization, _, gcWindow time.Duration, watch
2626
}
2727

2828
func TestMemdbDatastore(t *testing.T) {
29-
test.AllWithExceptions(t, memDBTest{}, test.WithCategories(test.WatchSchemaCategory))
29+
test.All(t, memDBTest{})
3030
}
3131

3232
func TestConcurrentWritePanic(t *testing.T) {

internal/datastore/memdb/watch.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,6 @@ func (mdb *memdbDatastore) Watch(ctx context.Context, ar datastore.Revision, opt
2323
updates := make(chan *datastore.RevisionChanges, watchBufferLength)
2424
errs := make(chan error, 1)
2525

26-
if options.Content&datastore.WatchSchema == datastore.WatchSchema {
27-
errs <- errors.New("schema watch unsupported in MemDB")
28-
return updates, errs
29-
}
30-
3126
watchBufferWriteTimeout := options.WatchBufferWriteTimeout
3227
if watchBufferWriteTimeout == 0 {
3328
watchBufferWriteTimeout = mdb.watchBufferWriteTimeout

internal/datastore/revisions/commonrevision.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ func (cd CommonDecoder) RevisionFromString(s string) (datastore.Revision, error)
6565
// WithInexactFloat64 is an interface that can be implemented by a revision to
6666
// provide an inexact float64 representation of the revision.
6767
type WithInexactFloat64 interface {
68-
// WithInexactFloat64 returns a float64 that is an inexact representation of the
68+
// InexactFloat64 returns a float64 that is an inexact representation of the
6969
// revision.
7070
InexactFloat64() float64
7171
}

0 commit comments

Comments
 (0)