Skip to content

feat: schema watch support for mysql driver #2224

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/datastore/mysql/datastore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func TestMySQLDatastoreDSNWithoutParseTime(t *testing.T) {
func TestMySQL8Datastore(t *testing.T) {
b := testdatastore.RunMySQLForTestingWithOptions(t, testdatastore.MySQLTesterOptions{MigrateForNewDatastore: true}, "")
dst := datastoreTester{b: b, t: t}
test.AllWithExceptions(t, test.DatastoreTesterFunc(dst.createDatastore), test.WithCategories(test.WatchSchemaCategory, test.WatchCheckpointsCategory), true)
test.AllWithExceptions(t, test.DatastoreTesterFunc(dst.createDatastore), test.WithCategories(test.WatchCheckpointsCategory), true)
additionalMySQLTests(t, b)
}

Expand Down
29 changes: 25 additions & 4 deletions internal/datastore/mysql/query_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type QueryBuilder struct {
ReadNamespaceQuery sq.SelectBuilder
DeleteNamespaceQuery sq.UpdateBuilder
DeleteNamespaceRelationshipsQuery sq.UpdateBuilder
QueryChangedNamespacesQuery sq.SelectBuilder

ReadCounterQuery sq.SelectBuilder
InsertCounterQuery sq.InsertBuilder
Expand All @@ -30,10 +31,11 @@ type QueryBuilder struct {
QueryChangedQuery sq.SelectBuilder
CountRelsQuery sq.SelectBuilder

WriteCaveatQuery sq.InsertBuilder
ReadCaveatQuery sq.SelectBuilder
ListCaveatsQuery sq.SelectBuilder
DeleteCaveatQuery sq.UpdateBuilder
WriteCaveatQuery sq.InsertBuilder
ReadCaveatQuery sq.SelectBuilder
ListCaveatsQuery sq.SelectBuilder
DeleteCaveatQuery sq.UpdateBuilder
QueryChangedCaveatsQuery sq.SelectBuilder
}

// NewQueryBuilder returns a new QueryBuilder instance. The migration
Expand All @@ -49,6 +51,7 @@ func NewQueryBuilder(driver *migrations.MySQLDriver) *QueryBuilder {
builder.WriteNamespaceQuery = writeNamespace(driver.Namespace())
builder.ReadNamespaceQuery = readNamespace(driver.Namespace())
builder.DeleteNamespaceQuery = deleteNamespace(driver.Namespace())
builder.QueryChangedNamespacesQuery = changedNamespaces(driver.Namespace())

// counters builders
builder.ReadCounterQuery = readCounter(driver.RelationshipCounters())
Expand All @@ -71,6 +74,7 @@ func NewQueryBuilder(driver *migrations.MySQLDriver) *QueryBuilder {
builder.ListCaveatsQuery = listCaveats(driver.Caveat())
builder.WriteCaveatQuery = writeCaveat(driver.Caveat())
builder.DeleteCaveatQuery = deleteCaveat(driver.Caveat())
builder.QueryChangedCaveatsQuery = changedCaveats(driver.Caveat())

return &builder
}
Expand Down Expand Up @@ -223,3 +227,20 @@ func queryChanged(tableTuple string) sq.SelectBuilder {
colDeletedTxn,
).From(tableTuple)
}

func changedCaveats(tableTuple string) sq.SelectBuilder {
return sb.Select(
colName,
colCaveatDefinition,
colCreatedTxn,
colDeletedTxn,
).From(tableTuple)
}

func changedNamespaces(tableTuple string) sq.SelectBuilder {
return sb.Select(
colConfig,
colCreatedTxn,
colDeletedTxn,
).From(tableTuple)
}
160 changes: 151 additions & 9 deletions internal/datastore/mysql/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ package mysql
import (
"context"
"errors"
"fmt"
"time"

"github.com/authzed/spicedb/internal/datastore/common"
"github.com/authzed/spicedb/internal/datastore/revisions"
"github.com/authzed/spicedb/pkg/datastore"
core "github.com/authzed/spicedb/pkg/proto/core/v1"
"github.com/authzed/spicedb/pkg/tuple"

sq "github.com/Masterminds/squirrel"
Expand All @@ -29,12 +31,6 @@ func (mds *Datastore) Watch(ctx context.Context, afterRevisionRaw datastore.Revi
updates := make(chan *datastore.RevisionChanges, watchBufferLength)
errs := make(chan error, 1)

if options.Content&datastore.WatchSchema == datastore.WatchSchema {
close(updates)
errs <- errors.New("schema watch unsupported in MySQL")
return updates, errs
}

if options.EmissionStrategy == datastore.EmitImmediatelyStrategy {
close(updates)
errs <- errors.New("emit immediately strategy is unsupported in MySQL")
Expand Down Expand Up @@ -177,7 +173,30 @@ func (mds *Datastore) loadChanges(
}

// Load the changes relationships for the revision range.
sql, args, err = mds.QueryChangedQuery.Where(sq.Or{
if err := mds.loadRelationshipChanges(ctx, afterRevision, newRevision, stagedChanges); err != nil {
return nil, 0, err
}

// Load namespace changes for the revision range.
if options.Content&datastore.WatchSchema == datastore.WatchSchema {
if err := mds.loadNamespaceChanges(ctx, afterRevision, newRevision, stagedChanges); err != nil {
return nil, 0, err
}
}

// Load caveat changes for the revision range.
if options.Content&datastore.WatchSchema == datastore.WatchSchema {
if err := mds.loadCaveatChanges(ctx, afterRevision, newRevision, stagedChanges); err != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Put this under the if statement above

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

return nil, 0, err
}
}

changes, err = stagedChanges.AsRevisionChanges(revisions.TransactionIDKeyLessThanFunc)
return
}

func (mds *Datastore) loadRelationshipChanges(ctx context.Context, afterRevision uint64, newRevision uint64, stagedChanges *common.Changes[revisions.TransactionIDRevision, uint64]) (err error) {
sql, args, err := mds.QueryChangedQuery.Where(sq.Or{
sq.And{
sq.Gt{colCreatedTxn: afterRevision},
sq.LtOrEq{colCreatedTxn: newRevision},
Expand All @@ -191,7 +210,7 @@ func (mds *Datastore) loadChanges(
return
}

rows, err = mds.db.QueryContext(ctx, sql, args...)
rows, err := mds.db.QueryContext(ctx, sql, args...)
if err != nil {
if errors.Is(err, context.Canceled) {
err = datastore.NewWatchCanceledErr()
Expand Down Expand Up @@ -265,7 +284,130 @@ func (mds *Datastore) loadChanges(
if err = rows.Err(); err != nil {
return
}
return
}

func (mds *Datastore) loadNamespaceChanges(ctx context.Context, afterRevision uint64, newRevision uint64, stagedChanges *common.Changes[revisions.TransactionIDRevision, uint64]) (err error) {
sql, args, err := mds.QueryChangedNamespacesQuery.Where(sq.Or{
sq.And{
sq.Gt{colCreatedTxn: afterRevision},
sq.LtOrEq{colCreatedTxn: newRevision},
},
sq.And{
sq.Gt{colDeletedTxn: afterRevision},
sq.LtOrEq{colDeletedTxn: newRevision},
},
}).ToSql()
if err != nil {
return fmt.Errorf("unable to prepare changes SQL: %w", err)
}

rows, err := mds.db.QueryContext(ctx, sql, args...)
if err != nil {
if errors.Is(err, context.Canceled) {
err = datastore.NewWatchCanceledErr()
}
return
}
defer common.LogOnError(ctx, rows.Close)

for rows.Next() {
var createdTxn uint64
var deletedTxn uint64
var config []byte

err = rows.Scan(
&config,
&createdTxn,
&deletedTxn,
)
if err != nil {
return
}
loaded := &core.NamespaceDefinition{}
if err := loaded.UnmarshalVT(config); err != nil {
return fmt.Errorf("unable to parse changed namespace: %w", err)
}

if createdTxn > afterRevision && createdTxn <= newRevision {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why the filter here if we're doing so in the SQL query?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The query can return a wider net of rows in the watch window, hence a double check, was implemented earlier for relationships too, also tested without the if statement, datastore tests tend to break in that case.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah

if err = stagedChanges.AddChangedDefinition(ctx, revisions.NewForTransactionID(createdTxn), loaded); err != nil {
return
}
}
if deletedTxn > afterRevision && deletedTxn <= newRevision {
if err = stagedChanges.AddDeletedNamespace(ctx, revisions.NewForTransactionID(deletedTxn), loaded.Name); err != nil {
return
}
}
}

if err = rows.Err(); err != nil {
return fmt.Errorf("unable to load changes: %w", err)
}

return
}

func (mds *Datastore) loadCaveatChanges(ctx context.Context, afterRevision uint64, newRevision uint64, stagedChanges *common.Changes[revisions.TransactionIDRevision, uint64]) (err error) {
sql, args, err := mds.QueryChangedCaveatsQuery.Where(sq.Or{
sq.And{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should be able to combine these methods

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done, should I do same for loadRelationships and combine that too? (right now loadSchemaChanges is only called when schema watch is enabled)

sq.Gt{colCreatedTxn: afterRevision},
sq.LtOrEq{colCreatedTxn: newRevision},
},
sq.And{
sq.Gt{colDeletedTxn: afterRevision},
sq.LtOrEq{colDeletedTxn: newRevision},
},
}).ToSql()
if err != nil {
return fmt.Errorf("unable to prepare changes SQL: %w", err)
}

rows, err := mds.db.QueryContext(ctx, sql, args...)
if err != nil {
if errors.Is(err, context.Canceled) {
err = datastore.NewWatchCanceledErr()
}
return
}

defer common.LogOnError(ctx, rows.Close)

for rows.Next() {
var createdTxn uint64
var deletedTxn uint64
var config []byte
var name string

err = rows.Scan(
&name,
&config,
&createdTxn,
&deletedTxn,
)
if err != nil {
return fmt.Errorf("unable to parse changed caveat: %w", err)
}
loaded := &core.CaveatDefinition{}
if err := loaded.UnmarshalVT(config); err != nil {
return fmt.Errorf(errUnableToReadConfig, err)
}

if createdTxn > afterRevision && createdTxn <= newRevision {
if err = stagedChanges.AddChangedDefinition(ctx, revisions.NewForTransactionID(createdTxn), loaded); err != nil {
return
}
}
if deletedTxn > afterRevision && deletedTxn <= newRevision {
if err = stagedChanges.AddDeletedCaveat(ctx, revisions.NewForTransactionID(deletedTxn), loaded.Name); err != nil {
return
}
}
}

if err = rows.Err(); err != nil {
return fmt.Errorf("unable to load changes: %w", err)
}

changes, err = stagedChanges.AsRevisionChanges(revisions.TransactionIDKeyLessThanFunc)
return
}
Loading