diff --git a/pulpcore/app/models/status.py b/pulpcore/app/models/status.py index 2abfe8c501..dee5ef2ad0 100644 --- a/pulpcore/app/models/status.py +++ b/pulpcore/app/models/status.py @@ -81,6 +81,9 @@ class AppStatus(BaseModel): ttl = models.DurationField(null=False) last_heartbeat = models.DateTimeField(auto_now=True) + def __str__(self) -> str: + return f"<{self.__class__.__name__}[{self.app_type}] {self.name}>" + @property def online(self) -> bool: """ @@ -90,7 +93,7 @@ def online(self) -> bool: return self.last_heartbeat >= age_threshold @property - def missing(self): + def missing(self) -> bool: """ Whether an app can be considered 'missing' @@ -101,7 +104,7 @@ def missing(self): """ return not self.online - def save_heartbeat(self): + def save_heartbeat(self) -> None: """ Update the last_heartbeat field to now and save it. @@ -113,7 +116,7 @@ def save_heartbeat(self): """ self.save(update_fields=["last_heartbeat"]) - async def asave_heartbeat(self): + async def asave_heartbeat(self) -> None: """ Update the last_heartbeat field to now and save it. diff --git a/pulpcore/tasking/worker.py b/pulpcore/tasking/worker.py index 6cce69c7ac..23313b6629 100644 --- a/pulpcore/tasking/worker.py +++ b/pulpcore/tasking/worker.py @@ -83,6 +83,10 @@ def _f(self, *args, **kwargs): class PulpcoreWorker: + @property + def leader(self): + return self.leader_heartbeat[0] == str(self.app_status.pulp_id) + def __init__(self, auxiliary=False): # Notification states from several signal handlers self.shutdown_requested = False @@ -97,13 +101,14 @@ def __init__(self, auxiliary=False): self.task = None self.name = f"{os.getpid()}@{socket.getfqdn()}" self.heartbeat_period = timedelta(seconds=settings.WORKER_TTL / 3) - self.last_metric_heartbeat = timezone.now() self.versions = {app.label: app.version for app in pulp_plugin_configs()} self.cursor = connection.cursor() self.app_status = AppStatus.objects.create( name=self.name, app_type="worker", versions=self.versions ) + self.leader_heartbeat = (None, timezone.now() + self.heartbeat_period) + # This defaults to immediate task cancellation. # It will be set into the future on moderately graceful worker shutdown, # and set to None for fully graceful shutdown. @@ -160,8 +165,8 @@ def _signal_handler(self, thesignal, frame): def _pg_notify_handler(self, notification): if notification.channel == "pulp_worker_wakeup": if notification.payload == TASK_WAKEUP_UNBLOCK: - # Auxiliary workers don't do this. - self.wakeup_unblock = not self.auxiliary + # Only the leader unblocks. + self.wakeup_unblock = self.leader elif notification.payload == TASK_WAKEUP_HANDLE: self.wakeup_handle = True else: @@ -170,13 +175,20 @@ def _pg_notify_handler(self, notification): self.wakeup_unblock = not self.auxiliary self.wakeup_handle = True - elif notification.channel == "pulp_worker_metrics_heartbeat": - self.last_metric_heartbeat = datetime.fromisoformat(notification.payload) + elif notification.channel == "pulp_worker_leader_heartbeat": + worker_id, timestamp = notification.payload.split("@") + self.leader_heartbeat = (worker_id, datetime.fromisoformat(timestamp)) elif self.task and notification.channel == "pulp_worker_cancel": if notification.payload == str(self.task.pk): self.cancel_task = True def shutdown(self): + if self.leader: + # Announce new election by timing out this heartbeat. + self.cursor.execute( + "SELECT pg_notify('pulp_worker_leader_heartbeat', %s)", + (f"{self.app_status.pulp_id}@{str(timezone.now() - self.heartbeat_period)}",), + ) self.app_status.delete() _logger.info(_("Worker %s was shut down."), self.name) @@ -256,8 +268,6 @@ def record_unblocked_waiting_tasks_metric(self, now): unblocked_tasks_stats["longest_unblocked_waiting_time"].seconds ) - self.cursor.execute(f"NOTIFY pulp_worker_metrics_heartbeat, '{str(now)}'") - def beat(self): now = timezone.now() if self.app_status.last_heartbeat < now - self.heartbeat_period: @@ -268,15 +278,29 @@ def beat(self): self.ignored_task_countdown = IGNORED_TASKS_CLEANUP_INTERVAL self.cleanup_ignored_tasks() if not self.auxiliary: - self.worker_cleanup_countdown -= 1 - if self.worker_cleanup_countdown <= 0: - self.worker_cleanup_countdown = WORKER_CLEANUP_INTERVAL - self.app_worker_cleanup() + if self.leader: + _logger.debug("Send leader heartbeat.") + self.cursor.execute( + "SELECT pg_notify('pulp_worker_leader_heartbeat', %s)", + (f"{self.app_status.pulp_id}@{str(now + 3 * self.heartbeat_period)}",), + ) + + self.dispatch_scheduled_tasks() - self.dispatch_scheduled_tasks() + self.worker_cleanup_countdown -= 1 + if self.worker_cleanup_countdown <= 0: + self.worker_cleanup_countdown = WORKER_CLEANUP_INTERVAL + self.app_worker_cleanup() - if self.otel_enabled and now > self.last_metric_heartbeat + self.heartbeat_period: - self.record_unblocked_waiting_tasks_metric(now) + if self.otel_enabled: + self.record_unblocked_waiting_tasks_metric(now) + else: + if self.leader_heartbeat[1] < now: + _logger.info("Attempt to become leader.") + self.cursor.execute( + "SELECT pg_notify('pulp_worker_leader_heartbeat', %s)", + (f"{self.app_status.pulp_id}@{str(now + 3 * self.heartbeat_period)}",), + ) def notify_workers(self, reason): self.cursor.execute("SELECT pg_notify('pulp_worker_wakeup', %s)", (reason,)) @@ -616,7 +640,7 @@ def run(self, burst=False): # Subscribe to pgsql channels connection.connection.add_notify_handler(self._pg_notify_handler) self.cursor.execute("LISTEN pulp_worker_cancel") - self.cursor.execute("LISTEN pulp_worker_metrics_heartbeat") + self.cursor.execute("LISTEN pulp_worker_leader_heartbeat") if burst: if not self.auxiliary: # Attempt to flush the task queue completely. @@ -636,6 +660,6 @@ def run(self, burst=False): # rest until notified to wakeup self.sleep() self.cursor.execute("UNLISTEN pulp_worker_wakeup") - self.cursor.execute("UNLISTEN pulp_worker_metrics_heartbeat") + self.cursor.execute("UNLISTEN pulp_worker_leader_heartbeat") self.cursor.execute("UNLISTEN pulp_worker_cancel") self.shutdown()