Skip to content

Commit 31a77c5

Browse files
authored
Change ALTER RELATION semantics (#1557)
1 parent 2bd8777 commit 31a77c5

File tree

26 files changed

+1438
-704
lines changed

26 files changed

+1438
-704
lines changed

coordinator/mock/mock_coordinator.go

Lines changed: 28 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coordinator/provider/distributions.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,18 @@ func (d *DistributionsServer) AlterDistributedRelation(ctx context.Context, req
9494
return nil, d.impl.AlterDistributedRelation(ctx, req.GetId(), ds)
9595
}
9696

97+
func (d *DistributionsServer) AlterDistributedRelationSchema(ctx context.Context, req *protos.AlterDistributedRelationSchemaRequest) (*emptypb.Empty, error) {
98+
return nil, d.impl.AlterDistributedRelationSchema(ctx, req.GetId(), req.GetRelationName(), req.GetSchemaName())
99+
}
100+
101+
func (d *DistributionsServer) AlterDistributedRelationDistributionKey(ctx context.Context, req *protos.AlterDistributedRelationDistributionKeyRequest) (*emptypb.Empty, error) {
102+
key, err := distributions.DistributionKeyFromProto(req.GetDistributionKey())
103+
if err != nil {
104+
return nil, err
105+
}
106+
return nil, d.impl.AlterDistributedRelationDistributionKey(ctx, req.GetId(), req.GetRelationName(), key)
107+
}
108+
97109
func (d *DistributionsServer) GetDistribution(ctx context.Context, req *protos.GetDistributionRequest) (*protos.GetDistributionReply, error) {
98110
ds, err := d.impl.GetDistribution(ctx, req.GetId())
99111
if err != nil {

pkg/coord/adapter.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -849,6 +849,46 @@ func (a *Adapter) AlterDistributedRelation(ctx context.Context, id string, rel *
849849
return err
850850
}
851851

852+
// AlterDistributedRelationSchema alters the sequence name of a distributed relation.
853+
//
854+
// Parameters:
855+
// - ctx (context.Context): The context for the request.
856+
// - id (string): The ID of the distribution of the relation.
857+
// - relName (string): The name of the relation.
858+
// - schemaName (string): the new schema name for the relation.
859+
//
860+
// Returns:
861+
// - error: An error if the alteration of the distribution's attachments fails, otherwise nil.
862+
func (a *Adapter) AlterDistributedRelationSchema(ctx context.Context, id string, relName string, schemaName string) error {
863+
c := proto.NewDistributionServiceClient(a.conn)
864+
_, err := c.AlterDistributedRelationSchema(ctx, &proto.AlterDistributedRelationSchemaRequest{
865+
Id: id,
866+
RelationName: relName,
867+
SchemaName: schemaName,
868+
})
869+
return err
870+
}
871+
872+
// AlterDistributedRelationDistributionKey alters the distribution key metadata of a distributed relation.
873+
//
874+
// Parameters:
875+
// - ctx (context.Context): The context for the request.
876+
// - id (string): The ID of the distribution of the relation.
877+
// - relName (string): The name of the relation.
878+
// - distributionKey ([]distributions.DistributionKeyEntry): the new distribution key for the relation.
879+
//
880+
// Returns:
881+
// - error: An error if the alteration of the distribution's attachments fails, otherwise nil.
882+
func (a *Adapter) AlterDistributedRelationDistributionKey(ctx context.Context, id string, relName string, distributionKey []distributions.DistributionKeyEntry) error {
883+
c := proto.NewDistributionServiceClient(a.conn)
884+
_, err := c.AlterDistributedRelationDistributionKey(ctx, &proto.AlterDistributedRelationDistributionKeyRequest{
885+
Id: id,
886+
RelationName: relName,
887+
DistributionKey: distributions.DistributionKeyToProto(distributionKey),
888+
})
889+
return err
890+
}
891+
852892
// AlterDistributionDetach detaches a relation from a distribution using the provided ID and relation name.
853893
//
854894
// Parameters:

pkg/coord/clustered_coord.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2101,6 +2101,56 @@ func (qc *ClusteredCoordinator) AlterDistributedRelation(ctx context.Context, id
21012101
})
21022102
}
21032103

2104+
// AlterDistributedRelationSchema changes the schema name of a relation attached to a distribution
2105+
// TODO: unit tests
2106+
func (qc *ClusteredCoordinator) AlterDistributedRelationSchema(ctx context.Context, id string, relName string, schemaName string) error {
2107+
if err := qc.Coordinator.AlterDistributedRelationSchema(ctx, id, relName, schemaName); err != nil {
2108+
return err
2109+
}
2110+
2111+
return qc.traverseRouters(ctx, func(cc *grpc.ClientConn) error {
2112+
cl := proto.NewDistributionServiceClient(cc)
2113+
resp, err := cl.AlterDistributedRelationSchema(context.TODO(), &proto.AlterDistributedRelationSchemaRequest{
2114+
Id: id,
2115+
RelationName: relName,
2116+
SchemaName: schemaName,
2117+
})
2118+
if err != nil {
2119+
return err
2120+
}
2121+
2122+
spqrlog.Zero.Debug().
2123+
Interface("response", resp).
2124+
Msg("alter relation schema response")
2125+
return nil
2126+
})
2127+
}
2128+
2129+
// AlterDistributedRelationSchema changes the distribution key of a relation attached to a distribution
2130+
// TODO: unit tests
2131+
func (qc *ClusteredCoordinator) AlterDistributedRelationDistributionKey(ctx context.Context, id string, relName string, distributionKey []distributions.DistributionKeyEntry) error {
2132+
if err := qc.Coordinator.AlterDistributedRelationDistributionKey(ctx, id, relName, distributionKey); err != nil {
2133+
return err
2134+
}
2135+
2136+
return qc.traverseRouters(ctx, func(cc *grpc.ClientConn) error {
2137+
cl := proto.NewDistributionServiceClient(cc)
2138+
resp, err := cl.AlterDistributedRelationDistributionKey(context.TODO(), &proto.AlterDistributedRelationDistributionKeyRequest{
2139+
Id: id,
2140+
RelationName: relName,
2141+
DistributionKey: distributions.DistributionKeyToProto(distributionKey),
2142+
})
2143+
if err != nil {
2144+
return err
2145+
}
2146+
2147+
spqrlog.Zero.Debug().
2148+
Interface("response", resp).
2149+
Msg("alter relation distribution key response")
2150+
return nil
2151+
})
2152+
}
2153+
21042154
func (qc *ClusteredCoordinator) DropSequence(ctx context.Context, seqName string, force bool) error {
21052155
if err := qc.Coordinator.DropSequence(ctx, seqName, force); err != nil {
21062156
return err

pkg/coord/coord.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,26 @@ func (lc *Coordinator) AlterDistributedRelation(ctx context.Context, id string,
9595
return nil
9696
}
9797

98+
// AlterDistributedRelationDistributionKey implements meta.EntityMgr.
99+
func (lc *Coordinator) AlterDistributedRelationDistributionKey(ctx context.Context, id string, relName string, distributionKey []distributions.DistributionKeyEntry) error {
100+
if id == distributions.REPLICATED {
101+
return fmt.Errorf("setting distribution key is forbidden for reference relations")
102+
}
103+
ds, err := lc.GetDistribution(ctx, id)
104+
if err != nil {
105+
return err
106+
}
107+
if len(ds.ColTypes) != len(distributionKey) {
108+
return fmt.Errorf("cannot alter relation \"%s\" distribution key: numbers of columns mismatch", relName)
109+
}
110+
return lc.qdb.AlterDistributedRelationDistributionKey(ctx, id, relName, distributions.DistributionKeyToDB(distributionKey))
111+
}
112+
113+
// AlterDistributedRelationSchema implements meta.EntityMgr.
114+
func (lc *Coordinator) AlterDistributedRelationSchema(ctx context.Context, id string, relName string, schemaName string) error {
115+
return lc.qdb.AlterDistributedRelationSchema(ctx, id, relName, schemaName)
116+
}
117+
98118
// BatchMoveKeyRange implements meta.EntityMgr.
99119
func (lc *Coordinator) BatchMoveKeyRange(ctx context.Context, req *kr.BatchMoveKeyRange) error {
100120
panic("unimplemented")

pkg/meta/meta.go

Lines changed: 46 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -400,7 +400,10 @@ func processCreate(ctx context.Context, astmt spqrparser.Statement, mngr EntityM
400400
func processAlter(ctx context.Context, astmt spqrparser.Statement, mngr EntityMgr, cli *clientinteractor.PSQLInteractor) error {
401401
switch stmt := astmt.(type) {
402402
case *spqrparser.AlterDistribution:
403-
return processAlterDistribution(ctx, stmt.Element, mngr, cli)
403+
if stmt.Distribution == nil {
404+
return fmt.Errorf("failed to process 'ALTER DISTRIBUTION' statement: distribution ID is nil")
405+
}
406+
return processAlterDistribution(ctx, stmt.Element, mngr, cli, stmt.Distribution.ID)
404407
default:
405408
return ErrUnknownCoordinatorCommand
406409
}
@@ -416,7 +419,7 @@ func processAlter(ctx context.Context, astmt spqrparser.Statement, mngr EntityMg
416419
//
417420
// Returns:
418421
// - error: An error if the operation fails, otherwise nil.
419-
func processAlterDistribution(ctx context.Context, astmt spqrparser.Statement, mngr EntityMgr, cli *clientinteractor.PSQLInteractor) error {
422+
func processAlterDistribution(ctx context.Context, astmt spqrparser.Statement, mngr EntityMgr, cli *clientinteractor.PSQLInteractor, dsId string) error {
420423
switch stmt := astmt.(type) {
421424
case *spqrparser.AttachRelation:
422425

@@ -426,7 +429,7 @@ func processAlterDistribution(ctx context.Context, astmt spqrparser.Statement, m
426429
rels = append(rels, distributions.DistributedRelationFromSQL(drel))
427430
}
428431

429-
if stmt.Distribution.ID == "default" {
432+
if dsId == "default" {
430433
list, err := mngr.ListDistributions(ctx)
431434
if err != nil {
432435
return spqrerror.New(spqrerror.SPQR_OBJECT_NOT_EXIST, "error while selecting list of distributions")
@@ -437,29 +440,29 @@ func processAlterDistribution(ctx context.Context, astmt spqrparser.Statement, m
437440
if len(list) > 1 {
438441
return spqrerror.New(spqrerror.SPQR_OBJECT_NOT_EXIST, "distributions count not equal one, use FOR DISTRIBUTION syntax")
439442
}
440-
stmt.Distribution.ID = list[0].Id
443+
dsId = list[0].Id
441444
}
442445

443-
selectedDistribId := stmt.Distribution.ID
446+
selectedDistribId := dsId
444447

445448
if err := mngr.AlterDistributionAttach(ctx, selectedDistribId, rels); err != nil {
446449
return cli.ReportError(err)
447450
}
448451

449452
return cli.AlterDistributionAttach(ctx, selectedDistribId, rels)
450453
case *spqrparser.DetachRelation:
451-
if err := mngr.AlterDistributionDetach(ctx, stmt.Distribution.ID, stmt.RelationName); err != nil {
454+
if err := mngr.AlterDistributionDetach(ctx, dsId, stmt.RelationName); err != nil {
452455
return err
453456
}
454-
return cli.AlterDistributionDetach(ctx, stmt.Distribution.ID, stmt.RelationName.String())
457+
return cli.AlterDistributionDetach(ctx, dsId, stmt.RelationName.String())
455458
case *spqrparser.AlterRelation:
456-
if err := mngr.AlterDistributedRelation(ctx, stmt.Distribution.ID, distributions.DistributedRelationFromSQL(stmt.Relation)); err != nil {
459+
if err := mngr.AlterDistributedRelation(ctx, dsId, distributions.DistributedRelationFromSQL(stmt.Relation)); err != nil {
457460
return err
458461
}
459462
qName := rfqn.RelationFQN{RelationName: stmt.Relation.Name, SchemaName: stmt.Relation.SchemaName}
460-
return cli.AlterDistributedRelation(ctx, stmt.Distribution.ID, qName.String())
463+
return cli.AlterDistributedRelation(ctx, dsId, qName.String())
461464
case *spqrparser.DropDefaultShard:
462-
if distribution, err := mngr.GetDistribution(ctx, stmt.Distribution.ID); err != nil {
465+
if distribution, err := mngr.GetDistribution(ctx, dsId); err != nil {
463466
return err
464467
} else {
465468
manager := NewDefaultShardManager(distribution, mngr)
@@ -470,7 +473,7 @@ func processAlterDistribution(ctx context.Context, astmt spqrparser.Statement, m
470473
}
471474
}
472475
case *spqrparser.AlterDefaultShard:
473-
if distribution, err := mngr.GetDistribution(ctx, stmt.Distribution.ID); err != nil {
476+
if distribution, err := mngr.GetDistribution(ctx, dsId); err != nil {
474477
return err
475478
} else {
476479
manager := NewDefaultShardManager(distribution, mngr)
@@ -479,11 +482,43 @@ func processAlterDistribution(ctx context.Context, astmt spqrparser.Statement, m
479482
}
480483
return cli.MakeSimpleResponse(ctx, manager.SuccessCreateResponse(stmt.Shard))
481484
}
485+
case *spqrparser.AlterRelationV2:
486+
return processAlterRelation(ctx, stmt.Element, mngr, cli, dsId, stmt.RelationName)
482487
default:
483488
return ErrUnknownCoordinatorCommand
484489
}
485490
}
486491

492+
// processAlterDistribution processes the given 'ALTER DISTRIBUTION ALTER RELATION' statement and performs the corresponding operation.
493+
//
494+
// Parameters:
495+
// - ctx (context.Context): The context for the operation.
496+
// - astmt (spqrparser.Statement): The alter relation statement to be processed.
497+
// - mngr (EntityMgr): The entity manager for performing the operation.
498+
// - cli (*clientinteractor.PSQLInteractor): The PSQL client interactor for interacting with the PSQL server.
499+
// - dsId (string): ID of the distribution, to which the relation belongs.
500+
// - relName (string): the name of the relation to alter.
501+
//
502+
// Returns:
503+
// - error: An error if the operation fails, otherwise nil.
504+
func processAlterRelation(ctx context.Context, astmt spqrparser.Statement, mngr EntityMgr, cli *clientinteractor.PSQLInteractor, dsId string, relName string) error {
505+
switch stmt := astmt.(type) {
506+
case *spqrparser.AlterRelationSchema:
507+
if err := mngr.AlterDistributedRelationSchema(ctx, dsId, relName, stmt.SchemaName); err != nil {
508+
return err
509+
}
510+
qName := rfqn.RelationFQN{RelationName: relName, SchemaName: stmt.SchemaName}
511+
return cli.AlterDistributedRelation(ctx, dsId, qName.String())
512+
case *spqrparser.AlterRelationDistributionKey:
513+
if err := mngr.AlterDistributedRelationDistributionKey(ctx, dsId, relName, distributions.DistributionKeyFromSQL(stmt.DistributionKey)); err != nil {
514+
return err
515+
}
516+
return cli.AlterDistributedRelation(ctx, dsId, relName)
517+
default:
518+
return fmt.Errorf("unexpected 'ALTER RELATION' request type %T", stmt)
519+
}
520+
}
521+
487522
// TODO : unit tests
488523

489524
// ProcMetadataCommand processes various coordinator commands based on the provided statement.

pkg/mock/meta/mock_meta.go

Lines changed: 28 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)