Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions pulpcore/app/models/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ class AppStatus(BaseModel):
ttl = models.DurationField(null=False)
last_heartbeat = models.DateTimeField(auto_now=True)

def __str__(self) -> str:
return f"<{self.__class__.__name__}[{self.app_type}] {self.name}>"

@property
def online(self) -> bool:
"""
Expand All @@ -90,7 +93,7 @@ def online(self) -> bool:
return self.last_heartbeat >= age_threshold

@property
def missing(self):
def missing(self) -> bool:
"""
Whether an app can be considered 'missing'

Expand All @@ -101,7 +104,7 @@ def missing(self):
"""
return not self.online

def save_heartbeat(self):
def save_heartbeat(self) -> None:
"""
Update the last_heartbeat field to now and save it.

Expand All @@ -113,7 +116,7 @@ def save_heartbeat(self):
"""
self.save(update_fields=["last_heartbeat"])

async def asave_heartbeat(self):
async def asave_heartbeat(self) -> None:
"""
Update the last_heartbeat field to now and save it.

Expand Down
56 changes: 40 additions & 16 deletions pulpcore/tasking/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ def _f(self, *args, **kwargs):


class PulpcoreWorker:
@property
def leader(self):
return self.leader_heartbeat[0] == str(self.app_status.pulp_id)

def __init__(self, auxiliary=False):
# Notification states from several signal handlers
self.shutdown_requested = False
Expand All @@ -97,13 +101,14 @@ def __init__(self, auxiliary=False):
self.task = None
self.name = f"{os.getpid()}@{socket.getfqdn()}"
self.heartbeat_period = timedelta(seconds=settings.WORKER_TTL / 3)
self.last_metric_heartbeat = timezone.now()
self.versions = {app.label: app.version for app in pulp_plugin_configs()}
self.cursor = connection.cursor()
self.app_status = AppStatus.objects.create(
name=self.name, app_type="worker", versions=self.versions
)

self.leader_heartbeat = (None, timezone.now() + self.heartbeat_period)

# This defaults to immediate task cancellation.
# It will be set into the future on moderately graceful worker shutdown,
# and set to None for fully graceful shutdown.
Expand Down Expand Up @@ -160,8 +165,8 @@ def _signal_handler(self, thesignal, frame):
def _pg_notify_handler(self, notification):
if notification.channel == "pulp_worker_wakeup":
if notification.payload == TASK_WAKEUP_UNBLOCK:
# Auxiliary workers don't do this.
self.wakeup_unblock = not self.auxiliary
# Only the leader unblocks.
self.wakeup_unblock = self.leader
elif notification.payload == TASK_WAKEUP_HANDLE:
self.wakeup_handle = True
else:
Expand All @@ -170,13 +175,20 @@ def _pg_notify_handler(self, notification):
self.wakeup_unblock = not self.auxiliary
self.wakeup_handle = True

elif notification.channel == "pulp_worker_metrics_heartbeat":
self.last_metric_heartbeat = datetime.fromisoformat(notification.payload)
elif notification.channel == "pulp_worker_leader_heartbeat":
worker_id, timestamp = notification.payload.split("@")
self.leader_heartbeat = (worker_id, datetime.fromisoformat(timestamp))
elif self.task and notification.channel == "pulp_worker_cancel":
if notification.payload == str(self.task.pk):
self.cancel_task = True

def shutdown(self):
if self.leader:
# Announce new election by timing out this heartbeat.
self.cursor.execute(
"SELECT pg_notify('pulp_worker_leader_heartbeat', %s)",
(f"{self.app_status.pulp_id}@{str(timezone.now() - self.heartbeat_period)}",),
)
self.app_status.delete()
_logger.info(_("Worker %s was shut down."), self.name)

Expand Down Expand Up @@ -256,8 +268,6 @@ def record_unblocked_waiting_tasks_metric(self, now):
unblocked_tasks_stats["longest_unblocked_waiting_time"].seconds
)

self.cursor.execute(f"NOTIFY pulp_worker_metrics_heartbeat, '{str(now)}'")

def beat(self):
now = timezone.now()
if self.app_status.last_heartbeat < now - self.heartbeat_period:
Expand All @@ -268,15 +278,29 @@ def beat(self):
self.ignored_task_countdown = IGNORED_TASKS_CLEANUP_INTERVAL
self.cleanup_ignored_tasks()
if not self.auxiliary:
self.worker_cleanup_countdown -= 1
if self.worker_cleanup_countdown <= 0:
self.worker_cleanup_countdown = WORKER_CLEANUP_INTERVAL
self.app_worker_cleanup()
if self.leader:
_logger.debug("Send leader heartbeat.")
self.cursor.execute(
"SELECT pg_notify('pulp_worker_leader_heartbeat', %s)",
(f"{self.app_status.pulp_id}@{str(now + 3 * self.heartbeat_period)}",),
)

self.dispatch_scheduled_tasks()

self.dispatch_scheduled_tasks()
self.worker_cleanup_countdown -= 1
if self.worker_cleanup_countdown <= 0:
self.worker_cleanup_countdown = WORKER_CLEANUP_INTERVAL
self.app_worker_cleanup()

if self.otel_enabled and now > self.last_metric_heartbeat + self.heartbeat_period:
self.record_unblocked_waiting_tasks_metric(now)
if self.otel_enabled:
self.record_unblocked_waiting_tasks_metric(now)
else:
if self.leader_heartbeat[1] < now:
_logger.info("Attempt to become leader.")
self.cursor.execute(
"SELECT pg_notify('pulp_worker_leader_heartbeat', %s)",
(f"{self.app_status.pulp_id}@{str(now + 3 * self.heartbeat_period)}",),
)

def notify_workers(self, reason):
self.cursor.execute("SELECT pg_notify('pulp_worker_wakeup', %s)", (reason,))
Expand Down Expand Up @@ -616,7 +640,7 @@ def run(self, burst=False):
# Subscribe to pgsql channels
connection.connection.add_notify_handler(self._pg_notify_handler)
self.cursor.execute("LISTEN pulp_worker_cancel")
self.cursor.execute("LISTEN pulp_worker_metrics_heartbeat")
self.cursor.execute("LISTEN pulp_worker_leader_heartbeat")
if burst:
if not self.auxiliary:
# Attempt to flush the task queue completely.
Expand All @@ -636,6 +660,6 @@ def run(self, burst=False):
# rest until notified to wakeup
self.sleep()
self.cursor.execute("UNLISTEN pulp_worker_wakeup")
self.cursor.execute("UNLISTEN pulp_worker_metrics_heartbeat")
self.cursor.execute("UNLISTEN pulp_worker_leader_heartbeat")
self.cursor.execute("UNLISTEN pulp_worker_cancel")
self.shutdown()
Loading