diff --git a/taskiq/cli/worker/process_manager.py b/taskiq/cli/worker/process_manager.py index c6633b1..6f1f245 100644 --- a/taskiq/cli/worker/process_manager.py +++ b/taskiq/cli/worker/process_manager.py @@ -2,10 +2,8 @@ import os import signal import sys -from contextlib import suppress from dataclasses import dataclass -from multiprocessing import Event, Process, Queue, current_process -from multiprocessing.synchronize import Event as EventType +from multiprocessing import Process, Queue, current_process from time import sleep from typing import Any, Callable, List, Optional @@ -78,7 +76,6 @@ def handle( logger.debug(f"Process {worker.name} is already terminated.") # Waiting worker shutdown. worker.join() - event: EventType = Event() new_process = Process( target=worker_func, kwargs={"args": args}, @@ -88,7 +85,7 @@ def handle( new_process.start() logger.info(f"Process {new_process.name} restarted with pid {new_process.pid}") workers[self.worker_num] = new_process - _wait_for_worker_startup(new_process, event) + _wait_for_worker_startup(new_process) @dataclass @@ -96,11 +93,10 @@ class ShutdownAction(ProcessActionBase): """This action shuts down process manager loop.""" -def _wait_for_worker_startup(process: Process, event: EventType) -> None: - while process.is_alive(): - with suppress(TimeoutError): - event.wait(0.1) - return +def _wait_for_worker_startup(process: Process) -> None: + while not process.is_alive(): + logger.info(f"Waiting for {process.name} to start...") + sleep(0.1) def schedule_workers_reload( @@ -186,9 +182,7 @@ def __init__( def prepare_workers(self) -> None: """Spawn multiple processes.""" - events: List[EventType] = [] for process in range(self.args.workers): - event = Event() work_proc = Process( target=self.worker_function, kwargs={"args": self.args}, @@ -202,11 +196,10 @@ def prepare_workers(self) -> None: work_proc.pid, ) self.workers.append(work_proc) - events.append(event) # Wait for workers startup - for worker, event in zip(self.workers, events): - _wait_for_worker_startup(worker, event) + for worker in self.workers: + _wait_for_worker_startup(worker) def start(self) -> Optional[int]: # noqa: C901 """