diff --git a/pg_cron--1.6--1.7.sql b/pg_cron--1.6--1.7.sql new file mode 100644 index 0000000..4eea46c --- /dev/null +++ b/pg_cron--1.6--1.7.sql @@ -0,0 +1,17 @@ +DROP FUNCTION IF EXISTS cron.shutdown(); +CREATE FUNCTION cron.shutdown() + RETURNS bool + LANGUAGE C STRICT + AS 'MODULE_PATHNAME', $$cron_shutdown$$; +COMMENT ON FUNCTION cron.shutdown() + IS 'shutdown pg_cron'; +REVOKE ALL ON FUNCTION cron.shutdown() FROM PUBLIC; + +DROP FUNCTION IF EXISTS cron.startup(); +CREATE FUNCTION cron.startup() + RETURNS bool + LANGUAGE C STRICT + AS 'MODULE_PATHNAME', $$cron_startup$$; +COMMENT ON FUNCTION cron.startup() + IS 'starts up pg_cron after it was shut down'; +REVOKE ALL ON FUNCTION cron.startup() FROM PUBLIC; diff --git a/src/pg_cron.c b/src/pg_cron.c index 0588c21..7e3dbad 100644 --- a/src/pg_cron.c +++ b/src/pg_cron.c @@ -120,6 +120,7 @@ typedef enum /* forward declarations */ void _PG_init(void); void _PG_fini(void); +static void cron_prepare_backgroud_worker(BackgroundWorker* worker); static void pg_cron_sigterm(SIGNAL_ARGS); static void pg_cron_sighup(SIGNAL_ARGS); PGDLLEXPORT void PgCronLauncherMain(Datum arg); @@ -150,6 +151,10 @@ static bool jobStartupTimeout(CronTask *task, TimestampTz currentTime); static char* pg_cron_cmdTuples(char *msg); static void bgw_generate_returned_message(StringInfoData *display_msg, ErrorData edata); +/* SQL-callable functions */ +PG_FUNCTION_INFO_V1(cron_shutdown); +PG_FUNCTION_INFO_V1(cron_startup); + /* global settings */ char *CronTableDatabaseName = "postgres"; static bool CronLogStatement = true; @@ -170,6 +175,9 @@ static bool UseBackgroundWorkers = false; char *cron_timezone = NULL; +static shmem_request_hook_type prev_shmem_request_hook = NULL; +static shmem_startup_hook_type prev_shmem_startup_hook = NULL; + static const struct config_enum_entry cron_message_level_options[] = { {"debug5", DEBUG5, false}, {"debug4", DEBUG4, false}, @@ -189,6 +197,116 @@ static const struct config_enum_entry cron_message_level_options[] = { static const char *cron_error_severity(int elevel); +struct scheduler_shared_data_t { + pid_t scheduler_pid; + bool restart_scheduler; + LWLockId lock; +}; + +static struct scheduler_shared_data_t* scheduler_shared_data = NULL; + +Datum +cron_shutdown(PG_FUNCTION_ARGS) +{ + bool result = false; + Oid funcid; + pid_t pid; + + LWLockAcquire(scheduler_shared_data->lock, LW_EXCLUSIVE); + pid = scheduler_shared_data->scheduler_pid; + scheduler_shared_data->scheduler_pid = 0; + scheduler_shared_data->restart_scheduler = false; + LWLockRelease(scheduler_shared_data->lock); + + funcid = fmgr_internal_function("pg_terminate_backend"); + if (funcid == InvalidOid) + { + ereport(ERROR, (errmsg("Function pg_terminate_backend not found"))); + } + result = DatumGetBool(OidFunctionCall1(funcid, Int32GetDatum(pid))); + PG_RETURN_BOOL(result); +} + +Datum +cron_startup(PG_FUNCTION_ARGS) +{ + BackgroundWorker worker; + BackgroundWorkerHandle *handle; + BgwHandleStatus status; + bool result = false; + pid_t pid; + + LWLockAcquire(scheduler_shared_data->lock, LW_EXCLUSIVE); + pid = scheduler_shared_data->scheduler_pid; + if (pid == 0) + { + cron_prepare_backgroud_worker(&worker); + worker.bgw_notify_pid = MyProcPid; + RegisterDynamicBackgroundWorker(&worker, &handle); + status = WaitForBackgroundWorkerStartup(handle, &pid); + if (status == BGWH_STARTED) + { + scheduler_shared_data->scheduler_pid = pid; + result = true; + } + } + LWLockRelease(scheduler_shared_data->lock); + PG_RETURN_BOOL(result); +} + +static void +cron_shmem_request(void) +{ + if (prev_shmem_request_hook) + { + prev_shmem_request_hook(); + } + + RequestNamedLWLockTranche("cron", 1); +} + +static void +cron_shmem_startup(void) +{ + bool found; + if (prev_shmem_startup_hook) + { + prev_shmem_startup_hook(); + } + + LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE); + InitShmemIndex(); + scheduler_shared_data = (struct scheduler_shared_data_t*) + ShmemInitStruct("cron_scheduler_shared_data", sizeof(struct scheduler_shared_data_t), &found); + if (!found) + { + scheduler_shared_data->scheduler_pid = 0; + scheduler_shared_data->restart_scheduler = true; + scheduler_shared_data->lock = &(GetNamedLWLockTranche("cron"))->lock; + } + LWLockRelease(AddinShmemInitLock); +} + +static void +cron_prepare_backgroud_worker(BackgroundWorker* worker) +{ + /* set up common data for all our workers */ + worker->bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION; + worker->bgw_start_time = BgWorkerStart_RecoveryFinished; + worker->bgw_restart_time = 1; +#if (PG_VERSION_NUM < 100000) + worker->bgw_main = PgCronLauncherMain; +#endif + worker->bgw_main_arg = Int32GetDatum(0); + worker->bgw_notify_pid = 0; + sprintf(worker->bgw_library_name, "pg_cron"); + sprintf(worker->bgw_function_name, "PgCronLauncherMain"); + snprintf(worker->bgw_name, BGW_MAXLEN, "pg_cron launcher"); +#if (PG_VERSION_NUM >= 110000) + snprintf(worker->bgw_type, BGW_MAXLEN, "pg_cron launcher"); +#endif +} + /* * _PG_init gets called when the extension is loaded. */ @@ -212,6 +330,12 @@ _PG_init(void) /* watch for invalidation events */ CacheRegisterRelcacheCallback(InvalidateJobCacheCallback, (Datum) 0); + prev_shmem_request_hook = shmem_request_hook; + shmem_request_hook = cron_shmem_request; + + prev_shmem_startup_hook = shmem_startup_hook; + shmem_startup_hook = cron_shmem_startup; + DefineCustomStringVariable( "cron.database_name", gettext_noop("Database in which pg_cron metadata is kept."), @@ -328,22 +452,7 @@ _PG_init(void) GUC_SUPERUSER_ONLY, check_timezone, NULL, NULL); - /* set up common data for all our workers */ - worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION; - worker.bgw_start_time = BgWorkerStart_RecoveryFinished; - worker.bgw_restart_time = 1; -#if (PG_VERSION_NUM < 100000) - worker.bgw_main = PgCronLauncherMain; -#endif - worker.bgw_main_arg = Int32GetDatum(0); - worker.bgw_notify_pid = 0; - sprintf(worker.bgw_library_name, "pg_cron"); - sprintf(worker.bgw_function_name, "PgCronLauncherMain"); - snprintf(worker.bgw_name, BGW_MAXLEN, "pg_cron launcher"); -#if (PG_VERSION_NUM >= 110000) - snprintf(worker.bgw_type, BGW_MAXLEN, "pg_cron launcher"); -#endif - + cron_prepare_backgroud_worker(&worker); RegisterBackgroundWorker(&worker); } @@ -561,6 +670,11 @@ PgCronLauncherMain(Datum arg) { MemoryContext CronLoopContext = NULL; struct rlimit limit; + int retcode; + + LWLockAcquire(scheduler_shared_data->lock, LW_EXCLUSIVE); + scheduler_shared_data->scheduler_pid = MyProcPid; + LWLockRelease(scheduler_shared_data->lock); /* Establish signal handlers before unblocking signals. */ pqsignal(SIGHUP, pg_cron_sighup); @@ -671,8 +785,11 @@ PgCronLauncherMain(Datum arg) ereport(LOG, (errmsg("pg_cron scheduler shutting down"))); - /* return error code to trigger restart */ - proc_exit(1); + /* return error code to trigger restart unless shutting down cron scheduler */ + LWLockAcquire(scheduler_shared_data->lock, LW_EXCLUSIVE); + retcode = (int)scheduler_shared_data->restart_scheduler; + LWLockRelease(scheduler_shared_data->lock); + proc_exit((int)retcode); }