Skip to content

Commit 38d91a8

Browse files
qiuyuhangpolardb-bot[bot]
authored andcommitted
feat: postpone waiting of sync ddl
Due to the shared-storage architecture, higher requirement for DDL synchronization is needed. Primary needs to acquire locks among all replicas before doing acture operation. But it cause too much waiting on PolarDB PG, some of them are just redundant. For example, primary acqueries DDL lock and do nothing. This happends really often in vacuum. And another case is create new table, truncate temp table are also waiting for the DDL lock, which is unnecessary. In this commit, postpone the waiting of sync ddl till the actual file operations, eg, truncate and unlink. This will reduce a lot of unnecessary waitings. The waiting on the operation of temp table is also ignored. And thanks to the postpone, the waiting time can also be collapse. When the waiting begins, it's likely to have all replicas replayed beyond the point. Add a new GUC polar_enable_sync_ddl_legacy to enable the old style synchronous ddl.
1 parent 3e128b9 commit 38d91a8

File tree

12 files changed

+112
-91
lines changed

12 files changed

+112
-91
lines changed

src/backend/access/transam/xact.c

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2175,6 +2175,8 @@ StartTransaction(void)
21752175
enable_timeout_after(TRANSACTION_TIMEOUT, TransactionTimeout);
21762176

21772177
ShowTransactionState("StartTransaction");
2178+
2179+
Assert(polar_ddl_lock_lsn == InvalidXLogRecPtr);
21782180
}
21792181

21802182

@@ -2290,6 +2292,9 @@ CommitTransaction(void)
22902292
if (!is_parallel_worker)
22912293
PreCommit_CheckForSerializationFailure();
22922294

2295+
if (polar_enable_sync_ddl)
2296+
polar_wait_ddl_lock_for_pending_deletes();
2297+
22932298
/* Prevent cancel/die interrupt while cleaning up */
22942299
HOLD_INTERRUPTS();
22952300

@@ -2448,6 +2453,8 @@ CommitTransaction(void)
24482453
XactTopFullTransactionId = InvalidFullTransactionId;
24492454
nParallelCurrentXids = 0;
24502455

2456+
polar_ddl_lock_lsn = InvalidXLogRecPtr;
2457+
24512458
/*
24522459
* done with commit processing, set current transaction state back to
24532460
* default
@@ -2577,6 +2584,9 @@ PrepareTransaction(void)
25772584
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
25782585
errmsg("cannot PREPARE a transaction that has exported snapshots")));
25792586

2587+
if (polar_enable_sync_ddl)
2588+
polar_wait_ddl_lock_for_pending_deletes();
2589+
25802590
/* Prevent cancel/die interrupt while cleaning up */
25812591
HOLD_INTERRUPTS();
25822592

@@ -2739,6 +2749,8 @@ PrepareTransaction(void)
27392749
XactTopFullTransactionId = InvalidFullTransactionId;
27402750
nParallelCurrentXids = 0;
27412751

2752+
polar_ddl_lock_lsn = InvalidXLogRecPtr;
2753+
27422754
/*
27432755
* done with 1st phase commit processing, set current transaction state
27442756
* back to default
@@ -2964,6 +2976,8 @@ AbortTransaction(void)
29642976
pgstat_report_xact_timestamp(0);
29652977
}
29662978

2979+
polar_ddl_lock_lsn = InvalidXLogRecPtr;
2980+
29672981
/*
29682982
* State remains TRANS_ABORT until CleanupTransaction().
29692983
*/

src/backend/access/transam/xlogutils.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1280,7 +1280,7 @@ polar_get_recovery_bulk_extend_size(BlockNumber target_block, BlockNumber nblock
12801280
/* Avoid acceed maximum possible length */
12811281
bulk_extend_size = Min(MaxBlockNumber - nblocks, bulk_extend_size);
12821282

1283-
/* Extend the relation to blockNum + 1 at least */
1283+
/* Extend the relation to target_block + 1 at least */
12841284
bulk_extend_size = Max(target_block - nblocks + 1, bulk_extend_size);
12851285

12861286
return bulk_extend_size;

src/backend/catalog/storage.c

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
#include "catalog/storage.h"
2929
#include "catalog/storage_xlog.h"
3030
#include "miscadmin.h"
31+
#include "replication/syncrep.h"
3132
#include "storage/bulk_write.h"
3233
#include "storage/freespace.h"
3334
#include "storage/smgr.h"
@@ -342,6 +343,9 @@ RelationTruncate(Relation rel, BlockNumber nblocks)
342343
}
343344
}
344345

346+
if (polar_enable_sync_ddl && reln->smgr_rnode.backend == InvalidBackendId)
347+
polar_wait_ddl_lock();
348+
345349
RelationPreTruncate(rel);
346350

347351
/*

src/backend/commands/dbcommands.c

Lines changed: 15 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -2214,14 +2214,6 @@ movedb(const char *dbname, const char *tblspcname)
22142214
/* Start new transaction for the remaining work; don't need a snapshot */
22152215
StartTransactionCommand();
22162216

2217-
/*
2218-
* Remove files from the old tablespace
2219-
*/
2220-
if (!rmtree(src_dbpath, true))
2221-
ereport(WARNING,
2222-
(errmsg("some useless files may be left behind in old database directory \"%s\"",
2223-
src_dbpath)));
2224-
22252217
/*
22262218
* Record the filesystem change in XLOG
22272219
*/
@@ -2235,11 +2227,21 @@ movedb(const char *dbname, const char *tblspcname)
22352227
XLogRegisterData((char *) &xlrec, sizeof(xl_dbase_drop_rec));
22362228
XLogRegisterData((char *) &src_tblspcoid, sizeof(Oid));
22372229

2238-
(void) XLogInsert(RM_DBASE_ID,
2239-
XLOG_DBASE_DROP | XLR_SPECIAL_REL_UPDATE);
2230+
polar_ddl_lock_lsn = XLogInsert(RM_DBASE_ID,
2231+
XLOG_DBASE_DROP | XLR_SPECIAL_REL_UPDATE);
2232+
if (polar_enable_sync_ddl)
2233+
polar_wait_ddl_lock();
22402234
POLAR_RECORD_DB_STATE(xlrec.ntablespaces, xlrec.tablespace_ids, xlrec.db_id, POLAR_DB_DROPED);
22412235
}
22422236

2237+
/*
2238+
* Remove files from the old tablespace
2239+
*/
2240+
if (!rmtree(src_dbpath, true))
2241+
ereport(WARNING,
2242+
(errmsg("some useless files may be left behind in old database directory \"%s\"",
2243+
src_dbpath)));
2244+
22432245
/* Now it's safe to release the database lock */
22442246
UnlockSharedObjectForSession(DatabaseRelationId, db_id, 0,
22452247
AccessExclusiveLock);
@@ -3504,7 +3506,6 @@ static void
35043506
polar_sync_dropdb_wal(Oid db_id)
35053507
{
35063508
xl_dbase_drop_rec xlrec;
3507-
XLogRecPtr polar_recptr;
35083509
Oid *tablespace_ids;
35093510

35103511
Relation rel;
@@ -3565,8 +3566,8 @@ polar_sync_dropdb_wal(Oid db_id)
35653566
XLogRegisterData((char *) &xlrec, MinSizeOfDbaseDropRec);
35663567
XLogRegisterData((char *) tablespace_ids, ntblspc * sizeof(Oid));
35673568

3568-
polar_recptr = XLogInsert(RM_DBASE_ID,
3569-
XLOG_DBASE_DROP | XLR_SPECIAL_REL_UPDATE);
3569+
polar_ddl_lock_lsn = XLogInsert(RM_DBASE_ID,
3570+
XLOG_DBASE_DROP | XLR_SPECIAL_REL_UPDATE);
35703571
POLAR_RECORD_DB_STATE(ntblspc, tablespace_ids, db_id, POLAR_DB_DROPED);
35713572

35723573
list_free(ltblspc);
@@ -3575,13 +3576,6 @@ polar_sync_dropdb_wal(Oid db_id)
35753576
table_endscan(scan);
35763577
table_close(rel, AccessShareLock);
35773578

3578-
/*
3579-
* POLAR: synchronous ddl, enable standby lock, wait all replica node
3580-
* replay this log
3581-
*/
35823579
if (polar_enable_sync_ddl)
3583-
{
3584-
XLogFlush(polar_recptr);
3585-
SyncRepWaitForLSN(polar_recptr, false, true);
3586-
}
3580+
polar_wait_ddl_lock();
35873581
}

src/backend/replication/syncrep.c

Lines changed: 44 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -88,10 +88,16 @@
8888
#include "utils/ps_status.h"
8989

9090
/* POLAR */
91+
#include "access/xlogutils.h"
92+
#include "catalog/storage.h"
9193
#include "replication/slot.h"
9294

9395
/* User-settable parameters for sync rep */
9496
char *SyncRepStandbyNames;
97+
bool polar_enable_sync_ddl = true;
98+
bool polar_enable_sync_ddl_legacy = false;
99+
100+
XLogRecPtr polar_ddl_lock_lsn = InvalidXLogRecPtr;
95101

96102
#define SyncStandbysDefined() \
97103
(SyncRepStandbyNames != NULL && SyncRepStandbyNames[0] != '\0')
@@ -152,7 +158,7 @@ static bool polar_get_ddl_applyptr(XLogRecPtr *ddl_applyptr, bool *replication_s
152158
* remote_apply, because only commit records provide apply feedback.
153159
*/
154160
void
155-
SyncRepWaitForLSN(XLogRecPtr lsn, bool commit, bool polar_force_wait_apply)
161+
SyncRepWaitForLSN(XLogRecPtr lsn, bool commit, bool sync_ddl)
156162
{
157163
char *new_status = NULL;
158164
const char *old_status;
@@ -180,7 +186,7 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit, bool polar_force_wait_apply)
180186
if ((!SyncRepRequested() ||
181187
((((volatile WalSndCtlData *) WalSndCtl)->sync_standbys_status) &
182188
(SYNC_STANDBY_INIT | SYNC_STANDBY_DEFINED)) == SYNC_STANDBY_INIT) &&
183-
!polar_force_wait_apply)
189+
!sync_ddl)
184190
return;
185191

186192
/* Cap the level for anything other than commit to remote flush only. */
@@ -193,7 +199,7 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit, bool polar_force_wait_apply)
193199
* POLAR: when enable shared storage, ddl must be in synchronous mode. We
194200
* use streaming replication standby lock for synchronous ddl.
195201
*/
196-
if (polar_enable_shared_storage_mode && polar_force_wait_apply)
202+
if (polar_enable_shared_storage_mode && sync_ddl)
197203
mode = POLAR_SYNC_DDL_WAIT_APPLY;
198204

199205
Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks)));
@@ -214,7 +220,7 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit, bool polar_force_wait_apply)
214220
* (SYNC_STANDBY_INIT is not set), fall back to a check based on the LSN,
215221
* then do a direct GUC check.
216222
*/
217-
if (polar_force_wait_apply)
223+
if (sync_ddl)
218224
{
219225
/*
220226
* POLAR: When using synchronous ddl, WalSndCtl's sync_standbys_status
@@ -326,7 +332,7 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit, bool polar_force_wait_apply)
326332
* We do NOT reset ProcDiePending, so that the process will die after
327333
* the commit is cleaned up.
328334
*/
329-
if (ProcDiePending && !polar_force_wait_apply)
335+
if (ProcDiePending && !sync_ddl)
330336
{
331337
ereport(WARNING,
332338
(errcode(ERRCODE_ADMIN_SHUTDOWN),
@@ -337,7 +343,7 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit, bool polar_force_wait_apply)
337343
break;
338344
}
339345
/* POLAR: cancel the synchronous ddl. */
340-
else if (ProcDiePending && polar_force_wait_apply)
346+
else if (ProcDiePending && sync_ddl)
341347
{
342348
whereToSendOutput = DestNone;
343349
SyncRepCancelWait();
@@ -353,7 +359,7 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit, bool polar_force_wait_apply)
353359
* altogether is not helpful, so we just terminate the wait with a
354360
* suitable warning.
355361
*/
356-
if (QueryCancelPending && !polar_force_wait_apply)
362+
if (QueryCancelPending && !sync_ddl)
357363
{
358364
QueryCancelPending = false;
359365
ereport(WARNING,
@@ -367,7 +373,7 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit, bool polar_force_wait_apply)
367373
* POLAR: cancel the synchronous ddl if a query cancel interrupt
368374
* arrives.
369375
*/
370-
else if (QueryCancelPending && polar_force_wait_apply)
376+
else if (QueryCancelPending && sync_ddl)
371377
{
372378
QueryCancelPending = false;
373379
SyncRepCancelWait();
@@ -1360,3 +1366,33 @@ polar_get_ddl_applyptr(XLogRecPtr *ddl_applyptr, bool *replica_slot_all_active)
13601366
LWLockRelease(ReplicationSlotControlLock);
13611367
return replica_slot_exist;
13621368
}
1369+
1370+
void
1371+
polar_wait_ddl_lock(void)
1372+
{
1373+
if (InRecovery || !polar_enable_shared_storage_mode)
1374+
return;
1375+
1376+
if (XLogRecPtrIsInvalid(polar_ddl_lock_lsn))
1377+
return;
1378+
1379+
XLogFlush(polar_ddl_lock_lsn);
1380+
SyncRepWaitForLSN(polar_ddl_lock_lsn, false, true);
1381+
1382+
polar_ddl_lock_lsn = InvalidXLogRecPtr;
1383+
}
1384+
1385+
void
1386+
polar_wait_ddl_lock_for_pending_deletes(void)
1387+
{
1388+
RelFileNode *rels;
1389+
int nrels;
1390+
1391+
nrels = smgrGetPendingDeletes(true, &rels);
1392+
1393+
if (nrels > 0)
1394+
{
1395+
pfree(rels);
1396+
polar_wait_ddl_lock();
1397+
}
1398+
}

src/backend/storage/ipc/standby.c

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1391,24 +1391,16 @@ LogAccessExclusiveLock(Oid dbOid, Oid relOid)
13911391
{
13921392
xl_standby_lock xlrec;
13931393

1394-
/* POLAR */
1395-
XLogRecPtr polar_sync_recptr;
1396-
13971394
xlrec.xid = GetCurrentTransactionId();
13981395

13991396
xlrec.dbOid = dbOid;
14001397
xlrec.relOid = relOid;
14011398

1402-
polar_sync_recptr = LogAccessExclusiveLocks(1, &xlrec);
1399+
polar_ddl_lock_lsn = LogAccessExclusiveLocks(1, &xlrec);
14031400
MyXactFlags |= XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK;
14041401

1405-
/* POLAR: synchronous ddl, wait all replicas to replay this log */
1406-
if (polar_enable_shared_storage_mode && polar_enable_sync_ddl)
1407-
{
1408-
Assert(!XLogRecPtrIsInvalid(polar_sync_recptr));
1409-
XLogFlush(polar_sync_recptr);
1410-
SyncRepWaitForLSN(polar_sync_recptr, false, true);
1411-
}
1402+
if (polar_enable_sync_ddl_legacy)
1403+
polar_wait_ddl_lock();
14121404
}
14131405

14141406
/*

src/backend/utils/misc/guc.c

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -864,8 +864,6 @@ bool polar_enable_track_network_stat;
864864
bool polar_enable_track_network_timing;
865865
bool polar_enable_alloc_checkinterrupts;
866866

867-
bool polar_enable_sync_ddl;
868-
869867
/* POLAR end */
870868

871869
bool polar_disable_escape_inside_gbk_character;
@@ -1591,7 +1589,7 @@ static struct config_bool ConfigureNamesBool[] =
15911589
},
15921590

15931591
{
1594-
{"polar_enable_sync_ddl", PGC_SIGHUP, REPLICATION_STANDBY,
1592+
{"polar_enable_sync_ddl", PGC_USERSET, REPLICATION_STANDBY,
15951593
gettext_noop("Enable synchronous ddl."),
15961594
NULL,
15971595
GUC_NO_SHOW_ALL | GUC_NO_RESET_ALL | POLAR_GUC_IS_INVISIBLE | POLAR_GUC_IS_UNCHANGABLE
@@ -1601,6 +1599,17 @@ static struct config_bool ConfigureNamesBool[] =
16011599
NULL, NULL, NULL
16021600
},
16031601

1602+
{
1603+
{"polar_enable_sync_ddl_legacy", PGC_USERSET, REPLICATION_STANDBY,
1604+
gettext_noop("Enable old style synchronous ddl."),
1605+
NULL,
1606+
GUC_NO_SHOW_ALL | GUC_NO_RESET_ALL | POLAR_GUC_IS_INVISIBLE | POLAR_GUC_IS_UNCHANGABLE
1607+
},
1608+
&polar_enable_sync_ddl_legacy,
1609+
false,
1610+
NULL, NULL, NULL
1611+
},
1612+
16041613
{
16051614
{"polar_enable_alloc_checkinterrupts", PGC_SIGHUP, UNGROUPED,
16061615
gettext_noop("Enable check interrupt when allocating memory."),

src/include/replication/syncrep.h

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,9 +84,13 @@ extern PGDLLIMPORT char *syncrep_parse_error_msg;
8484

8585
/* user-settable parameters for synchronous replication */
8686
extern PGDLLIMPORT char *SyncRepStandbyNames;
87+
extern bool polar_enable_sync_ddl;
88+
extern bool polar_enable_sync_ddl_legacy;
89+
90+
extern XLogRecPtr polar_ddl_lock_lsn;
8791

8892
/* called by user backend */
89-
extern void SyncRepWaitForLSN(XLogRecPtr lsn, bool commit, bool polar_force_wait_apply);
93+
extern void SyncRepWaitForLSN(XLogRecPtr lsn, bool commit, bool sync_ddl);
9094

9195
/* called at backend exit */
9296
extern void SyncRepCleanupAtProcExit(void);
@@ -120,5 +124,7 @@ extern void syncrep_scanner_finish(void);
120124
* POLAR: called by walsender & drop replication slot
121125
*/
122126
extern bool polar_release_ddl_waiters(void);
127+
extern void polar_wait_ddl_lock(void);
128+
extern void polar_wait_ddl_lock_for_pending_deletes(void);
123129

124130
#endif /* _SYNCREP_H */

src/include/utils/guc.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -326,7 +326,6 @@ extern bool polar_enable_debug;
326326

327327
extern char *polar_disk_name;
328328
extern char *polar_storage_cluster_name;
329-
extern bool polar_enable_sync_ddl;
330329

331330
/* POLAR end */
332331

src/test/perl/Makefile

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ install: all installdirs
2727
$(INSTALL_DATA) $(srcdir)/PostgreSQL/Test/BackgroundPsql.pm '$(DESTDIR)$(pgxsdir)/$(subdir)/PostgreSQL/Test/BackgroundPsql.pm'
2828
$(INSTALL_DATA) $(srcdir)/PostgreSQL/Version.pm '$(DESTDIR)$(pgxsdir)/$(subdir)/PostgreSQL/Version.pm'
2929
$(INSTALL_DATA) $(srcdir)/PolarDB/DCRegression.pm '$(DESTDIR)$(pgxsdir)/$(subdir)/PolarDB/DCRegression.pm'
30+
$(INSTALL_DATA) $(srcdir)/PolarDB/Task.pm '$(DESTDIR)$(pgxsdir)/$(subdir)/PolarDB/Task.pm'
3031

3132
uninstall:
3233
rm -f '$(DESTDIR)$(pgxsdir)/$(subdir)/PostgreSQL/Test/Utils.pm'
@@ -36,5 +37,6 @@ uninstall:
3637
rm -f '$(DESTDIR)$(pgxsdir)/$(subdir)/PostgreSQL/Test/BackgroundPsql.pm'
3738
rm -f '$(DESTDIR)$(pgxsdir)/$(subdir)/PostgreSQL/Version.pm'
3839
rm -f '$(DESTDIR)$(pgxsdir)/$(subdir)/PolarDB/DCRegression.pm'
40+
rm -f '$(DESTDIR)$(pgxsdir)/$(subdir)/PolarDB/Task.pm'
3941

4042
endif

0 commit comments

Comments
 (0)