-
Notifications
You must be signed in to change notification settings - Fork 316
feat: schema watch support for mysql driver #2224
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 4 commits
dac084e
a470f6e
7736b28
674a979
bdc6eeb
b465181
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,11 +3,13 @@ package mysql | |
import ( | ||
"context" | ||
"errors" | ||
"fmt" | ||
"time" | ||
|
||
"github.com/authzed/spicedb/internal/datastore/common" | ||
"github.com/authzed/spicedb/internal/datastore/revisions" | ||
"github.com/authzed/spicedb/pkg/datastore" | ||
core "github.com/authzed/spicedb/pkg/proto/core/v1" | ||
"github.com/authzed/spicedb/pkg/tuple" | ||
|
||
sq "github.com/Masterminds/squirrel" | ||
|
@@ -29,12 +31,6 @@ func (mds *Datastore) Watch(ctx context.Context, afterRevisionRaw datastore.Revi | |
updates := make(chan *datastore.RevisionChanges, watchBufferLength) | ||
errs := make(chan error, 1) | ||
|
||
if options.Content&datastore.WatchSchema == datastore.WatchSchema { | ||
close(updates) | ||
errs <- errors.New("schema watch unsupported in MySQL") | ||
return updates, errs | ||
} | ||
|
||
if options.EmissionStrategy == datastore.EmitImmediatelyStrategy { | ||
close(updates) | ||
errs <- errors.New("emit immediately strategy is unsupported in MySQL") | ||
|
@@ -177,7 +173,30 @@ func (mds *Datastore) loadChanges( | |
} | ||
|
||
// Load the changes relationships for the revision range. | ||
sql, args, err = mds.QueryChangedQuery.Where(sq.Or{ | ||
if err := mds.loadRelationshipChanges(ctx, afterRevision, newRevision, stagedChanges); err != nil { | ||
return nil, 0, err | ||
} | ||
|
||
// Load namespace changes for the revision range. | ||
if options.Content&datastore.WatchSchema == datastore.WatchSchema { | ||
if err := mds.loadNamespaceChanges(ctx, afterRevision, newRevision, stagedChanges); err != nil { | ||
return nil, 0, err | ||
} | ||
} | ||
|
||
// Load caveat changes for the revision range. | ||
if options.Content&datastore.WatchSchema == datastore.WatchSchema { | ||
if err := mds.loadCaveatChanges(ctx, afterRevision, newRevision, stagedChanges); err != nil { | ||
return nil, 0, err | ||
} | ||
} | ||
|
||
changes, err = stagedChanges.AsRevisionChanges(revisions.TransactionIDKeyLessThanFunc) | ||
return | ||
} | ||
|
||
func (mds *Datastore) loadRelationshipChanges(ctx context.Context, afterRevision uint64, newRevision uint64, stagedChanges *common.Changes[revisions.TransactionIDRevision, uint64]) (err error) { | ||
sql, args, err := mds.QueryChangedQuery.Where(sq.Or{ | ||
sq.And{ | ||
sq.Gt{colCreatedTxn: afterRevision}, | ||
sq.LtOrEq{colCreatedTxn: newRevision}, | ||
|
@@ -191,7 +210,7 @@ func (mds *Datastore) loadChanges( | |
return | ||
} | ||
|
||
rows, err = mds.db.QueryContext(ctx, sql, args...) | ||
rows, err := mds.db.QueryContext(ctx, sql, args...) | ||
if err != nil { | ||
if errors.Is(err, context.Canceled) { | ||
err = datastore.NewWatchCanceledErr() | ||
|
@@ -265,7 +284,130 @@ func (mds *Datastore) loadChanges( | |
if err = rows.Err(); err != nil { | ||
return | ||
} | ||
return | ||
} | ||
|
||
func (mds *Datastore) loadNamespaceChanges(ctx context.Context, afterRevision uint64, newRevision uint64, stagedChanges *common.Changes[revisions.TransactionIDRevision, uint64]) (err error) { | ||
sql, args, err := mds.QueryChangedNamespacesQuery.Where(sq.Or{ | ||
sq.And{ | ||
sq.Gt{colCreatedTxn: afterRevision}, | ||
sq.LtOrEq{colCreatedTxn: newRevision}, | ||
}, | ||
sq.And{ | ||
sq.Gt{colDeletedTxn: afterRevision}, | ||
sq.LtOrEq{colDeletedTxn: newRevision}, | ||
}, | ||
}).ToSql() | ||
if err != nil { | ||
return fmt.Errorf("unable to prepare changes SQL: %w", err) | ||
} | ||
|
||
rows, err := mds.db.QueryContext(ctx, sql, args...) | ||
if err != nil { | ||
if errors.Is(err, context.Canceled) { | ||
err = datastore.NewWatchCanceledErr() | ||
} | ||
return | ||
} | ||
defer common.LogOnError(ctx, rows.Close) | ||
|
||
for rows.Next() { | ||
var createdTxn uint64 | ||
var deletedTxn uint64 | ||
var config []byte | ||
|
||
err = rows.Scan( | ||
&config, | ||
&createdTxn, | ||
&deletedTxn, | ||
) | ||
if err != nil { | ||
return | ||
} | ||
loaded := &core.NamespaceDefinition{} | ||
if err := loaded.UnmarshalVT(config); err != nil { | ||
return fmt.Errorf("unable to parse changed namespace: %w", err) | ||
} | ||
|
||
if createdTxn > afterRevision && createdTxn <= newRevision { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why the filter here if we're doing so in the SQL query? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The query can return a wider net of rows in the watch window, hence a double check, was implemented earlier for relationships too, also tested without the if statement, datastore tests tend to break in that case. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah |
||
if err = stagedChanges.AddChangedDefinition(ctx, revisions.NewForTransactionID(createdTxn), loaded); err != nil { | ||
return | ||
} | ||
} | ||
if deletedTxn > afterRevision && deletedTxn <= newRevision { | ||
if err = stagedChanges.AddDeletedNamespace(ctx, revisions.NewForTransactionID(deletedTxn), loaded.Name); err != nil { | ||
return | ||
} | ||
} | ||
} | ||
|
||
if err = rows.Err(); err != nil { | ||
return fmt.Errorf("unable to load changes: %w", err) | ||
} | ||
|
||
return | ||
} | ||
|
||
func (mds *Datastore) loadCaveatChanges(ctx context.Context, afterRevision uint64, newRevision uint64, stagedChanges *common.Changes[revisions.TransactionIDRevision, uint64]) (err error) { | ||
sql, args, err := mds.QueryChangedCaveatsQuery.Where(sq.Or{ | ||
sq.And{ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should be able to combine these methods There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done, should I do same for loadRelationships and combine that too? (right now |
||
sq.Gt{colCreatedTxn: afterRevision}, | ||
sq.LtOrEq{colCreatedTxn: newRevision}, | ||
}, | ||
sq.And{ | ||
sq.Gt{colDeletedTxn: afterRevision}, | ||
sq.LtOrEq{colDeletedTxn: newRevision}, | ||
}, | ||
}).ToSql() | ||
if err != nil { | ||
return fmt.Errorf("unable to prepare changes SQL: %w", err) | ||
} | ||
|
||
rows, err := mds.db.QueryContext(ctx, sql, args...) | ||
if err != nil { | ||
if errors.Is(err, context.Canceled) { | ||
err = datastore.NewWatchCanceledErr() | ||
} | ||
return | ||
} | ||
|
||
defer common.LogOnError(ctx, rows.Close) | ||
|
||
for rows.Next() { | ||
var createdTxn uint64 | ||
var deletedTxn uint64 | ||
var config []byte | ||
var name string | ||
|
||
err = rows.Scan( | ||
&name, | ||
&config, | ||
&createdTxn, | ||
&deletedTxn, | ||
) | ||
if err != nil { | ||
return fmt.Errorf("unable to parse changed caveat: %w", err) | ||
} | ||
loaded := &core.CaveatDefinition{} | ||
if err := loaded.UnmarshalVT(config); err != nil { | ||
return fmt.Errorf(errUnableToReadConfig, err) | ||
} | ||
|
||
if createdTxn > afterRevision && createdTxn <= newRevision { | ||
if err = stagedChanges.AddChangedDefinition(ctx, revisions.NewForTransactionID(createdTxn), loaded); err != nil { | ||
return | ||
} | ||
} | ||
if deletedTxn > afterRevision && deletedTxn <= newRevision { | ||
if err = stagedChanges.AddDeletedCaveat(ctx, revisions.NewForTransactionID(deletedTxn), loaded.Name); err != nil { | ||
return | ||
} | ||
} | ||
} | ||
|
||
if err = rows.Err(); err != nil { | ||
return fmt.Errorf("unable to load changes: %w", err) | ||
} | ||
|
||
changes, err = stagedChanges.AsRevisionChanges(revisions.TransactionIDKeyLessThanFunc) | ||
return | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Put this under the if statement above
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done