Skip to content

Commit c176cb2

Browse files
committed
Setup defined (and configurable) behavior if a ZedToken from
an older datastore is used All ZedTokens are now minted with the datastore's unique ID included in the ZedToken and that ID is checked when the ZedToken is decoded. In scenarios where the datastore ID does not match, either an error is raised (watch, at_exact_snapshot) or configurable behavior is used (at_least_as_fresh) Fixes authzed#1541
1 parent 1b65ae3 commit c176cb2

32 files changed

+706
-143
lines changed

e2e/newenemy/newenemy_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -376,8 +376,8 @@ func checkDataNoNewEnemy(ctx context.Context, t testing.TB, slowNodeID int, crdb
376376
require.NoError(t, err)
377377
t.Log("r2 token: ", r2.WrittenAt.Token)
378378

379-
z1, _ := zedtoken.DecodeRevision(r1.WrittenAt, revisions.CommonDecoder{Kind: revisions.HybridLogicalClock})
380-
z2, _ := zedtoken.DecodeRevision(r2.WrittenAt, revisions.CommonDecoder{Kind: revisions.HybridLogicalClock})
379+
z1, _, _ := zedtoken.DecodeRevision(r1.WrittenAt, revisions.CommonDecoder{Kind: revisions.HybridLogicalClock})
380+
z2, _, _ := zedtoken.DecodeRevision(r2.WrittenAt, revisions.CommonDecoder{Kind: revisions.HybridLogicalClock})
381381

382382
t.Log("z1 revision: ", z1)
383383
t.Log("z2 revision: ", z2)

internal/datastore/proxy/proxy_test/mock.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,16 @@ import (
1313

1414
type MockDatastore struct {
1515
mock.Mock
16+
17+
CurrentUniqueID string
1618
}
1719

1820
func (dm *MockDatastore) UniqueID(_ context.Context) (string, error) {
19-
return "mockds", nil
21+
if dm.CurrentUniqueID == "" {
22+
return "mockds", nil
23+
}
24+
25+
return dm.CurrentUniqueID, nil
2026
}
2127

2228
func (dm *MockDatastore) SnapshotReader(rev datastore.Revision) datastore.Reader {

internal/datastore/revisions/commonrevision.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package revisions
22

33
import (
4+
"context"
5+
46
"github.com/authzed/spicedb/pkg/datastore"
57
"github.com/authzed/spicedb/pkg/spiceerrors"
68
)
@@ -43,7 +45,12 @@ func RevisionParser(kind RevisionKind) ParsingFunc {
4345

4446
// CommonDecoder is a revision decoder that can decode revisions of a given kind.
4547
type CommonDecoder struct {
46-
Kind RevisionKind
48+
Kind RevisionKind
49+
DatastoreUniqueID string
50+
}
51+
52+
func (cd CommonDecoder) UniqueID(_ context.Context) (string, error) {
53+
return cd.DatastoreUniqueID, nil
4754
}
4855

4956
func (cd CommonDecoder) RevisionFromString(s string) (datastore.Revision, error) {

internal/middleware/consistency/consistency.go

Lines changed: 66 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"github.com/authzed/spicedb/internal/services/shared"
1919
"github.com/authzed/spicedb/pkg/cursor"
2020
"github.com/authzed/spicedb/pkg/datastore"
21+
"github.com/authzed/spicedb/pkg/spiceerrors"
2122
"github.com/authzed/spicedb/pkg/zedtoken"
2223
)
2324

@@ -55,27 +56,47 @@ func RevisionFromContext(ctx context.Context) (datastore.Revision, *v1.ZedToken,
5556
handle := c.(*revisionHandle)
5657
rev := handle.revision
5758
if rev != nil {
58-
return rev, zedtoken.MustNewFromRevision(rev), nil
59+
ds := datastoremw.FromContext(ctx)
60+
if ds == nil {
61+
return nil, nil, spiceerrors.MustBugf("consistency middleware did not inject datastore")
62+
}
63+
64+
zedToken, err := zedtoken.NewFromRevision(ctx, rev, ds)
65+
if err != nil {
66+
return nil, nil, err
67+
}
68+
69+
return rev, zedToken, nil
5970
}
6071
}
6172

6273
return nil, nil, fmt.Errorf("consistency middleware did not inject revision")
6374
}
6475

76+
type MismatchingTokenOption int
77+
78+
const (
79+
TreatMismatchingTokensAsFullConsistency MismatchingTokenOption = iota
80+
81+
TreatMismatchingTokensAsMinLatency
82+
83+
TreatMismatchingTokensAsError
84+
)
85+
6586
// AddRevisionToContext adds a revision to the given context, based on the consistency block found
6687
// in the given request (if applicable).
67-
func AddRevisionToContext(ctx context.Context, req interface{}, ds datastore.Datastore) error {
88+
func AddRevisionToContext(ctx context.Context, req interface{}, ds datastore.Datastore, option MismatchingTokenOption) error {
6889
switch req := req.(type) {
6990
case hasConsistency:
70-
return addRevisionToContextFromConsistency(ctx, req, ds)
91+
return addRevisionToContextFromConsistency(ctx, req, ds, option)
7192
default:
7293
return nil
7394
}
7495
}
7596

7697
// addRevisionToContextFromConsistency adds a revision to the given context, based on the consistency block found
7798
// in the given request (if applicable).
78-
func addRevisionToContextFromConsistency(ctx context.Context, req hasConsistency, ds datastore.Datastore) error {
99+
func addRevisionToContextFromConsistency(ctx context.Context, req hasConsistency, ds datastore.Datastore, option MismatchingTokenOption) error {
79100
handle := ctx.Value(revisionKey)
80101
if handle == nil {
81102
return nil
@@ -91,7 +112,7 @@ func addRevisionToContextFromConsistency(ctx context.Context, req hasConsistency
91112
// Always use the revision encoded in the cursor.
92113
ConsistentyCounter.WithLabelValues("snapshot", "cursor").Inc()
93114

94-
requestedRev, err := cursor.DecodeToDispatchRevision(withOptionalCursor.GetOptionalCursor(), ds)
115+
requestedRev, _, err := cursor.DecodeToDispatchRevision(ctx, withOptionalCursor.GetOptionalCursor(), ds)
95116
if err != nil {
96117
return rewriteDatastoreError(ctx, err)
97118
}
@@ -130,7 +151,7 @@ func addRevisionToContextFromConsistency(ctx context.Context, req hasConsistency
130151
case consistency.GetAtLeastAsFresh() != nil:
131152
// At least as fresh as: Pick one of the datastore's revision and that specified, which
132153
// ever is later.
133-
picked, pickedRequest, err := pickBestRevision(ctx, consistency.GetAtLeastAsFresh(), ds)
154+
picked, pickedRequest, err := pickBestRevision(ctx, consistency.GetAtLeastAsFresh(), ds, option)
134155
if err != nil {
135156
return rewriteDatastoreError(ctx, err)
136157
}
@@ -147,11 +168,16 @@ func addRevisionToContextFromConsistency(ctx context.Context, req hasConsistency
147168
// Exact snapshot: Use the revision as encoded in the zed token.
148169
ConsistentyCounter.WithLabelValues("snapshot", "request").Inc()
149170

150-
requestedRev, err := zedtoken.DecodeRevision(consistency.GetAtExactSnapshot(), ds)
171+
requestedRev, status, err := zedtoken.DecodeRevision(consistency.GetAtExactSnapshot(), ds)
151172
if err != nil {
152173
return errInvalidZedToken
153174
}
154175

176+
if status == zedtoken.StatusMismatchedDatastoreID {
177+
log.Error().Str("zedtoken", consistency.GetAtExactSnapshot().Token).Msg("ZedToken specified references an older datastore but at-exact-snapshot was requested")
178+
return fmt.Errorf("ZedToken specified references an older datastore but at-exact-snapshot was requested")
179+
}
180+
155181
err = ds.CheckRevision(ctx, requestedRev)
156182
if err != nil {
157183
return rewriteDatastoreError(ctx, err)
@@ -175,7 +201,7 @@ var bypassServiceWhitelist = map[string]struct{}{
175201

176202
// UnaryServerInterceptor returns a new unary server interceptor that performs per-request exchange of
177203
// the specified consistency configuration for the revision at which to perform the request.
178-
func UnaryServerInterceptor() grpc.UnaryServerInterceptor {
204+
func UnaryServerInterceptor(option MismatchingTokenOption) grpc.UnaryServerInterceptor {
179205
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
180206
for bypass := range bypassServiceWhitelist {
181207
if strings.HasPrefix(info.FullMethod, bypass) {
@@ -184,7 +210,7 @@ func UnaryServerInterceptor() grpc.UnaryServerInterceptor {
184210
}
185211
ds := datastoremw.MustFromContext(ctx)
186212
newCtx := ContextWithHandle(ctx)
187-
if err := AddRevisionToContext(newCtx, req, ds); err != nil {
213+
if err := AddRevisionToContext(newCtx, req, ds, option); err != nil {
188214
return nil, err
189215
}
190216

@@ -194,21 +220,22 @@ func UnaryServerInterceptor() grpc.UnaryServerInterceptor {
194220

195221
// StreamServerInterceptor returns a new stream server interceptor that performs per-request exchange of
196222
// the specified consistency configuration for the revision at which to perform the request.
197-
func StreamServerInterceptor() grpc.StreamServerInterceptor {
223+
func StreamServerInterceptor(option MismatchingTokenOption) grpc.StreamServerInterceptor {
198224
return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
199225
for bypass := range bypassServiceWhitelist {
200226
if strings.HasPrefix(info.FullMethod, bypass) {
201227
return handler(srv, stream)
202228
}
203229
}
204-
wrapper := &recvWrapper{stream, ContextWithHandle(stream.Context())}
230+
wrapper := &recvWrapper{stream, ContextWithHandle(stream.Context()), option}
205231
return handler(srv, wrapper)
206232
}
207233
}
208234

209235
type recvWrapper struct {
210236
grpc.ServerStream
211-
ctx context.Context
237+
ctx context.Context
238+
option MismatchingTokenOption
212239
}
213240

214241
func (s *recvWrapper) Context() context.Context { return s.ctx }
@@ -219,24 +246,48 @@ func (s *recvWrapper) RecvMsg(m interface{}) error {
219246
}
220247
ds := datastoremw.MustFromContext(s.ctx)
221248

222-
return AddRevisionToContext(s.ctx, m, ds)
249+
return AddRevisionToContext(s.ctx, m, ds, s.option)
223250
}
224251

225252
// pickBestRevision compares the provided ZedToken with the optimized revision of the datastore, and returns the most
226253
// recent one. The boolean return value will be true if the provided ZedToken is the most recent, false otherwise.
227-
func pickBestRevision(ctx context.Context, requested *v1.ZedToken, ds datastore.Datastore) (datastore.Revision, bool, error) {
254+
func pickBestRevision(ctx context.Context, requested *v1.ZedToken, ds datastore.Datastore, option MismatchingTokenOption) (datastore.Revision, bool, error) {
228255
// Calculate a revision as we see fit
229256
databaseRev, err := ds.OptimizedRevision(ctx)
230257
if err != nil {
231258
return datastore.NoRevision, false, err
232259
}
233260

234261
if requested != nil {
235-
requestedRev, err := zedtoken.DecodeRevision(requested, ds)
262+
requestedRev, status, err := zedtoken.DecodeRevision(requested, ds)
236263
if err != nil {
237264
return datastore.NoRevision, false, errInvalidZedToken
238265
}
239266

267+
if status == zedtoken.StatusMismatchedDatastoreID {
268+
switch option {
269+
case TreatMismatchingTokensAsFullConsistency:
270+
log.Warn().Str("zedtoken", requested.Token).Msg("ZedToken specified references an older datastore and SpiceDB is configured to treat this as a full consistency request")
271+
headRev, err := ds.HeadRevision(ctx)
272+
if err != nil {
273+
return datastore.NoRevision, false, err
274+
}
275+
276+
return headRev, false, nil
277+
278+
case TreatMismatchingTokensAsMinLatency:
279+
log.Warn().Str("zedtoken", requested.Token).Msg("ZedToken specified references an older datastore and SpiceDB is configured to treat this as a min latency request")
280+
return databaseRev, false, nil
281+
282+
case TreatMismatchingTokensAsError:
283+
log.Error().Str("zedtoken", requested.Token).Msg("ZedToken specified references an older datastore and SpiceDB is configured to raise an error in this scenario")
284+
return datastore.NoRevision, false, fmt.Errorf("ZedToken specified references an older datastore and SpiceDB is configured to raise an error in this scenario")
285+
286+
default:
287+
return datastore.NoRevision, false, spiceerrors.MustBugf("unknown mismatching token option: %v", option)
288+
}
289+
}
290+
240291
if databaseRev.GreaterThan(requestedRev) {
241292
return databaseRev, false, nil
242293
}

0 commit comments

Comments
 (0)