Skip to content

Commit 5254b5d

Browse files
authored
Add replicated relations to constraints check (#1556)
1 parent eac9972 commit 5254b5d

File tree

9 files changed

+171
-12
lines changed

9 files changed

+171
-12
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ pooler_d_run:
9595
####################### TESTS #######################
9696

9797
unittest:
98-
go test ./cmd/... ./pkg/... ./router/... ./coordinator/... ./yacc/console...
98+
go test -timeout 120s ./cmd/... ./pkg/... ./router/... ./coordinator/... ./yacc/console...
9999
go test -race -count 20 -timeout 30s ./qdb/...
100100

101101
regress_local: proxy_2sh_run

pkg/coord/clustered_coord.go

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -946,15 +946,31 @@ func (qc *ClusteredCoordinator) checkKeyRangeMove(ctx context.Context, req *kr.B
946946
}
947947
}
948948

949-
deferrable, constraintName, err := datatransfers.CheckConstraints(ctx, sourceConn, rels)
949+
exists, err := qc.qdb.CheckDistribution(ctx, distributions.REPLICATED)
950+
if err != nil {
951+
return fmt.Errorf("error checking for replicated distribution: %s", err)
952+
}
953+
replRels := []string{}
954+
if exists {
955+
replDs, err := qc.GetDistribution(ctx, distributions.REPLICATED)
956+
if err != nil {
957+
return fmt.Errorf("error getting replicated distribution: %s", err)
958+
}
959+
replRels = make([]string, 0, len(replDs.Relations))
960+
for _, r := range replDs.Relations {
961+
replRels = append(replRels, r.GetFullName())
962+
}
963+
}
964+
965+
deferrable, constraintName, err := datatransfers.CheckConstraints(ctx, sourceConn, rels, replRels)
950966
if err != nil {
951967
return spqrerror.Newf(spqrerror.SPQR_TRANSFER_ERROR, "error checking table constraints on source shard: %s", err)
952968
}
953969
if !deferrable {
954970
return spqrerror.Newf(spqrerror.SPQR_TRANSFER_ERROR, "found non-deferrable constraint or constraint referencing non-distributed table on source shard: \"%s\"", constraintName)
955971
}
956972

957-
deferrable, constraintName, err = datatransfers.CheckConstraints(ctx, destConn, rels)
973+
deferrable, constraintName, err = datatransfers.CheckConstraints(ctx, destConn, rels, replRels)
958974
if err != nil {
959975
return spqrerror.Newf(spqrerror.SPQR_TRANSFER_ERROR, "error checking table constraints on destination shard: %s", err)
960976
}

pkg/datatransfers/data_transfers.go

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -574,22 +574,32 @@ func CheckColumnExists(ctx context.Context, conn *pgx.Conn, relName, schema, col
574574
// Parameters:
575575
// - ctx (context.Context): the context for database operations;
576576
// - conn (*pgx.Conn): the connection to the database;
577-
// - relNames (string): the list of relations, for which to check for constraints;
577+
// - dsRels (string): the list of distributed relations, for which to check for constraints;
578+
// - rpRels (string): the list of replicated relations, which can also be referred to by distributed relations;
578579
//
579580
// Returns:
580581
// - bool: true if no such constraints are found, false otherwise;
581582
// - string: name of a constraint, if found;
582583
// - error: an error if there was a problem executing the query.
583-
func CheckConstraints(ctx context.Context, conn *pgx.Conn, relNames []string) (bool, string, error) {
584-
if len(relNames) == 0 {
584+
func CheckConstraints(ctx context.Context, conn *pgx.Conn, dsRels []string, rpRels []string) (bool, string, error) {
585+
if len(dsRels) == 0 {
585586
return true, "", nil
586587
}
587-
relOidList := make([]string, len(relNames))
588-
for i, relName := range relNames {
589-
relOidList[i] = fmt.Sprintf("'%s'::regclass::oid", relName)
588+
dsRelOidList := make([]string, len(dsRels))
589+
for i, relName := range dsRels {
590+
dsRelOidList[i] = fmt.Sprintf("'%s'::regclass::oid", relName)
590591
}
591-
relOids := strings.Join(relOidList, ", ")
592-
rows, err := conn.Query(ctx, fmt.Sprintf(`SELECT conname FROM pg_constraint WHERE conrelid IN (%s) and confrelid != 0 and (condeferrable=false or not (confrelid IN (%s))) LIMIT 1`, relOids, relOids))
592+
dsRelOids := strings.Join(dsRelOidList, ", ")
593+
rpRelsClause := ""
594+
if len(rpRels) > 0 {
595+
refRelOidList := make([]string, len(rpRels))
596+
for i, relName := range rpRels {
597+
refRelOidList[i] = fmt.Sprintf("'%s'::regclass::oid", relName)
598+
}
599+
rpRelOids := strings.Join(refRelOidList, ", ")
600+
rpRelsClause = fmt.Sprintf(" and not (confrelid IN (%s))", rpRelOids)
601+
}
602+
rows, err := conn.Query(ctx, fmt.Sprintf(`SELECT conname FROM pg_constraint WHERE conrelid IN (%s) and confrelid != 0 and (condeferrable=false or not (confrelid IN (%s)))%s LIMIT 1`, dsRelOids, dsRelOids, rpRelsClause))
593603
if err != nil {
594604
return false, "", err
595605
}

pkg/mock/pgx/mock_pgxconn_iface.go

Lines changed: 51 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

qdb/etcdqdb.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1319,6 +1319,20 @@ func (q *EtcdQDB) GetDistribution(ctx context.Context, id string) (*Distribution
13191319
return distrib, nil
13201320
}
13211321

1322+
// TODO : unit tests
1323+
func (q *EtcdQDB) CheckDistribution(ctx context.Context, id string) (bool, error) {
1324+
spqrlog.Zero.Debug().
1325+
Str("id", id).
1326+
Msg("etcdqdb: get distribution by id")
1327+
1328+
resp, err := q.cli.Get(ctx, distributionNodePath(id), clientv3.WithCountOnly())
1329+
if err != nil {
1330+
return false, err
1331+
}
1332+
1333+
return resp.Count == 1, nil
1334+
}
1335+
13221336
// TODO : unit tests
13231337
func (q *EtcdQDB) GetRelationDistribution(ctx context.Context, relName *rfqn.RelationFQN) (*Distribution, error) {
13241338
spqrlog.Zero.Debug().

qdb/memqdb.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -845,6 +845,16 @@ func (q *MemQDB) GetDistribution(_ context.Context, id string) (*Distribution, e
845845
}
846846
}
847847

848+
// TODO : unit tests
849+
func (q *MemQDB) CheckDistribution(_ context.Context, id string) (bool, error) {
850+
spqrlog.Zero.Debug().Str("id", id).Msg("memqdb: check distribution")
851+
q.mu.RLock()
852+
defer q.mu.RUnlock()
853+
854+
_, ok := q.Distributions[id]
855+
return ok, nil
856+
}
857+
848858
func (q *MemQDB) GetRelationDistribution(_ context.Context, relation *rfqn.RelationFQN) (*Distribution, error) {
849859
spqrlog.Zero.Debug().Str("relation", relation.RelationName).Msg("memqdb: get distribution for table")
850860
q.mu.RLock()

qdb/mock/qdb.go

Lines changed: 30 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

qdb/qdb.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ type QDB interface {
7070
ListDistributions(ctx context.Context) ([]*Distribution, error)
7171
DropDistribution(ctx context.Context, id string) error
7272
GetDistribution(ctx context.Context, id string) (*Distribution, error)
73-
// TODO: fix this by passing FQRN (fully qualified relation name (+schema))
73+
CheckDistribution(ctx context.Context, id string) (bool, error)
7474
GetRelationDistribution(ctx context.Context, relation *rfqn.RelationFQN) (*Distribution, error)
7575

7676
// Reference relations

test/feature/features/redistribute_check.feature

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -310,3 +310,31 @@ Feature: Redistribution test
310310
"""
311311
found non-deferrable constraint or constraint referencing non-distributed table on destination shard: "xmove_pkey_fkey"
312312
"""
313+
314+
Scenario: REDISTRIBUTE KEY RANGE allows constraints on reference tables
315+
When I execute SQL on host "coordinator"
316+
"""
317+
CREATE KEY RANGE kr1 FROM 0 ROUTE TO sh1 FOR DISTRIBUTION ds1;
318+
CREATE REFERENCE TABLE ref;
319+
"""
320+
Then command return code should be "0"
321+
322+
When I run SQL on host "shard1"
323+
"""
324+
CREATE TABLE ref(id INT PRIMARY KEY);
325+
CREATE TABLE xMove(w_id INT, ext_id INT REFERENCES ref(id));
326+
"""
327+
Then command return code should be "0"
328+
329+
When I run SQL on host "shard2"
330+
"""
331+
CREATE TABLE ref(id INT PRIMARY KEY);
332+
CREATE TABLE xMove(w_id INT, ext_id INT REFERENCES ref(id));
333+
"""
334+
Then command return code should be "0"
335+
336+
When I run SQL on host "coordinator" with timeout "150" seconds
337+
"""
338+
REDISTRIBUTE KEY RANGE kr1 TO sh2 CHECK;
339+
"""
340+
Then command return code should be "0"

0 commit comments

Comments
 (0)