Skip to content

Introduce cron_shutdown function for stopping the main background worker #381

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions pg_cron--1.6--1.7.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
DROP FUNCTION IF EXISTS cron.shutdown();
CREATE FUNCTION cron.shutdown()
RETURNS bool
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$cron_shutdown$$;
COMMENT ON FUNCTION cron.shutdown()
IS 'shutdown pg_cron';
REVOKE ALL ON FUNCTION cron.shutdown() FROM PUBLIC;

DROP FUNCTION IF EXISTS cron.startup();
CREATE FUNCTION cron.startup()
RETURNS bool
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$cron_startup$$;
COMMENT ON FUNCTION cron.startup()
IS 'starts up pg_cron after it was shut down';
REVOKE ALL ON FUNCTION cron.startup() FROM PUBLIC;
153 changes: 135 additions & 18 deletions src/pg_cron.c
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ typedef enum
/* forward declarations */
void _PG_init(void);
void _PG_fini(void);
static void cron_prepare_backgroud_worker(BackgroundWorker* worker);
static void pg_cron_sigterm(SIGNAL_ARGS);
static void pg_cron_sighup(SIGNAL_ARGS);
PGDLLEXPORT void PgCronLauncherMain(Datum arg);
Expand Down Expand Up @@ -150,6 +151,10 @@ static bool jobStartupTimeout(CronTask *task, TimestampTz currentTime);
static char* pg_cron_cmdTuples(char *msg);
static void bgw_generate_returned_message(StringInfoData *display_msg, ErrorData edata);

/* SQL-callable functions */
PG_FUNCTION_INFO_V1(cron_shutdown);
PG_FUNCTION_INFO_V1(cron_startup);

/* global settings */
char *CronTableDatabaseName = "postgres";
static bool CronLogStatement = true;
Expand All @@ -170,6 +175,9 @@ static bool UseBackgroundWorkers = false;

char *cron_timezone = NULL;

static shmem_request_hook_type prev_shmem_request_hook = NULL;
static shmem_startup_hook_type prev_shmem_startup_hook = NULL;

static const struct config_enum_entry cron_message_level_options[] = {
{"debug5", DEBUG5, false},
{"debug4", DEBUG4, false},
Expand All @@ -189,6 +197,116 @@ static const struct config_enum_entry cron_message_level_options[] = {

static const char *cron_error_severity(int elevel);

struct scheduler_shared_data_t {
pid_t scheduler_pid;
bool restart_scheduler;
LWLockId lock;
};

static struct scheduler_shared_data_t* scheduler_shared_data = NULL;

Datum
cron_shutdown(PG_FUNCTION_ARGS)
{
bool result = false;
Oid funcid;
pid_t pid;

LWLockAcquire(scheduler_shared_data->lock, LW_EXCLUSIVE);
pid = scheduler_shared_data->scheduler_pid;
scheduler_shared_data->scheduler_pid = 0;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the code style should be adapted to the rest of the project

(except the Vixie cron logic in entry.c/misc.c)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed the style. Let me know if it's OK now.

scheduler_shared_data->restart_scheduler = false;
LWLockRelease(scheduler_shared_data->lock);

funcid = fmgr_internal_function("pg_terminate_backend");
if (funcid == InvalidOid)
{
ereport(ERROR, (errmsg("Function pg_terminate_backend not found")));
}
result = DatumGetBool(OidFunctionCall1(funcid, Int32GetDatum(pid)));
PG_RETURN_BOOL(result);
}

Datum
cron_startup(PG_FUNCTION_ARGS)
{
BackgroundWorker worker;
BackgroundWorkerHandle *handle;
BgwHandleStatus status;
bool result = false;
pid_t pid;

LWLockAcquire(scheduler_shared_data->lock, LW_EXCLUSIVE);
pid = scheduler_shared_data->scheduler_pid;
if (pid == 0)
{
cron_prepare_backgroud_worker(&worker);
worker.bgw_notify_pid = MyProcPid;
RegisterDynamicBackgroundWorker(&worker, &handle);
status = WaitForBackgroundWorkerStartup(handle, &pid);
if (status == BGWH_STARTED)
{
scheduler_shared_data->scheduler_pid = pid;
result = true;
}
}
LWLockRelease(scheduler_shared_data->lock);
PG_RETURN_BOOL(result);
}

static void
cron_shmem_request(void)
{
if (prev_shmem_request_hook)
{
prev_shmem_request_hook();
}

RequestNamedLWLockTranche("cron", 1);
}

static void
cron_shmem_startup(void)
{
bool found;
if (prev_shmem_startup_hook)
{
prev_shmem_startup_hook();
}

LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
InitShmemIndex();
scheduler_shared_data = (struct scheduler_shared_data_t*)
ShmemInitStruct("cron_scheduler_shared_data", sizeof(struct scheduler_shared_data_t), &found);
if (!found)
{
scheduler_shared_data->scheduler_pid = 0;
scheduler_shared_data->restart_scheduler = true;
scheduler_shared_data->lock = &(GetNamedLWLockTranche("cron"))->lock;
}
LWLockRelease(AddinShmemInitLock);
}

static void
cron_prepare_backgroud_worker(BackgroundWorker* worker)
{
/* set up common data for all our workers */
worker->bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
worker->bgw_start_time = BgWorkerStart_RecoveryFinished;
worker->bgw_restart_time = 1;
#if (PG_VERSION_NUM < 100000)
worker->bgw_main = PgCronLauncherMain;
#endif
worker->bgw_main_arg = Int32GetDatum(0);
worker->bgw_notify_pid = 0;
sprintf(worker->bgw_library_name, "pg_cron");
sprintf(worker->bgw_function_name, "PgCronLauncherMain");
snprintf(worker->bgw_name, BGW_MAXLEN, "pg_cron launcher");
#if (PG_VERSION_NUM >= 110000)
snprintf(worker->bgw_type, BGW_MAXLEN, "pg_cron launcher");
#endif
}

/*
* _PG_init gets called when the extension is loaded.
*/
Expand All @@ -212,6 +330,12 @@ _PG_init(void)
/* watch for invalidation events */
CacheRegisterRelcacheCallback(InvalidateJobCacheCallback, (Datum) 0);

prev_shmem_request_hook = shmem_request_hook;
shmem_request_hook = cron_shmem_request;

prev_shmem_startup_hook = shmem_startup_hook;
shmem_startup_hook = cron_shmem_startup;

DefineCustomStringVariable(
"cron.database_name",
gettext_noop("Database in which pg_cron metadata is kept."),
Expand Down Expand Up @@ -328,22 +452,7 @@ _PG_init(void)
GUC_SUPERUSER_ONLY,
check_timezone, NULL, NULL);

/* set up common data for all our workers */
worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
worker.bgw_restart_time = 1;
#if (PG_VERSION_NUM < 100000)
worker.bgw_main = PgCronLauncherMain;
#endif
worker.bgw_main_arg = Int32GetDatum(0);
worker.bgw_notify_pid = 0;
sprintf(worker.bgw_library_name, "pg_cron");
sprintf(worker.bgw_function_name, "PgCronLauncherMain");
snprintf(worker.bgw_name, BGW_MAXLEN, "pg_cron launcher");
#if (PG_VERSION_NUM >= 110000)
snprintf(worker.bgw_type, BGW_MAXLEN, "pg_cron launcher");
#endif

cron_prepare_backgroud_worker(&worker);
RegisterBackgroundWorker(&worker);
}

Expand Down Expand Up @@ -561,6 +670,11 @@ PgCronLauncherMain(Datum arg)
{
MemoryContext CronLoopContext = NULL;
struct rlimit limit;
int retcode;

LWLockAcquire(scheduler_shared_data->lock, LW_EXCLUSIVE);
scheduler_shared_data->scheduler_pid = MyProcPid;
LWLockRelease(scheduler_shared_data->lock);

/* Establish signal handlers before unblocking signals. */
pqsignal(SIGHUP, pg_cron_sighup);
Expand Down Expand Up @@ -671,8 +785,11 @@ PgCronLauncherMain(Datum arg)

ereport(LOG, (errmsg("pg_cron scheduler shutting down")));

/* return error code to trigger restart */
proc_exit(1);
/* return error code to trigger restart unless shutting down cron scheduler */
LWLockAcquire(scheduler_shared_data->lock, LW_EXCLUSIVE);
retcode = (int)scheduler_shared_data->restart_scheduler;
LWLockRelease(scheduler_shared_data->lock);
proc_exit((int)retcode);
}


Expand Down