From 71e2540484dc8315e3a345be1b899491efba48df Mon Sep 17 00:00:00 2001 From: San Date: Tue, 24 Dec 2024 23:10:43 +0900 Subject: [PATCH 1/2] Replace _tsignal_stopping.wait() with wait_for_stop() - Introduced new wait_for_stop() method in WorkerClass for better encapsulation - Updated all direct calls to _tsignal_stopping.wait() to use wait_for_stop() - Updated documentation to reflect the new method - Added code formatting improvements and whitespace cleanup --- README.md | 2 +- docs/api.md | 5 +++-- docs/usage.md | 4 ++-- examples/stock_monitor_simple.py | 2 +- src/tsignal/contrib/extensions/property.py | 3 +++ .../contrib/patterns/worker/decorators.py | 21 +++++++++++++++---- src/tsignal/core.py | 19 ++++++++--------- 7 files changed, 36 insertions(+), 20 deletions(-) diff --git a/README.md b/README.md index ebca47f..bd870ee 100644 --- a/README.md +++ b/README.md @@ -147,7 +147,7 @@ class DataProcessor: async def run(self, *args, **kwargs): # The main entry point for the worker thread’s event loop # Wait for tasks or stopping signal - await self._tsignal_stopping.wait() + await self.wait_for_stop() async def process_data(self, data): # Perform heavy computation in the worker thread diff --git a/docs/api.md b/docs/api.md index 018d129..5ce8aa4 100644 --- a/docs/api.md +++ b/docs/api.md @@ -75,7 +75,7 @@ class Worker: # run is the main entry point in the worker thread print("Worker started with config:", config) # Wait until stop is requested - await self._tsignal_stopping.wait() + await self.wait_for_stop() self.finished.emit() async def do_work(self, data): @@ -201,6 +201,7 @@ Slots can be async. When a signal with an async slot is emitted: - `run(*args, **kwargs)` defines the worker’s main logic. - `queue_task(coro)` schedules coroutines on the worker's event loop. - `stop()` requests a graceful shutdown, causing `run()` to end after `_tsignal_stopping` is triggered. +- `wait_for_stop()` is a coroutine that waits for the worker to stop. **Signature Match for** ``run()``: @@ -262,7 +263,7 @@ class BackgroundWorker: async def run(self): # Just wait until stopped - await self._tsignal_stopping.wait() + await self.wait_for_stop() async def heavy_task(self, data): await asyncio.sleep(2) # Simulate heavy computation diff --git a/docs/usage.md b/docs/usage.md index 21554f1..f7502a5 100644 --- a/docs/usage.md +++ b/docs/usage.md @@ -269,7 +269,7 @@ class BackgroundWorker: async def run(self, *args, **kwargs): # The main entry point in the worker thread. # Wait until stopped - await self._tsignal_stopping.wait() + await self.wait_for_stop() async def heavy_task(self, data): await asyncio.sleep(2) # Simulate heavy computation @@ -292,7 +292,7 @@ If `run()` accepts additional parameters, simply provide them to `start()`: ```python async def run(self, config=None): # Use config here - await self._tsignal_stopping.wait() + await self.wait_for_stop() ``` ```python diff --git a/examples/stock_monitor_simple.py b/examples/stock_monitor_simple.py index 61f9521..5a47b8a 100644 --- a/examples/stock_monitor_simple.py +++ b/examples/stock_monitor_simple.py @@ -70,7 +70,7 @@ async def run(self, *args, **kwargs): self._running = True self._update_task = asyncio.create_task(self.update_loop()) # Wait until run() is finished - await self._tsignal_stopping.wait() + await self.wait_for_stop() # Clean up self._running = False diff --git a/src/tsignal/contrib/extensions/property.py b/src/tsignal/contrib/extensions/property.py index ed46fdc..f855953 100644 --- a/src/tsignal/contrib/extensions/property.py +++ b/src/tsignal/contrib/extensions/property.py @@ -93,6 +93,7 @@ def __set_name__(self, owner, name): def __get__(self, obj, objtype=None): if obj is None: return self + if self.fget is None: raise AttributeError("unreadable attribute") @@ -104,6 +105,7 @@ def __get__(self, obj, objtype=None): future = asyncio.run_coroutine_threadsafe( self._get_value(obj), obj._tsignal_loop ) + return future.result() else: return self._get_value_sync(obj) @@ -123,6 +125,7 @@ def __set__(self, obj, value): future = asyncio.run_coroutine_threadsafe( self._set_value(obj, value), obj._tsignal_loop ) + # Wait for completion like slot direct calls return future.result() else: diff --git a/src/tsignal/contrib/patterns/worker/decorators.py b/src/tsignal/contrib/patterns/worker/decorators.py index 9d0360b..8b7c204 100644 --- a/src/tsignal/contrib/patterns/worker/decorators.py +++ b/src/tsignal/contrib/patterns/worker/decorators.py @@ -75,7 +75,7 @@ def result_ready(self): async def run(self, config=None): print("Worker started with config:", config) # Wait until stop is requested - await self._tsignal_stopping.wait() + await self.wait_for_stop() print("Worker finishing...") async def do_work(self, data): @@ -108,9 +108,7 @@ def __init__(self): All operations that access or modify worker's lifecycle state must be performed while holding this lock. """ - self._tsignal_lifecycle_lock = ( - threading.RLock() - ) # Renamed lock for loop and thread + self._tsignal_lifecycle_lock = threading.RLock() self._tsignal_stopping = asyncio.Event() self._tsignal_affinity = object() self._tsignal_process_queue_task = None @@ -120,8 +118,10 @@ def __init__(self): @property def event_loop(self) -> asyncio.AbstractEventLoop: """Returns the worker's event loop""" + if not self._tsignal_loop: raise RuntimeError("Worker not started") + return self._tsignal_loop @t_signal @@ -134,6 +134,7 @@ def stopped(self): async def run(self, *args, **kwargs): """Run the worker.""" + logger.debug("[WorkerClass][run] calling super") super_run = getattr(super(), _WorkerConstants.RUN, None) @@ -161,8 +162,10 @@ async def run(self, *args, **kwargs): async def _process_queue(self): """Process the task queue.""" + while not self._tsignal_stopping.is_set(): coro = await self._tsignal_task_queue.get() + try: await coro except Exception as e: @@ -176,12 +179,14 @@ async def _process_queue(self): async def start_queue(self): """Start the task queue processing. Returns the queue task.""" + self._tsignal_process_queue_task = asyncio.create_task( self._process_queue() ) def queue_task(self, coro): """Method to add a task to the queue""" + if not asyncio.iscoroutine(coro): logger.error( "[WorkerClass][queue_task] Task must be a coroutine object: %s", @@ -196,6 +201,7 @@ def queue_task(self, coro): def start(self, *args, **kwargs): """Start the worker thread.""" + run_coro = kwargs.pop(_WorkerConstants.RUN_CORO, None) if run_coro is not None and not asyncio.iscoroutine(run_coro): @@ -207,6 +213,7 @@ def start(self, *args, **kwargs): def thread_main(): """Thread main function.""" + self._tsignal_task_queue = asyncio.Queue() with self._tsignal_lifecycle_lock: @@ -215,6 +222,7 @@ def thread_main(): async def runner(): """Runner function.""" + self.started.emit() if run_coro is not None: @@ -324,4 +332,9 @@ def move_to_thread(self, target): self._tsignal_affinity, ) + async def wait_for_stop(self): + """Wait for the worker to stop.""" + + await self._tsignal_stopping.wait() + return WorkerClass diff --git a/src/tsignal/core.py b/src/tsignal/core.py index 7fac8af..b0d466d 100644 --- a/src/tsignal/core.py +++ b/src/tsignal/core.py @@ -73,6 +73,7 @@ def is_valid(self): if self.is_weak and isinstance(self.receiver_ref, weakref.ref): return self.receiver_ref() is not None + return True def get_slot_to_call(self): @@ -126,10 +127,7 @@ def wrap(*args, **kwargs): ), ) - return func(*args, **kwargs) - else: - # Call sync function -> return result - return func(*args, **kwargs) + return func(*args, **kwargs) return wrap @@ -303,12 +301,6 @@ def standalone_func(value): ) is_coro_slot = asyncio.iscoroutinefunction(maybe_slot) - is_coro_slot = asyncio.iscoroutinefunction( - receiver_or_slot.__func__ - if hasattr(receiver_or_slot, "__self__") - else receiver_or_slot - ) - if is_bound_method: obj = receiver_or_slot.__self__ @@ -394,6 +386,7 @@ def standalone_func(value): def _cleanup_on_ref_dead(self, ref): """Cleanup connections on weak reference death.""" + # ref is a weak reference to the receiver # Remove connections associated with the dead receiver with self.connections_lock: @@ -448,6 +441,7 @@ def disconnect(self, receiver: object = None, slot: Callable = None) -> int: # No receiver or slot specified, remove all connections. count = len(self.connections) self.connections.clear() + return count original_count = len(self.connections) @@ -496,6 +490,7 @@ def disconnect(self, receiver: object = None, slot: Callable = None) -> int: "[TSignal][disconnect][END] disconnected: %s", disconnected, ) + return disconnected def emit(self, *args, **kwargs): @@ -726,6 +721,7 @@ def wrap(self): if not hasattr(self, f"_{sig_name}"): setattr(self, f"_{sig_name}", TSignal()) + return getattr(self, f"_{sig_name}") return TSignalProperty(wrap, sig_name) @@ -814,6 +810,7 @@ async def wrap(self, *args, **kwargs): future = asyncio.run_coroutine_threadsafe( func(self, *args, **kwargs), self._tsignal_loop ) + return await asyncio.wrap_future(future) return await func(self, *args, **kwargs) @@ -853,6 +850,7 @@ def callback(): future.set_exception(e) self._tsignal_loop.call_soon_threadsafe(callback) + return future.result() return func(self, *args, **kwargs) @@ -938,6 +936,7 @@ def __init__(self, *args, **kwargs): original_init(self, *args, **kwargs) cls.__init__ = __init__ + return cls if cls is None: From ba807f6acfd708413acc3f5e1099ceaef903d61c Mon Sep 17 00:00:00 2001 From: San Date: Tue, 24 Dec 2024 23:14:33 +0900 Subject: [PATCH 2/2] Bump version to 0.4.4 --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 80dcfc8..bceb56d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "tsignal" -version = "0.4.3" +version = "0.4.4" description = "A Python Signal-Slot library inspired by Qt" readme = "README.md" requires-python = ">=3.10"