Skip to content

Commit b5025bf

Browse files
committed
Introduce cron_shutdown and cron_startup
Normally the main background worker used to start tasks cannot be stopped, because it always auto restarts. `cron_shutdown` function can be used to stop the background worker. `cron_startup` can be then used to start the scheduler again.
1 parent 237c525 commit b5025bf

File tree

2 files changed

+152
-18
lines changed

2 files changed

+152
-18
lines changed

pg_cron--1.6--1.7.sql

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
DROP FUNCTION IF EXISTS cron.shutdown();
2+
CREATE FUNCTION cron.shutdown()
3+
RETURNS bool
4+
LANGUAGE C STRICT
5+
AS 'MODULE_PATHNAME', $$cron_shutdown$$;
6+
COMMENT ON FUNCTION cron.shutdown()
7+
IS 'shutdown pg_cron';
8+
REVOKE ALL ON FUNCTION cron.shutdown() FROM PUBLIC;
9+
10+
DROP FUNCTION IF EXISTS cron.startup();
11+
CREATE FUNCTION cron.startup()
12+
RETURNS bool
13+
LANGUAGE C STRICT
14+
AS 'MODULE_PATHNAME', $$cron_startup$$;
15+
COMMENT ON FUNCTION cron.startup()
16+
IS 'starts up pg_cron after it was shut down';
17+
REVOKE ALL ON FUNCTION cron.startup() FROM PUBLIC;

src/pg_cron.c

Lines changed: 135 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@ typedef enum
120120
/* forward declarations */
121121
void _PG_init(void);
122122
void _PG_fini(void);
123+
static void cron_prepare_backgroud_worker(BackgroundWorker* worker);
123124
static void pg_cron_sigterm(SIGNAL_ARGS);
124125
static void pg_cron_sighup(SIGNAL_ARGS);
125126
PGDLLEXPORT void PgCronLauncherMain(Datum arg);
@@ -150,6 +151,10 @@ static bool jobStartupTimeout(CronTask *task, TimestampTz currentTime);
150151
static char* pg_cron_cmdTuples(char *msg);
151152
static void bgw_generate_returned_message(StringInfoData *display_msg, ErrorData edata);
152153

154+
/* SQL-callable functions */
155+
PG_FUNCTION_INFO_V1(cron_shutdown);
156+
PG_FUNCTION_INFO_V1(cron_startup);
157+
153158
/* global settings */
154159
char *CronTableDatabaseName = "postgres";
155160
static bool CronLogStatement = true;
@@ -170,6 +175,9 @@ static bool UseBackgroundWorkers = false;
170175

171176
char *cron_timezone = NULL;
172177

178+
static shmem_request_hook_type prev_shmem_request_hook = NULL;
179+
static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
180+
173181
static const struct config_enum_entry cron_message_level_options[] = {
174182
{"debug5", DEBUG5, false},
175183
{"debug4", DEBUG4, false},
@@ -189,6 +197,116 @@ static const struct config_enum_entry cron_message_level_options[] = {
189197

190198
static const char *cron_error_severity(int elevel);
191199

200+
struct scheduler_shared_data_t {
201+
pid_t scheduler_pid;
202+
bool restart_scheduler;
203+
LWLockId lock;
204+
};
205+
206+
static struct scheduler_shared_data_t* scheduler_shared_data = NULL;
207+
208+
Datum
209+
cron_shutdown(PG_FUNCTION_ARGS)
210+
{
211+
bool result = false;
212+
Oid funcid;
213+
pid_t pid;
214+
215+
LWLockAcquire(scheduler_shared_data->lock, LW_EXCLUSIVE);
216+
pid = scheduler_shared_data->scheduler_pid;
217+
scheduler_shared_data->scheduler_pid = 0;
218+
scheduler_shared_data->restart_scheduler = false;
219+
LWLockRelease(scheduler_shared_data->lock);
220+
221+
funcid = fmgr_internal_function("pg_terminate_backend");
222+
if (funcid == InvalidOid)
223+
{
224+
ereport(ERROR, (errmsg("Function pg_terminate_backend not found")));
225+
}
226+
result = DatumGetBool(OidFunctionCall1(funcid, Int32GetDatum(pid)));
227+
PG_RETURN_BOOL(result);
228+
}
229+
230+
Datum
231+
cron_startup(PG_FUNCTION_ARGS)
232+
{
233+
BackgroundWorker worker;
234+
BackgroundWorkerHandle *handle;
235+
BgwHandleStatus status;
236+
bool result = false;
237+
pid_t pid;
238+
239+
LWLockAcquire(scheduler_shared_data->lock, LW_EXCLUSIVE);
240+
pid = scheduler_shared_data->scheduler_pid;
241+
if (pid == 0)
242+
{
243+
cron_prepare_backgroud_worker(&worker);
244+
worker.bgw_notify_pid = MyProcPid;
245+
RegisterDynamicBackgroundWorker(&worker, &handle);
246+
status = WaitForBackgroundWorkerStartup(handle, &pid);
247+
if (status == BGWH_STARTED)
248+
{
249+
scheduler_shared_data->scheduler_pid = pid;
250+
result = true;
251+
}
252+
}
253+
LWLockRelease(scheduler_shared_data->lock);
254+
PG_RETURN_BOOL(result);
255+
}
256+
257+
static void
258+
cron_shmem_request(void)
259+
{
260+
if (prev_shmem_request_hook)
261+
{
262+
prev_shmem_request_hook();
263+
}
264+
265+
RequestNamedLWLockTranche("cron", 1);
266+
}
267+
268+
static void
269+
cron_shmem_startup(void)
270+
{
271+
bool found;
272+
if (prev_shmem_startup_hook)
273+
{
274+
prev_shmem_startup_hook();
275+
}
276+
277+
LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
278+
InitShmemIndex();
279+
scheduler_shared_data = (struct scheduler_shared_data_t*)
280+
ShmemInitStruct("cron_scheduler_shared_data", sizeof(struct scheduler_shared_data_t), &found);
281+
if (!found)
282+
{
283+
scheduler_shared_data->scheduler_pid = 0;
284+
scheduler_shared_data->restart_scheduler = true;
285+
scheduler_shared_data->lock = &(GetNamedLWLockTranche("cron"))->lock;
286+
}
287+
LWLockRelease(AddinShmemInitLock);
288+
}
289+
290+
static void
291+
cron_prepare_backgroud_worker(BackgroundWorker* worker)
292+
{
293+
/* set up common data for all our workers */
294+
worker->bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
295+
worker->bgw_start_time = BgWorkerStart_RecoveryFinished;
296+
worker->bgw_restart_time = 1;
297+
#if (PG_VERSION_NUM < 100000)
298+
worker->bgw_main = PgCronLauncherMain;
299+
#endif
300+
worker->bgw_main_arg = Int32GetDatum(0);
301+
worker->bgw_notify_pid = 0;
302+
sprintf(worker->bgw_library_name, "pg_cron");
303+
sprintf(worker->bgw_function_name, "PgCronLauncherMain");
304+
snprintf(worker->bgw_name, BGW_MAXLEN, "pg_cron launcher");
305+
#if (PG_VERSION_NUM >= 110000)
306+
snprintf(worker->bgw_type, BGW_MAXLEN, "pg_cron launcher");
307+
#endif
308+
}
309+
192310
/*
193311
* _PG_init gets called when the extension is loaded.
194312
*/
@@ -212,6 +330,12 @@ _PG_init(void)
212330
/* watch for invalidation events */
213331
CacheRegisterRelcacheCallback(InvalidateJobCacheCallback, (Datum) 0);
214332

333+
prev_shmem_request_hook = shmem_request_hook;
334+
shmem_request_hook = cron_shmem_request;
335+
336+
prev_shmem_startup_hook = shmem_startup_hook;
337+
shmem_startup_hook = cron_shmem_startup;
338+
215339
DefineCustomStringVariable(
216340
"cron.database_name",
217341
gettext_noop("Database in which pg_cron metadata is kept."),
@@ -328,22 +452,7 @@ _PG_init(void)
328452
GUC_SUPERUSER_ONLY,
329453
check_timezone, NULL, NULL);
330454

331-
/* set up common data for all our workers */
332-
worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
333-
worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
334-
worker.bgw_restart_time = 1;
335-
#if (PG_VERSION_NUM < 100000)
336-
worker.bgw_main = PgCronLauncherMain;
337-
#endif
338-
worker.bgw_main_arg = Int32GetDatum(0);
339-
worker.bgw_notify_pid = 0;
340-
sprintf(worker.bgw_library_name, "pg_cron");
341-
sprintf(worker.bgw_function_name, "PgCronLauncherMain");
342-
snprintf(worker.bgw_name, BGW_MAXLEN, "pg_cron launcher");
343-
#if (PG_VERSION_NUM >= 110000)
344-
snprintf(worker.bgw_type, BGW_MAXLEN, "pg_cron launcher");
345-
#endif
346-
455+
cron_prepare_backgroud_worker(&worker);
347456
RegisterBackgroundWorker(&worker);
348457
}
349458

@@ -561,6 +670,11 @@ PgCronLauncherMain(Datum arg)
561670
{
562671
MemoryContext CronLoopContext = NULL;
563672
struct rlimit limit;
673+
int retcode;
674+
675+
LWLockAcquire(scheduler_shared_data->lock, LW_EXCLUSIVE);
676+
scheduler_shared_data->scheduler_pid = MyProcPid;
677+
LWLockRelease(scheduler_shared_data->lock);
564678

565679
/* Establish signal handlers before unblocking signals. */
566680
pqsignal(SIGHUP, pg_cron_sighup);
@@ -671,8 +785,11 @@ PgCronLauncherMain(Datum arg)
671785

672786
ereport(LOG, (errmsg("pg_cron scheduler shutting down")));
673787

674-
/* return error code to trigger restart */
675-
proc_exit(1);
788+
/* return error code to trigger restart unless shutting down cron scheduler */
789+
LWLockAcquire(scheduler_shared_data->lock, LW_EXCLUSIVE);
790+
retcode = (int)scheduler_shared_data->restart_scheduler;
791+
LWLockRelease(scheduler_shared_data->lock);
792+
proc_exit((int)retcode);
676793
}
677794

678795

0 commit comments

Comments
 (0)