Skip to content

Replace _tsignal_stopping.wait() with wait_for_stop() #12

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ class DataProcessor:
async def run(self, *args, **kwargs):
# The main entry point for the worker thread’s event loop
# Wait for tasks or stopping signal
await self._tsignal_stopping.wait()
await self.wait_for_stop()

async def process_data(self, data):
# Perform heavy computation in the worker thread
Expand Down
5 changes: 3 additions & 2 deletions docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ class Worker:
# run is the main entry point in the worker thread
print("Worker started with config:", config)
# Wait until stop is requested
await self._tsignal_stopping.wait()
await self.wait_for_stop()
self.finished.emit()

async def do_work(self, data):
Expand Down Expand Up @@ -201,6 +201,7 @@ Slots can be async. When a signal with an async slot is emitted:
- `run(*args, **kwargs)` defines the worker’s main logic.
- `queue_task(coro)` schedules coroutines on the worker's event loop.
- `stop()` requests a graceful shutdown, causing `run()` to end after `_tsignal_stopping` is triggered.
- `wait_for_stop()` is a coroutine that waits for the worker to stop.

**Signature Match for** ``run()``:

Expand Down Expand Up @@ -262,7 +263,7 @@ class BackgroundWorker:

async def run(self):
# Just wait until stopped
await self._tsignal_stopping.wait()
await self.wait_for_stop()

async def heavy_task(self, data):
await asyncio.sleep(2) # Simulate heavy computation
Expand Down
4 changes: 2 additions & 2 deletions docs/usage.md
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ class BackgroundWorker:
async def run(self, *args, **kwargs):
# The main entry point in the worker thread.
# Wait until stopped
await self._tsignal_stopping.wait()
await self.wait_for_stop()

async def heavy_task(self, data):
await asyncio.sleep(2) # Simulate heavy computation
Expand All @@ -292,7 +292,7 @@ If `run()` accepts additional parameters, simply provide them to `start()`:
```python
async def run(self, config=None):
# Use config here
await self._tsignal_stopping.wait()
await self.wait_for_stop()
```

```python
Expand Down
2 changes: 1 addition & 1 deletion examples/stock_monitor_simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ async def run(self, *args, **kwargs):
self._running = True
self._update_task = asyncio.create_task(self.update_loop())
# Wait until run() is finished
await self._tsignal_stopping.wait()
await self.wait_for_stop()
# Clean up
self._running = False

Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "tsignal"
version = "0.4.3"
version = "0.4.4"
description = "A Python Signal-Slot library inspired by Qt"
readme = "README.md"
requires-python = ">=3.10"
Expand Down
3 changes: 3 additions & 0 deletions src/tsignal/contrib/extensions/property.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ def __set_name__(self, owner, name):
def __get__(self, obj, objtype=None):
if obj is None:
return self

if self.fget is None:
raise AttributeError("unreadable attribute")

Expand All @@ -104,6 +105,7 @@ def __get__(self, obj, objtype=None):
future = asyncio.run_coroutine_threadsafe(
self._get_value(obj), obj._tsignal_loop
)

return future.result()
else:
return self._get_value_sync(obj)
Expand All @@ -123,6 +125,7 @@ def __set__(self, obj, value):
future = asyncio.run_coroutine_threadsafe(
self._set_value(obj, value), obj._tsignal_loop
)

# Wait for completion like slot direct calls
return future.result()
else:
Expand Down
21 changes: 17 additions & 4 deletions src/tsignal/contrib/patterns/worker/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def result_ready(self):
async def run(self, config=None):
print("Worker started with config:", config)
# Wait until stop is requested
await self._tsignal_stopping.wait()
await self.wait_for_stop()
print("Worker finishing...")

async def do_work(self, data):
Expand Down Expand Up @@ -108,9 +108,7 @@ def __init__(self):
All operations that access or modify worker's lifecycle state must be
performed while holding this lock.
"""
self._tsignal_lifecycle_lock = (
threading.RLock()
) # Renamed lock for loop and thread
self._tsignal_lifecycle_lock = threading.RLock()
self._tsignal_stopping = asyncio.Event()
self._tsignal_affinity = object()
self._tsignal_process_queue_task = None
Expand All @@ -120,8 +118,10 @@ def __init__(self):
@property
def event_loop(self) -> asyncio.AbstractEventLoop:
"""Returns the worker's event loop"""

if not self._tsignal_loop:
raise RuntimeError("Worker not started")

return self._tsignal_loop

@t_signal
Expand All @@ -134,6 +134,7 @@ def stopped(self):

async def run(self, *args, **kwargs):
"""Run the worker."""

logger.debug("[WorkerClass][run] calling super")

super_run = getattr(super(), _WorkerConstants.RUN, None)
Expand Down Expand Up @@ -161,8 +162,10 @@ async def run(self, *args, **kwargs):

async def _process_queue(self):
"""Process the task queue."""

while not self._tsignal_stopping.is_set():
coro = await self._tsignal_task_queue.get()

try:
await coro
except Exception as e:
Expand All @@ -176,12 +179,14 @@ async def _process_queue(self):

async def start_queue(self):
"""Start the task queue processing. Returns the queue task."""

self._tsignal_process_queue_task = asyncio.create_task(
self._process_queue()
)

def queue_task(self, coro):
"""Method to add a task to the queue"""

if not asyncio.iscoroutine(coro):
logger.error(
"[WorkerClass][queue_task] Task must be a coroutine object: %s",
Expand All @@ -196,6 +201,7 @@ def queue_task(self, coro):

def start(self, *args, **kwargs):
"""Start the worker thread."""

run_coro = kwargs.pop(_WorkerConstants.RUN_CORO, None)

if run_coro is not None and not asyncio.iscoroutine(run_coro):
Expand All @@ -207,6 +213,7 @@ def start(self, *args, **kwargs):

def thread_main():
"""Thread main function."""

self._tsignal_task_queue = asyncio.Queue()

with self._tsignal_lifecycle_lock:
Expand All @@ -215,6 +222,7 @@ def thread_main():

async def runner():
"""Runner function."""

self.started.emit()

if run_coro is not None:
Expand Down Expand Up @@ -324,4 +332,9 @@ def move_to_thread(self, target):
self._tsignal_affinity,
)

async def wait_for_stop(self):
"""Wait for the worker to stop."""

await self._tsignal_stopping.wait()

return WorkerClass
19 changes: 9 additions & 10 deletions src/tsignal/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ def is_valid(self):

if self.is_weak and isinstance(self.receiver_ref, weakref.ref):
return self.receiver_ref() is not None

return True

def get_slot_to_call(self):
Expand Down Expand Up @@ -126,10 +127,7 @@ def wrap(*args, **kwargs):
),
)

return func(*args, **kwargs)
else:
# Call sync function -> return result
return func(*args, **kwargs)
return func(*args, **kwargs)

return wrap

Expand Down Expand Up @@ -303,12 +301,6 @@ def standalone_func(value):
)
is_coro_slot = asyncio.iscoroutinefunction(maybe_slot)

is_coro_slot = asyncio.iscoroutinefunction(
receiver_or_slot.__func__
if hasattr(receiver_or_slot, "__self__")
else receiver_or_slot
)

if is_bound_method:
obj = receiver_or_slot.__self__

Expand Down Expand Up @@ -394,6 +386,7 @@ def standalone_func(value):

def _cleanup_on_ref_dead(self, ref):
"""Cleanup connections on weak reference death."""

# ref is a weak reference to the receiver
# Remove connections associated with the dead receiver
with self.connections_lock:
Expand Down Expand Up @@ -448,6 +441,7 @@ def disconnect(self, receiver: object = None, slot: Callable = None) -> int:
# No receiver or slot specified, remove all connections.
count = len(self.connections)
self.connections.clear()

return count

original_count = len(self.connections)
Expand Down Expand Up @@ -496,6 +490,7 @@ def disconnect(self, receiver: object = None, slot: Callable = None) -> int:
"[TSignal][disconnect][END] disconnected: %s",
disconnected,
)

return disconnected

def emit(self, *args, **kwargs):
Expand Down Expand Up @@ -726,6 +721,7 @@ def wrap(self):

if not hasattr(self, f"_{sig_name}"):
setattr(self, f"_{sig_name}", TSignal())

return getattr(self, f"_{sig_name}")

return TSignalProperty(wrap, sig_name)
Expand Down Expand Up @@ -814,6 +810,7 @@ async def wrap(self, *args, **kwargs):
future = asyncio.run_coroutine_threadsafe(
func(self, *args, **kwargs), self._tsignal_loop
)

return await asyncio.wrap_future(future)

return await func(self, *args, **kwargs)
Expand Down Expand Up @@ -853,6 +850,7 @@ def callback():
future.set_exception(e)

self._tsignal_loop.call_soon_threadsafe(callback)

return future.result()

return func(self, *args, **kwargs)
Expand Down Expand Up @@ -938,6 +936,7 @@ def __init__(self, *args, **kwargs):
original_init(self, *args, **kwargs)

cls.__init__ = __init__

return cls

if cls is None:
Expand Down
Loading