From fbc3c6a983cad2be94a6c2d49eb420337578c048 Mon Sep 17 00:00:00 2001 From: San Date: Sat, 21 Dec 2024 16:07:00 +0900 Subject: [PATCH] feat: Add weak reference and one-shot connection support - Add weak reference support via weak=True in signal connections - Add one-shot connections via one_shot=True that auto-disconnect after first emit - Improve thread safety with internal connection locking - Add extensive documentation for all public APIs - Add new tests for weak references and one-shot functionality - Update examples and documentation to demonstrate new features This commit introduces two major features: 1. Weak reference support allows receivers to be garbage collected 2. One-shot connections automatically disconnect after first successful emit The changes also include significant documentation improvements and additional test coverage. --- CHANGELOG.md | 11 + README.md | 3 + docs/api.md | 32 +- docs/testing.md | 3 +- examples/stock_core.py | 194 +++++- examples/stock_monitor_console.py | 94 ++- examples/stock_monitor_simple.py | 69 ++- examples/stock_monitor_ui.py | 96 ++- src/tsignal/__init__.py | 4 +- src/tsignal/contrib/extensions/__init__.py | 2 + src/tsignal/contrib/extensions/property.py | 92 ++- src/tsignal/contrib/patterns/__init__.py | 1 + .../contrib/patterns/worker/__init__.py | 1 + .../contrib/patterns/worker/decorators.py | 65 +- src/tsignal/core.py | 569 ++++++++++++++++-- tests/__init__.py | 1 + tests/integration/__init__.py | 1 + tests/integration/test_thread_safety.py | 141 +++++ tests/integration/test_threading.py | 6 +- tests/integration/test_with_signal.py | 2 + tests/integration/test_worker.py | 8 +- tests/integration/test_worker_queue.py | 6 +- tests/integration/test_worker_signal.py | 8 +- tests/performance/test_memory.py | 9 +- tests/performance/test_stress.py | 8 +- tests/unit/__init__.py | 2 + tests/unit/test_property.py | 8 +- tests/unit/test_signal.py | 123 +++- tests/unit/test_slot.py | 5 +- tests/unit/test_utils.py | 4 +- tests/unit/test_weak.py | 187 ++++++ 31 files changed, 1590 insertions(+), 165 deletions(-) create mode 100644 tests/integration/test_thread_safety.py create mode 100644 tests/unit/test_weak.py diff --git a/CHANGELOG.md b/CHANGELOG.md index b152c6e..8540df4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,17 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [0.4.0] - 2024-12-21 + +### Added +- **Weak Reference Support**: Introduced `weak=True` for signal connections to allow automatic disconnection when the receiver is garbage-collected. +- **One-Shot Connections**: Added `one_shot=True` in `connect(...)` to enable automatically disconnecting a slot after its first successful emission call. +- Extended integration tests to cover new `weak` and `one_shot` functionality. + +### Improved +- **Thread Safety**: Strengthened internal locking and concurrency patterns to reduce race conditions in high-load or multi-threaded environments. +- **Documentation**: Updated `readme.md`, `api.md`, and example code sections to explain weak references, one-shot usage, and improved thread-safety details. + ## [0.3.0] - 2024-12-19 ### Changed diff --git a/README.md b/README.md index c825fdc..8763946 100644 --- a/README.md +++ b/README.md @@ -10,6 +10,8 @@ TSignal is a lightweight, pure-Python signal/slot library that provides thread-s - **Flexible Connection Types**: Direct or queued connections, automatically chosen based on the caller and callee threads. - **Worker Thread Pattern**: Simplify background task execution with a built-in worker pattern that provides an event loop and task queue in a dedicated thread. - **Familiar Decorators**: Inspired by Qt’s pattern, `@t_with_signals`, `@t_signal`, and `@t_slot` let you define signals and slots declaratively. +- **Weak Reference**: + - By setting `weak=True` when connecting a slot, the library holds a weak reference to the receiver object. This allows the receiver to be garbage-collected if there are no other strong references to it. Once garbage-collected, the connection is automatically removed, preventing stale references. ## Why TSignal? @@ -100,6 +102,7 @@ asyncio.run(main()) ### Thread Safety and Connection Types TSignal automatically detects whether the signal emission and slot execution occur in the same thread or different threads: +- **Auto Connection**: When connection_type is AUTO_CONNECTION (default), TSignal checks whether the slot is a coroutine function or whether the caller and callee share the same thread affinity. If they are the same thread and slot is synchronous, it uses direct connection. Otherwise, it uses queued connection. - **Direct Connection**: If signal and slot share the same thread affinity, the slot is invoked directly. - **Queued Connection**: If they differ, the call is queued to the slot’s thread/event loop, ensuring thread safety. diff --git a/docs/api.md b/docs/api.md index 9f61ca0..5da18a9 100644 --- a/docs/api.md +++ b/docs/api.md @@ -126,10 +126,12 @@ Represents a signal. Signals are created by `@t_signal` and accessed as class at Connects the signal to a slot. - **Parameters:** - - receiver_or_slot: Either the receiver object and slot method, or just a callable (function/lambda) if slot is None. - - slot: The method in the receiver if a receiver object is provided. - - connection_type: DIRECT_CONNECTION, QUEUED_CONNECTION, or AUTO_CONNECTION. - - AUTO_CONNECTION (default): Determines connection type automatically based on thread affinity and slot type. + - **receiver_or_slot:** Either the receiver object and slot method, or just a callable (function/lambda) if slot is None. + - **slot:** The method in the receiver if a receiver object is provided. + - **connection_type:** DIRECT_CONNECTION, QUEUED_CONNECTION, or AUTO_CONNECTION. + - **AUTO_CONNECTION (default):** Determines connection type automatically based on thread affinity and slot type. + - **weak:** If `True`, the receiver is kept via a weak reference so it can be garbage collected once there are no strong references. The signal automatically removes the connection if the receiver is collected. + - **one_shot:** If `True`, the connection is automatically disconnected after the first successful emit call. This is useful for events that should only notify a slot once. **Examples:** @@ -151,9 +153,29 @@ signal.connect(print) Disconnects a previously connected slot. Returns the number of disconnected connections. +- **Parameters:** + - receiver: The object whose slot is connected. If receiver is None, all receivers are considered. + - slot: The specific slot to disconnect from the signal. If slot is None, all slots for the given receiver (or all connections if receiver is also None) are disconnected. +- **Returns:** The number of connections that were disconnected.- + +**Examples:** +```python +# Disconnect all connections +signal.disconnect() + +# Disconnect all slots from a specific receiver +signal.disconnect(receiver=my_receiver) + +# Disconnect a specific slot from a specific receiver +signal.disconnect(receiver=my_receiver, slot=my_receiver.some_slot) + +# Disconnect a standalone function +signal.disconnect(slot=my_function) +``` + `emit(*args, **kwargs) -> None` -Emits the signal, invoking all connected slots either directly or via the event loop of the slot’s associated thread. +Emits the signal, invoking all connected slots either directly or via the event loop of the slot’s associated thread, depending on the connection type. If a connection is marked one_shot, it is automatically removed right after invocation. `TConnectionType` diff --git a/docs/testing.md b/docs/testing.md index 06c1aec..8665c1c 100644 --- a/docs/testing.md +++ b/docs/testing.md @@ -13,7 +13,8 @@ tests/ │ ├── test_property.py │ ├── test_signal.py │ ├── test_slot.py -│ └── test_utils.py +│ ├── test_utils.py +│ └── test_weak.py ├── integration/ # Integration tests │ ├── __init__.py │ ├── test_async.py diff --git a/examples/stock_core.py b/examples/stock_core.py index 2d7de81..bfab519 100644 --- a/examples/stock_core.py +++ b/examples/stock_core.py @@ -3,7 +3,20 @@ # pylint: disable=no-member """ -Stock monitoring core classes +Stock monitoring core classes and logic. + +This module provides a simulated stock market data generator (`StockService`), +a processor for handling price updates and alerts (`StockProcessor`), and a +view-model class (`StockViewModel`) that can be connected to various UI or CLI +front-ends. + +Usage: + 1. Instantiate `StockService` to generate price data. + 2. Instantiate `StockProcessor` to handle alert conditions and further processing. + 3. Instantiate `StockViewModel` to manage UI-related state or to relay + processed data to the presentation layer. + 4. Connect the signals/slots between these objects to build a reactive flow + that updates real-time stock information and triggers alerts when conditions are met. """ import asyncio @@ -22,6 +35,17 @@ class StockPrice: """ A dataclass to represent stock price data. + + Attributes + ---------- + code : str + The stock ticker symbol (e.g., 'AAPL', 'GOOGL', etc.). + price : float + The current price of the stock. + change : float + The percentage change compared to the previous price (in %). + timestamp : float + A UNIX timestamp representing the moment of this price capture. """ code: str @@ -33,7 +57,32 @@ class StockPrice: @t_with_worker class StockService: """ - Virtual stock price data generator and distributor + Virtual stock price data generator and distributor. + + This class simulates real-time stock price updates by randomly fluctuating + the prices of a predefined list of stock symbols. It runs in its own worker + thread, driven by an asyncio event loop. + + Attributes + ---------- + prices : Dict[str, float] + A mapping of stock code to current price. + last_prices : Dict[str, float] + A mapping of stock code to the previous price (for calculating percentage change). + _running : bool + Indicates whether the price generation loop is active. + _update_task : asyncio.Task, optional + The asyncio task that periodically updates prices. + + Signals + ------- + price_updated + Emitted every time a single stock price is updated. Receives a `StockPrice` object. + + Lifecycle + --------- + - `on_started()` is called after the worker thread starts and before `update_prices()`. + - `on_stopped()` is called when the worker thread is shutting down. """ def __init__(self): @@ -63,7 +112,15 @@ def __init__(self): @property def descriptions(self) -> Dict[str, str]: - """Get the stock descriptions.""" + """ + Get the stock descriptions. + + Returns + ------- + Dict[str, str] + A dictionary mapping stock codes to their descriptive names (e.g. "AAPL": "Apple Inc."). + """ + with self._desc_lock: return dict(self._descriptions) @@ -72,14 +129,22 @@ def price_updated(self): """Signal emitted when stock price is updated""" async def on_started(self): - """Worker initialization""" + """ + Called automatically when the worker thread is started. + + Prepares and launches the asynchronous price update loop. + """ logger.info("[StockService][on_started] started") self._running = True self._update_task = asyncio.create_task(self.update_prices()) async def on_stopped(self): - """Worker shutdown""" + """ + Called automatically when the worker thread is stopped. + + Performs cleanup and cancellation of any active update tasks. + """ logger.info("[StockService][on_stopped] stopped") self._running = False @@ -93,7 +158,13 @@ async def on_stopped(self): pass async def update_prices(self): - """Periodically update stock prices""" + """ + Periodically update stock prices in a loop. + + Randomly perturbs the prices within a small percentage range, then + emits `price_updated` with a new `StockPrice` object for each stock. + """ + while self._running: for code, price in self.prices.items(): self.last_prices[code] = price @@ -124,7 +195,20 @@ async def update_prices(self): @t_with_signals class StockViewModel: """ - UI state manager + UI state manager for stock prices and alerts. + + This class holds the current stock prices and user-defined alert settings, + and provides signals/slots for updating UI layers or notifying other components + about price changes and alerts. + + Attributes + ---------- + current_prices : Dict[str, StockPrice] + The latest known stock prices. + alerts : list[tuple[str, str, float]] + A list of triggered alerts in the form (stock_code, alert_type, current_price). + alert_settings : Dict[str, tuple[Optional[float], Optional[float]]] + A mapping of stock_code to (lower_alert_threshold, upper_alert_threshold). """ def __init__(self): @@ -134,23 +218,44 @@ def __init__(self): @t_signal def prices_updated(self): - """Signal emitted when stock prices are updated""" + """ + Signal emitted when stock prices are updated. + + Receives a dictionary of the form {stock_code: StockPrice}. + """ @t_signal def alert_added(self): - """Signal emitted when a new alert is added""" + """ + Signal emitted when a new alert is added. + + Receives (code, alert_type, current_price). + """ @t_signal def set_alert(self): - """Signal emitted when user requests to set an alert""" + """ + Signal emitted when user requests to set an alert. + + Receives (code, lower, upper). + """ @t_signal def remove_alert(self): - """Signal emitted when user requests to remove an alert""" + """ + Signal emitted when user requests to remove an alert. + + Receives (code). + """ @t_slot def on_price_processed(self, price_data: StockPrice): - """Receive processed stock price data from StockProcessor""" + """ + Receive processed stock price data from StockProcessor. + + Updates the local `current_prices` and notifies listeners that prices + have changed. + """ logger.debug("[StockViewModel][on_price_processed] price_data: %s", price_data) self.current_prices[price_data.code] = price_data @@ -158,7 +263,11 @@ def on_price_processed(self, price_data: StockPrice): @t_slot def on_alert_triggered(self, code: str, alert_type: str, price: float): - """Receive alert trigger from StockProcessor""" + """ + Receive an alert trigger from StockProcessor. + + Appends the alert to `alerts` and emits `alert_added`. + """ self.alerts.append((code, alert_type, price)) self.alert_added.emit(code, alert_type, price) @@ -167,7 +276,12 @@ def on_alert_triggered(self, code: str, alert_type: str, price: float): def on_alert_settings_changed( self, code: str, lower: Optional[float], upper: Optional[float] ): - """Receive alert settings change notification from StockProcessor""" + """ + Receive alert settings change notification from StockProcessor. + + If both lower and upper are None, remove any alert setting for that code. + Otherwise, update or create a new alert setting for that code. + """ if lower is None and upper is None: self.alert_settings.pop(code, None) @@ -178,7 +292,31 @@ def on_alert_settings_changed( @t_with_worker class StockProcessor: """ - Stock price data processor and alert condition checker + Stock price data processor and alert condition checker. + + This class runs in a separate worker thread, receiving price updates from + `StockService` and determining whether alerts should be triggered based on + user-defined thresholds. If an alert condition is met, an `alert_triggered` + signal is emitted. + + Attributes + ---------- + price_alerts : Dict[str, tuple[Optional[float], Optional[float]]] + A mapping of stock_code to (lower_alert_threshold, upper_alert_threshold). + + Signals + ------- + price_processed + Emitted after processing a new price data and optionally triggering alerts. + alert_triggered + Emitted if a stock price crosses its set threshold. + alert_settings_changed + Emitted whenever a stock's alert thresholds are changed. + + Lifecycle + --------- + - `on_started()` is invoked when the worker is fully initialized. + - `on_stopped()` is called upon shutdown/cleanup. """ def __init__(self): @@ -214,14 +352,22 @@ def alert_settings_changed(self): async def on_set_price_alert( self, code: str, lower: Optional[float], upper: Optional[float] ): - """Receive price alert setting request from main thread""" + """ + Receive a price alert setting request from the main thread or UI. + + Updates (or creates) a new alert threshold entry, then emits `alert_settings_changed`. + """ self.price_alerts[code] = (lower, upper) self.alert_settings_changed.emit(code, lower, upper) @t_slot async def on_remove_price_alert(self, code: str): - """Receive price alert removal request from main thread""" + """ + Receive a price alert removal request from the main thread or UI. + + Deletes the alert thresholds for a given code, then emits `alert_settings_changed`. + """ if code in self.price_alerts: del self.price_alerts[code] @@ -229,7 +375,12 @@ async def on_remove_price_alert(self, code: str): @t_slot async def on_price_updated(self, price_data: StockPrice): - """Receive stock price update from StockService""" + """ + Receive stock price updates from the `StockService`. + + Delegates the actual processing to `process_price` via the task queue to + avoid blocking other operations. + """ logger.debug("[StockProcessor][on_price_updated] price_data: %s", price_data) @@ -240,7 +391,12 @@ async def on_price_updated(self, price_data: StockPrice): logger.error("[SLOT] Error in on_price_updated: %s", e, exc_info=True) async def process_price(self, price_data: StockPrice): - """Process stock price data""" + """ + Process the updated price data. + + Checks if the stock meets the alert conditions (e.g., crossing upper/lower limits), + emits `alert_triggered` as needed, then emits `price_processed`. + """ logger.debug("[StockProcessor][process_price] price_data: %s", price_data) diff --git a/examples/stock_monitor_console.py b/examples/stock_monitor_console.py index 879f796..10f0c9b 100644 --- a/examples/stock_monitor_console.py +++ b/examples/stock_monitor_console.py @@ -2,6 +2,18 @@ """ Stock monitor console example. + +This module demonstrates a command-line interface (CLI) for interacting +with the stock monitoring system. It ties together `StockService`, +`StockProcessor`, and `StockViewModel`, showing how signals/slots +flow between them to provide user commands and real-time price updates. + +Usage: + 1. Instantiate the main `StockMonitorCLI` class with references + to the service, processor, and view model. + 2. Run `cli.run()` in an async context to start the CLI loop. + 3. The user can type commands like "stocks", "alert", or "remove" + to manage alerts and display prices. """ import asyncio @@ -18,7 +30,28 @@ @t_with_signals class StockMonitorCLI: - """Stock monitoring CLI interface""" + """ + Stock monitoring CLI interface. + + This class provides a text-based interactive prompt where users can: + - View stock prices + - Set or remove price alerts + - Start/stop showing price updates + - Quit the application + + Attributes + ---------- + service : StockService + The worker responsible for generating stock prices. + processor : StockProcessor + The worker responsible for processing prices and handling alerts. + view_model : StockViewModel + A view-model that stores the latest prices and user alert settings. + showing_prices : bool + Whether the CLI is currently in "showprices" mode, continuously updating prices. + running : bool + Whether the CLI loop is active. + """ def __init__( self, @@ -47,7 +80,19 @@ def print_menu(self): print("================\n") async def get_line_input(self, prompt="Command> "): - """Get line input""" + """ + Get a line of user input asynchronously. + + Parameters + ---------- + prompt : str + The prompt to display before reading user input. + + Returns + ------- + str + The user-inputted line. + """ return await asyncio.get_event_loop().run_in_executor( None, lambda: input(prompt) @@ -55,7 +100,12 @@ async def get_line_input(self, prompt="Command> "): @t_slot def on_prices_updated(self, prices: Dict[str, StockPrice]): - """Process price updates""" + """ + Respond to updated prices in the view model. + + If `showing_prices` is True, prints the current prices and any triggered alerts + to the console without re-displaying the main menu. + """ # If we are in showprices mode, display current prices: if self.showing_prices: @@ -90,7 +140,17 @@ def on_prices_updated(self, prices: Dict[str, StockPrice]): print(alert) async def process_command(self, command: str): - """Process command""" + """ + Process a single user command from the CLI. + + Supported commands: + - stocks + - alert + - remove + - list + - showprices + - quit + """ parts = command.strip().split() @@ -133,6 +193,18 @@ async def process_command(self, command: str): self.view_model.remove_alert.emit(code) print(f"Alert removed for {code}") + elif parts[0] == "list": + if not self.view_model.alert_settings: + print("\nNo alerts currently set.") + else: + print("\nCurrent alerts:") + print(f"{'Code':^6} {'Lower':>10} {'Upper':>10}") + print("-" * 30) + for code, (lower, upper) in sorted( + self.view_model.alert_settings.items() + ): + print(f"{code:<6} ${lower:>9.2f} ${upper:>9.2f}") + elif parts[0] == "showprices": self.showing_prices = True print("Now showing price updates. Press Enter to return to menu.") @@ -145,7 +217,12 @@ async def process_command(self, command: str): print(f"Unknown command: {command}") async def run(self): - """CLI execution""" + """ + Main execution loop for the CLI. + + Connects signals between `service`, `processor`, and `view_model`, + then continuously reads user input until the user exits. + """ logger.debug( "[StockMonitorCLI][run] started current loop: %s %s", @@ -184,7 +261,12 @@ def set_processor_started_true(): self.view_model, self.view_model.on_price_processed ) self.view_model.prices_updated.connect(self, self.on_prices_updated) - + self.view_model.set_alert.connect( + self.processor, self.processor.on_set_price_alert + ) + self.view_model.remove_alert.connect( + self.processor, self.processor.on_remove_price_alert + ) self.processor.alert_triggered.connect( self.view_model, self.view_model.on_alert_triggered ) diff --git a/examples/stock_monitor_simple.py b/examples/stock_monitor_simple.py index 2671ecb..61f9521 100644 --- a/examples/stock_monitor_simple.py +++ b/examples/stock_monitor_simple.py @@ -1,11 +1,16 @@ # examples/stock_monitor_simple.py +# pylint: disable=no-member +# pylint: disable=unused-argument + """ Stock monitor simple example. -""" -# pylint: disable=no-member -# pylint: disable=unused-argument +This module shows a straightforward example of using a worker (`DataWorker`) +to generate data continuously and a display (`DataDisplay`) to process and +log that data on the main thread. It's a minimal demonstration of TSignal's +thread-safe signal/slot invocation. +""" import asyncio import logging @@ -20,7 +25,26 @@ @t_with_worker class DataWorker: - """Data worker""" + """ + A simple data worker that emits incrementing integers every second. + + Attributes + ---------- + _running : bool + Indicates whether the update loop is active. + _update_task : asyncio.Task, optional + The asynchronous task that updates and emits data. + + Signals + ------- + data_processed + Emitted with the incremented integer each time data is processed. + + Lifecycle + --------- + - `run(...)` is called automatically in the worker thread. + - `stop()` stops the worker, cancelling the update loop. + """ def __init__(self): self._running = False @@ -28,10 +52,18 @@ def __init__(self): @t_signal def data_processed(self): - """Signal emitted when data is processed""" + """ + Signal emitted when data is processed. + + Receives an integer count. + """ async def run(self, *args, **kwargs): - """Worker initialization""" + """ + Worker initialization and main event loop. + + Creates the update loop task and waits until the worker is stopped. + """ logger.info("[DataWorker][run] Starting") @@ -51,7 +83,11 @@ async def run(self, *args, **kwargs): pass async def update_loop(self): - """Update loop""" + """ + Periodically emits a counter value. + + Every second, the counter increments and `data_processed` is emitted. + """ count = 0 @@ -66,7 +102,14 @@ async def update_loop(self): @t_with_signals class DataDisplay: - """Data display""" + """ + A display class that receives the processed data from the worker. + + Attributes + ---------- + last_value : int or None + Stores the last received value from the worker. + """ def __init__(self): self.last_value = None @@ -74,7 +117,11 @@ def __init__(self): @t_slot def on_data_processed(self, value): - """Slot called when data is processed""" + """ + Slot called when data is processed. + + Logs the received value and simulates a brief processing delay. + """ current_thread = threading.current_thread() logger.debug( @@ -87,7 +134,9 @@ def on_data_processed(self, value): async def main(): - """Main function""" + """ + Main function demonstrating how to set up and run the worker and display. + """ logger.debug("[Main] Starting in thread: %s", threading.current_thread().name) diff --git a/examples/stock_monitor_ui.py b/examples/stock_monitor_ui.py index 62b5d9e..57c4d8c 100644 --- a/examples/stock_monitor_ui.py +++ b/examples/stock_monitor_ui.py @@ -1,13 +1,18 @@ # examples/stock_monitor_ui.py -""" -Stock monitor UI example. -""" - # pylint: disable=too-many-instance-attributes # pylint: disable=no-member # pylint: disable=unused-argument +""" +Stock monitor UI example. + +Demonstrates integrating TSignal-based signals/slots into a Kivy GUI application. +It showcases a real-time price update loop (`StockService`), an alert/processing +component (`StockProcessor`), and a Kivy-based front-end (`StockView`) for +visualizing and setting stock alerts, all running asynchronously. +""" + import asyncio from typing import Dict @@ -30,7 +35,17 @@ @t_with_signals class StockView(BoxLayout): - """Stock monitor UI view""" + """ + Stock monitor UI view (Kivy layout). + + Displays: + - A status label + - A Start/Stop button + - A dropdown to select stock codes + - Current price and change + - Alert setting/removal inputs + - A display label for triggered alerts + """ def __init__(self, **kwargs): super().__init__(**kwargs) @@ -95,7 +110,12 @@ def __init__(self, **kwargs): self.add_widget(Widget(size_hint_y=1)) def update_prices(self, prices: Dict[str, StockPrice]): - """Update price information""" + """ + Update the displayed price information based on the currently selected stock. + + If the spinner's text matches a code in `prices`, update the price label + and change label. Also shows a status message indicating successful update. + """ if self.stock_spinner.text in prices: price_data = prices[self.stock_spinner.text] @@ -105,13 +125,23 @@ def update_prices(self, prices: Dict[str, StockPrice]): @t_slot def on_alert_added(self, code: str, alert_type: str, price: float): - """Update UI when alert is triggered""" + """ + Slot for handling newly triggered alerts. + + Updates the `alert_label` in the UI to inform the user about the alert. + """ self.alert_label.text = f"ALERT: {code} {alert_type} {price:.2f}" class AsyncKivyApp(App): - """Async Kivy app""" + """ + A Kivy application that integrates with asyncio for background tasks. + + This class sets up the UI (`StockView`), the stock service, processor, + and view model, and wires them together with signals/slots. It also provides + a background task that keeps the UI responsive and handles graceful shutdown. + """ def __init__(self): super().__init__() @@ -124,7 +154,9 @@ def __init__(self): self.async_lib = None def build(self): - """Build the UI""" + """ + Build the UI layout, connect signals, and initialize the main components. + """ self.view = StockView() @@ -168,7 +200,9 @@ def build(self): return self.view def _toggle_service(self, instance): - """Toggle service start/stop""" + """ + Start or stop the StockService and StockProcessor based on the current button state. + """ if instance.text == "Start": self.service.start() @@ -182,7 +216,11 @@ def _toggle_service(self, instance): self.view.status_label.text = "Service stopped" def _set_alert(self, instance): - """Alert setting button handler""" + """ + Handle the "Set Alert" button press. + + Reads the lower/upper thresholds from the text fields and emits `set_alert`. + """ code = self.view.stock_spinner.text lower_str = self.view.lower_input.text.strip() @@ -199,7 +237,11 @@ def _set_alert(self, instance): self.view.alert_label.text = f"Alert set for {code}: lower={lower if lower else 'None'} upper={upper if upper else 'None'}" def _remove_alert(self, instance): - """Alert removal button handler""" + """ + Handle the "Remove Alert" button press. + + Emits `remove_alert` for the currently selected stock code. + """ code = self.view.stock_spinner.text @@ -211,7 +253,11 @@ def _remove_alert(self, instance): self.view.alert_label.text = f"Alert removed for {code}" async def background_task(self): - """Background task""" + """ + Background task that can be used for periodic checks or housekeeping. + + Runs concurrently with the Kivy event loop in async mode. + """ try: while self.background_task_running: @@ -220,15 +266,22 @@ async def background_task(self): pass def on_request_close(self, *args): - """Request close handler""" + """ + Intercept the Kivy window close event to properly shut down. + + Returns True to indicate we handle the closing ourselves. + """ asyncio.create_task(self.cleanup()) return True async def cleanup(self): - """Cleanup""" + """ + Perform a graceful shutdown by stopping background tasks and stopping the app. + """ self.background_task_running = False + for task in self.tasks: if not task.done(): task.cancel() @@ -240,7 +293,14 @@ async def cleanup(self): self.stop() async def async_run(self, async_lib=None): - """Async run""" + """ + Launch the Kivy app in an async context. + + Parameters + ---------- + async_lib : module or None + The async library to use (defaults to `asyncio`). + """ self._async_lib = async_lib or asyncio @@ -250,7 +310,9 @@ async def async_run(self, async_lib=None): async def main(): - """Main function""" + """ + Main entry point for running the Kivy app in an async-friendly manner. + """ Clock.init_async_lib("asyncio") diff --git a/src/tsignal/__init__.py b/src/tsignal/__init__.py index 1598e03..44dac6b 100644 --- a/src/tsignal/__init__.py +++ b/src/tsignal/__init__.py @@ -1,3 +1,5 @@ +# pylint: disable=missing-module-docstring + """ TSignal - Python Signal/Slot Implementation """ @@ -23,5 +25,5 @@ "TConnectionType", "TSignalConstants", "t_signal_log_and_raise_error", - "t_signal_graceful_shutdown", + "t_signal_graceful_shutdown", ] diff --git a/src/tsignal/contrib/extensions/__init__.py b/src/tsignal/contrib/extensions/__init__.py index 7d0faf8..265434d 100644 --- a/src/tsignal/contrib/extensions/__init__.py +++ b/src/tsignal/contrib/extensions/__init__.py @@ -1,3 +1,5 @@ +# pylint: disable=missing-module-docstring + from .property import t_property __all__ = ["t_property"] diff --git a/src/tsignal/contrib/extensions/property.py b/src/tsignal/contrib/extensions/property.py index b6912a1..ed46fdc 100644 --- a/src/tsignal/contrib/extensions/property.py +++ b/src/tsignal/contrib/extensions/property.py @@ -20,7 +20,66 @@ class TProperty(property): """ - A thread-safe property decorator. + A thread-safe property decorator for classes decorated with `@t_with_signals` + or `@t_with_worker`. + + This property ensures that all reads (getter) and writes (setter) occur on the + object's designated event loop, maintaining thread-safety across different threads. + + If the property is accessed from a different thread than the object's thread affinity: + - The operation is automatically dispatched (queued) onto the object's event loop + via `asyncio.run_coroutine_threadsafe`. + - This prevents race conditions that might otherwise occur if multiple threads + tried to read/write the same attribute. + + Parameters + ---------- + fget : callable, optional + The getter function for the property. + fset : callable, optional + The setter function for the property. + fdel : callable, optional + The deleter function for the property (not commonly used). + doc : str, optional + Docstring for this property. + notify : TSignal, optional + A signal to emit when the property value changes. If provided, the signal is + triggered after a successful write operation, and only if the new value is + different from the old value. + + Notes + ----- + - The property’s underlying storage is typically `_private_name` on the instance, + inferred from the property name (e.g. `value` -> `self._value`). + - If `notify` is set, `signal.emit(new_value)` is called whenever the property changes. + - Reading or writing this property from its "home" thread is done synchronously; + from any other thread, TSignal automatically queues the operation in the + object's event loop. + + Example + ------- + @t_with_signals + class Model: + @t_signal + def value_changed(self): + pass + + @t_property(notify=value_changed) + def value(self): + return self._value + + @value.setter + def value(self, new_val): + self._value = new_val + + # Usage: + model = Model() + model.value = 10 # If called from a different thread, it’s queued to model's loop + current_val = model.value # Also thread-safe read + + See Also + -------- + t_with_signals : Decorates a class to enable signal/slot features and thread affinity. """ def __init__(self, fget=None, fset=None, fdel=None, doc=None, notify=None): @@ -112,7 +171,36 @@ def setter(self, fset): def t_property(notify=None): """ - Decorator to create a thread-safe property. + Decorator to create a TProperty-based thread-safe property. + + Parameters + ---------- + notify : TSignal, optional + If provided, this signal is automatically emitted when the property's value changes. + + Returns + ------- + function + A decorator that converts a normal getter function into a TProperty-based property. + + Example + ------- + @t_with_signals + class Example: + @t_signal + def updated(self): + pass + + @t_property(notify=updated) + def data(self): + return self._data + + @data.setter + def data(self, value): + self._data = value + + e = Example() + e.data = 42 # Thread-safe property set; emits 'updated' signal on change """ def decorator(func): diff --git a/src/tsignal/contrib/patterns/__init__.py b/src/tsignal/contrib/patterns/__init__.py index e69de29..7cac770 100644 --- a/src/tsignal/contrib/patterns/__init__.py +++ b/src/tsignal/contrib/patterns/__init__.py @@ -0,0 +1 @@ +# pylint: disable=missing-module-docstring diff --git a/src/tsignal/contrib/patterns/worker/__init__.py b/src/tsignal/contrib/patterns/worker/__init__.py index e69de29..7cac770 100644 --- a/src/tsignal/contrib/patterns/worker/__init__.py +++ b/src/tsignal/contrib/patterns/worker/__init__.py @@ -0,0 +1 @@ +# pylint: disable=missing-module-docstring diff --git a/src/tsignal/contrib/patterns/worker/decorators.py b/src/tsignal/contrib/patterns/worker/decorators.py index 4bfa75a..9d0360b 100644 --- a/src/tsignal/contrib/patterns/worker/decorators.py +++ b/src/tsignal/contrib/patterns/worker/decorators.py @@ -28,7 +28,70 @@ class _WorkerConstants: def t_with_worker(cls): - """Decorator for the worker pattern.""" + """ + Class decorator that adds a worker pattern to the decorated class, allowing it + to run in a dedicated thread with its own asyncio event loop. This is especially + useful for background processing or offloading tasks that should not block the + main thread. + + Features + -------- + - **Dedicated Thread & Event Loop**: The decorated class, once started, runs in + a new thread with its own event loop. + - **Signal/Slot Support**: The worker class can define signals (with `@t_signal`) + and slots (`@t_slot`), enabling event-driven communication. + - **Task Queue**: A built-in asyncio `Queue` is provided for scheduling coroutines + in the worker thread via `queue_task(coro)`. + - **Lifecycle Signals**: Automatically emits `started` and `stopped` signals, + indicating when the worker thread is up and when it has fully terminated. + - **Lifecycle Management**: Methods like `start(...)`, `stop()`, and `move_to_thread(...)` + help manage the worker thread's lifecycle and move other `@t_with_signals` objects + into this worker thread. + + Usage + ----- + 1. Decorate the class with `@t_with_worker`. + 2. Optionally implement an async `run(*args, **kwargs)` method to control + the worker's main logic. This method is run in the worker thread. + 3. Call `start(...)` to launch the thread and event loop. + 4. Use `queue_task(...)` to schedule coroutines on the worker's event loop. + + Example + ------- + @t_with_worker + class BackgroundWorker: + @t_signal + def started(self): + pass + + @t_signal + def stopped(self): + pass + + @t_signal + def result_ready(self): + pass + + async def run(self, config=None): + print("Worker started with config:", config) + # Wait until stop is requested + await self._tsignal_stopping.wait() + print("Worker finishing...") + + async def do_work(self, data): + await asyncio.sleep(2) + self.result_ready.emit(data * 2) + + worker = BackgroundWorker() + worker.start(config={'threads': 4}) + worker.queue_task(worker.do_work(10)) + worker.stop() + + See Also + -------- + t_slot : Decorates methods as thread-safe or async slots. + t_signal : Decorates functions to define signals. + """ class WorkerClass(cls): """ diff --git a/src/tsignal/core.py b/src/tsignal/core.py index bc212f8..1aab025 100644 --- a/src/tsignal/core.py +++ b/src/tsignal/core.py @@ -1,5 +1,11 @@ # src/tsignal/core.py +# pylint: disable=unnecessary-dunder-call +# pylint: disable=too-many-arguments +# pylint: disable=too-many-locals +# pylint: disable=too-many-branches +# pylint: disable=too-many-positional-arguments + """ Implementation of the Signal class for tsignal. @@ -11,10 +17,12 @@ import asyncio import concurrent.futures import contextvars +from dataclasses import dataclass import functools import logging +import weakref import threading -from typing import Callable +from typing import Callable, Optional from tsignal.utils import t_signal_log_and_raise_error logger = logging.getLogger(__name__) @@ -27,6 +35,7 @@ class TSignalConstants: THREAD = "_tsignal_thread" LOOP = "_tsignal_loop" AFFINITY = "_tsignal_affinity" + WEAK_DEFAULT = "_tsignal_weak_default" _tsignal_from_emit = contextvars.ContextVar(TSignalConstants.FROM_EMIT, default=False) @@ -40,6 +49,61 @@ class TConnectionType(Enum): AUTO_CONNECTION = 3 +@dataclass +class TConnection: + """Connection class for signal-slot connections.""" + + receiver_ref: Optional[object] + slot_func: Callable + conn_type: TConnectionType + is_coro_slot: bool + is_bound: bool + is_weak: bool + is_one_shot: bool = False + + def get_receiver(self): + """If receiver_ref is a weakref, return the actual receiver. Otherwise, return the receiver_ref as is.""" + + if self.is_weak and isinstance(self.receiver_ref, weakref.ref): + return self.receiver_ref() + return self.receiver_ref + + def is_valid(self): + """Check if the receiver is alive if it's a weakref.""" + + if self.is_weak and isinstance(self.receiver_ref, weakref.ref): + return self.receiver_ref() is not None + return True + + def get_slot_to_call(self): + """ + Return the slot to call at emit time. + For weakref bound method connections, reconstruct the bound method after recovering the receiver. + For strong reference, it's already a bound method, so return it directly. + For standalone functions, return them directly. + """ + + if not self.is_bound: + # standalone function + return self.slot_func + + receiver = self.get_receiver() + + if receiver is None: + # weak ref is dead + return None + + # Restore bound method + if self.is_weak: + # slot_func is an unbound function, so reconstruct the bound method using __get__ + return self.slot_func.__get__(receiver, type(receiver)) + + # For instance of strong reference, slot_func may be already bound method, + # or it may be bound method at connect time. + # In either case, it can be returned directly. + return self.slot_func + + def _wrap_standalone_function(func, is_coroutine): """Wrap standalone function""" @@ -77,41 +141,141 @@ def _determine_connection_type(conn_type, receiver, owner, is_coro_slot): """ actual_conn_type = conn_type + logger.debug( + "[TSignal][_determine_connection_type] conn_type=%s receiver=%s owner=%s is_coro_slot=%s", + conn_type, + receiver, + owner, + is_coro_slot, + ) + if conn_type == TConnectionType.AUTO_CONNECTION: if is_coro_slot: actual_conn_type = TConnectionType.QUEUED_CONNECTION + logger.debug( + "[TSignal][_determine_connection_type] actual_conn_type=%s reason=is_coro_slot", + actual_conn_type, + ) else: + receiver = receiver() if isinstance(receiver, weakref.ref) else receiver + + is_receiver_valid = receiver is not None + has_thread = hasattr(receiver, TSignalConstants.THREAD) + has_affinity = hasattr(receiver, TSignalConstants.AFFINITY) + has_owner_thread = hasattr(owner, TSignalConstants.THREAD) + has_owner_affinity = hasattr(owner, TSignalConstants.AFFINITY) + if ( - receiver is not None - and hasattr(receiver, TSignalConstants.THREAD) - and hasattr(owner, TSignalConstants.THREAD) - and hasattr(receiver, TSignalConstants.AFFINITY) - and hasattr(owner, TSignalConstants.AFFINITY) + is_receiver_valid + and has_thread + and has_owner_thread + and has_affinity + and has_owner_affinity ): if receiver._tsignal_affinity == owner._tsignal_affinity: actual_conn_type = TConnectionType.DIRECT_CONNECTION + logger.debug( + "[TSignal][_determine_connection_type] actual_conn_type=%s reason=same_thread", + actual_conn_type, + ) else: actual_conn_type = TConnectionType.QUEUED_CONNECTION + logger.debug( + "[TSignal][_determine_connection_type] actual_conn_type=%s reason=different_thread", + actual_conn_type, + ) else: actual_conn_type = TConnectionType.DIRECT_CONNECTION + logger.debug( + "[TSignal][_determine_connection_type] actual_conn_type=%s reason=no_receiver or invalid thread or affinity " + "is_receiver_valid=%s has_thread=%s has_affinity=%s has_owner_thread=%s has_owner_affinity=%s", + actual_conn_type, + is_receiver_valid, + has_thread, + has_affinity, + has_owner_thread, + has_owner_affinity, + ) return actual_conn_type +def _extract_unbound_function(callable_obj): + """ + Extract the unbound function from a bound method. + If the slot is a bound method, return the unbound function (__func__), otherwise return the slot as is. + """ + + return getattr(callable_obj, "__func__", callable_obj) + + class TSignal: """Signal class for tsignal.""" def __init__(self): self.connections = [] self.owner = None + self.connections_lock = threading.RLock() def connect( - self, receiver_or_slot, slot=None, conn_type=TConnectionType.AUTO_CONNECTION + self, + receiver_or_slot, + slot=None, + conn_type=TConnectionType.AUTO_CONNECTION, + weak=None, + one_shot=False, ): """ - Connect signal to a slot with an optional connection type. - If conn_type is AUTO_CONNECTION, the actual type (direct or queued) - is determined at emit time based on threads. + Connect this signal to a slot (callable). The connected slot will be invoked + on each `emit()` call. + + Parameters + ---------- + receiver_or_slot : object or callable + If `slot` is omitted, this can be a standalone callable (function or lambda), + or a bound method. Otherwise, this is treated as the receiver object. + slot : callable, optional + When `receiver_or_slot` is a receiver object, `slot` should be the method + to connect. If both `receiver_or_slot` and `slot` are given, this effectively + connects the signal to the method `slot` of the given `receiver`. + conn_type : TConnectionType, optional + Specifies how the slot is invoked relative to the signal emitter. Defaults to + `TConnectionType.AUTO_CONNECTION`, which automatically determines direct or queued + invocation based on thread affinity and slot type (sync/async). + weak : bool, optional + If `True`, a weak reference to the receiver is stored so the connection + is automatically removed once the receiver is garbage-collected. + If omitted (`None`), the default is determined by the decorator `@t_with_signals` + (i.e., `weak_default`). + one_shot : bool, optional + If `True`, this connection is automatically disconnected right after the + first successful emission. Defaults to `False`. + + Raises + ------ + TypeError + If the provided slot is not callable or if `receiver_or_slot` is not callable + when `slot` is `None`. + AttributeError + If `receiver_or_slot` is `None` while `slot` is provided. + ValueError + If `conn_type` is invalid (not one of AUTO_CONNECTION, DIRECT_CONNECTION, QUEUED_CONNECTION). + + Examples + -------- + # Connect a bound method + signal.connect(receiver, receiver.some_method) + + # Connect a standalone function + def standalone_func(value): + print("Received:", value) + signal.connect(standalone_func) + + # One-shot connection + signal.connect(receiver, receiver.one_time_handler, one_shot=True) + + # Weak reference connection + signal.connect(receiver, receiver.on_event, weak=True) """ logger.debug( @@ -121,6 +285,9 @@ def connect( slot, ) + if weak is None and self.owner is not None: + weak = getattr(self.owner, TSignalConstants.WEAK_DEFAULT, False) + if slot is None: if not callable(receiver_or_slot): t_signal_log_and_raise_error( @@ -130,6 +297,11 @@ def connect( ) receiver = None + is_bound_method = hasattr(receiver_or_slot, "__self__") + maybe_slot = ( + receiver_or_slot.__func__ if is_bound_method else receiver_or_slot + ) + is_coro_slot = asyncio.iscoroutinefunction(maybe_slot) is_coro_slot = asyncio.iscoroutinefunction( receiver_or_slot.__func__ @@ -137,8 +309,9 @@ def connect( else receiver_or_slot ) - if hasattr(receiver_or_slot, "__self__"): + if is_bound_method: obj = receiver_or_slot.__self__ + if hasattr(obj, TSignalConstants.THREAD) and hasattr( obj, TSignalConstants.LOOP ): @@ -156,6 +329,7 @@ def connect( AttributeError, "[TSignal][connect] Receiver cannot be None.", ) + if not callable(slot): t_signal_log_and_raise_error( logger, TypeError, "[TSignal][connect] Slot must be callable." @@ -175,61 +349,223 @@ def connect( ): t_signal_log_and_raise_error(logger, ValueError, "Invalid connection type.") - conn = (receiver, slot, conn_type, is_coro_slot) + is_bound = False + + if hasattr(slot, "__self__") and slot.__self__ is not None: + # It's a bound method + slot_instance = slot.__self__ + slot_func = slot.__func__ + is_bound = True + + if weak and receiver is not None: + receiver_ref = weakref.ref(slot_instance, self._cleanup_on_ref_dead) + conn = TConnection( + receiver_ref, + slot_func, + conn_type, + is_coro_slot, + is_bound, + True, + one_shot, + ) + else: + # strong ref + conn = TConnection( + slot_instance, + slot, + conn_type, + is_coro_slot, + is_bound, + False, + one_shot, + ) + else: + # standalone function or lambda + # weak not applied to function itself, since no receiver + is_bound = False + conn = TConnection( + None, slot, conn_type, is_coro_slot, is_bound, False, one_shot + ) + logger.debug("[TSignal][connect][END] conn=%s", conn) - self.connections.append(conn) + with self.connections_lock: + self.connections.append(conn) + + def _cleanup_on_ref_dead(self, ref): + """Cleanup connections on weak reference death.""" + # ref is a weak reference to the receiver + # Remove connections associated with the dead receiver + with self.connections_lock: + self.connections = [ + conn for conn in self.connections if conn.receiver_ref is not ref + ] def disconnect(self, receiver: object = None, slot: Callable = None) -> int: - """Disconnect signal from slot(s).""" + """ + Disconnects one or more slots from the signal. This method attempts to find and remove + connections that match the given `receiver` and/or `slot`. + + Parameters + ---------- + receiver : object, optional + The receiver object initially connected to the signal. If omitted, matches any receiver. + slot : Callable, optional + The slot (callable) that was connected to the signal. If omitted, matches any slot. + + Returns + ------- + int + The number of connections successfully disconnected. + + Notes + ----- + - If neither `receiver` nor `slot` is specified, all connections are removed. + - If only `receiver` is given (and `slot=None`), all connections involving that receiver will be removed. + - If only `slot` is given (and `receiver=None`), all connections involving that slot are removed. + - If both `receiver` and `slot` are given, only the connections that match both will be removed. + + Example + ------- + Consider a signal connected to multiple slots of a given receiver: + + >>> signal.disconnect(receiver=my_receiver) + # All connections associated with `my_receiver` are removed. + + Or if a specific slot was connected: + + >>> signal.disconnect(slot=my_specific_slot) + # All connections to `my_specific_slot` are removed. + + Passing both `receiver` and `slot`: - if receiver is None and slot is None: - count = len(self.connections) - self.connections.clear() - return count + >>> signal.disconnect(receiver=my_receiver, slot=my_specific_slot) + # Only the connections that match both `my_receiver` and `my_specific_slot` are removed. + """ - original_count = len(self.connections) - new_connections = [] + with self.connections_lock: + if receiver is None and slot is None: + # No receiver or slot specified, remove all connections. + count = len(self.connections) + self.connections.clear() + return count + + original_count = len(self.connections) + new_connections = [] + + logger.debug( + "[TSignal][disconnect][START] receiver: %s slot: %s original_count: %s", + receiver, + slot, + original_count, + ) - for r, s, t, c in self.connections: - # Compare original function and wrapped function for directly connected functions - if r is None and slot is not None: - if getattr(s, "__wrapped__", None) == slot or s == slot: + # In case slot is a bound method, convert it to an unbound function. + slot_unbound = _extract_unbound_function(slot) + + for conn in self.connections: + # If the connection does not reference a receiver (standalone function) + # and a slot is specified, check if the connected func_or_slot matches the given slot. + receiver_match = receiver is None or conn.get_receiver() == receiver + slot_match = ( + slot is None + or conn.slot_func == slot_unbound + or getattr(conn.slot_func, "__wrapped__", None) == slot_unbound + ) + + if receiver_match and slot_match: + # remove this connection + logger.debug( + "[TSignal][disconnect][MATCHED] func: %s receiver_match: %s slot_match: %s", + conn.slot_func, + receiver_match, + slot_match, + ) continue - elif (receiver is None or r == receiver) and (slot is None or s == slot): - continue - new_connections.append((r, s, t, c)) - self.connections = new_connections - disconnected = original_count - len(self.connections) + # If the connection was not matched by the given criteria, keep it. + logger.debug( + "[TSignal][disconnect][NOT MATCHED] func: %s", conn.slot_func + ) + new_connections.append((conn)) - return disconnected + self.connections = new_connections + disconnected = original_count - len(self.connections) + + logger.debug( + "[TSignal][disconnect][END] disconnected: %s", + disconnected, + ) + return disconnected def emit(self, *args, **kwargs): - """Emit signal to connected slots.""" + """ + Emit the signal with the specified arguments. All connected slots will be + invoked, either directly or via their respective event loops, depending on + the connection type and thread affinity. + + Parameters + ---------- + *args : Any + Positional arguments passed on to each connected slot. + **kwargs : Any + Keyword arguments passed on to each connected slot. + + Notes + ----- + - When a connected slot is marked with `is_one_shot=True`, it is automatically + disconnected immediately after being invoked for the first time. + - If a slot was connected with a weak reference (`weak=True`) and its receiver + has been garbage-collected, that connection is skipped and removed from the + internal list of connections. + - If the slot is asynchronous and `conn_type` is `AUTO_CONNECTION`, it typically + uses a queued connection (queued to the slot’s event loop). + - If an exception occurs in a slot, the exception is logged, but does not halt + the emission to other slots. + + Examples + -------- + signal.emit(42, message="Hello") + """ logger.debug("[TSignal][emit][START]") token = _tsignal_from_emit.set(True) + with self.connections_lock: + # copy list to avoid iteration issues during emit + current_conns = list(self.connections) + # pylint: disable=too-many-nested-blocks try: - for receiver, slot, conn_type, is_coro_slot in self.connections: + for conn in current_conns: + if conn.is_bound and not conn.is_valid(): + with self.connections_lock: + if conn in self.connections: + self.connections.remove(conn) + continue + + slot_to_call = conn.get_slot_to_call() + + if slot_to_call is None: + # Unable to call bound method due to receiver GC or other reasons + continue + actual_conn_type = _determine_connection_type( - conn_type, receiver, self.owner, is_coro_slot + conn.conn_type, conn.get_receiver(), self.owner, conn.is_coro_slot ) logger.debug( "[TSignal][emit] slot=%s receiver=%s conn_type=%s", - slot.__name__, - receiver, + getattr(slot_to_call, "__name__", slot_to_call), + conn.get_receiver(), actual_conn_type, ) try: if actual_conn_type == TConnectionType.DIRECT_CONNECTION: logger.debug("[TSignal][emit][DIRECT] calling slot directly") - result = slot(*args, **kwargs) + result = slot_to_call(*args, **kwargs) logger.debug( "[TSignal][emit][DIRECT] result=%s result_type=%s", result, @@ -237,14 +573,16 @@ def emit(self, *args, **kwargs): ) else: # Handle QUEUED CONNECTION + receiver = conn.get_receiver() + if receiver is not None: receiver_loop = getattr( receiver, TSignalConstants.LOOP, None ) - receiver_thread = getattr( receiver, TSignalConstants.THREAD, None ) + if not receiver_loop: logger.error( "[TSignal][emit][QUEUED] No event loop found for receiver. receiver=%s", @@ -279,20 +617,25 @@ def emit(self, *args, **kwargs): logger.debug( "[TSignal][emit][QUEUED] slot=%s is_coroutine=%s", - slot.__name__, - is_coro_slot, + getattr(slot_to_call, "__name__", slot_to_call), + conn.is_coro_slot, ) - def dispatch(slot=slot, is_coro_slot=is_coro_slot): + def dispatch( + is_coro_slot=conn.is_coro_slot, + slot_to_call=slot_to_call, + ): logger.debug( "[TSignal][emit][QUEUED][dispatch] calling slot=%s", - slot.__name__, + getattr(slot_to_call, "__name__", slot_to_call), ) if is_coro_slot: - returned = asyncio.create_task(slot(*args, **kwargs)) + returned = asyncio.create_task( + slot_to_call(*args, **kwargs) + ) else: - returned = slot(*args, **kwargs) + returned = slot_to_call(*args, **kwargs) logger.debug( "[TSignal][emit][QUEUED][dispatch] returned=%s type=%s", @@ -308,6 +651,12 @@ def dispatch(slot=slot, is_coro_slot=is_coro_slot): logger.error( "[TSignal][emit] error in emission: %s", e, exc_info=True ) + + if conn.is_one_shot: + with self.connections_lock: + if conn in self.connections: + self.connections.remove(conn) + finally: _tsignal_from_emit.reset(token) @@ -333,7 +682,42 @@ def __get__(self, obj, objtype=None): def t_signal(func): - """Signal decorator""" + """ + Decorator that defines a signal attribute within a class decorated by `@t_with_signals`. + The decorated function name is used as the signal name, and it provides a lazy-initialized + `TSignal` instance. + + Parameters + ---------- + func : function + A placeholder function that helps to define the signal's name and docstring. The + function body is ignored at runtime, as the signal object is created and stored + dynamically. + + Returns + ------- + TSignalProperty + A property-like descriptor that, when accessed, returns the underlying `TSignal` object. + + Notes + ----- + - A typical usage looks like: + ```python + @t_with_signals + class MyClass: + @t_signal + def some_event(self): + # The body here is never called at runtime. + pass + ``` + - You can then emit the signal via `self.some_event.emit(...)`. + - The actual signal object is created and cached when first accessed. + + See Also + -------- + t_with_signals : Decorates a class to enable signal/slot features. + TSignal : The class representing an actual signal (internal usage). + """ sig_name = func.__name__ @@ -348,7 +732,50 @@ def wrap(self): def t_slot(func): - """Slot decorator""" + """ + Decorator that marks a method as a 'slot' for TSignal. Slots can be either synchronous + or asynchronous, and TSignal automatically handles cross-thread invocation. + + If this decorated method is called directly (i.e., not via a signal’s `emit()`) + from a different thread than the slot’s home thread/event loop, TSignal also ensures + that the call is dispatched (queued) correctly to the slot's thread. This guarantees + consistent and thread-safe execution whether the slot is triggered by a signal emit + or by a direct method call. + + Parameters + ---------- + func : function or coroutine + The method to be decorated as a slot. If it's a coroutine (async def), TSignal + treats it as an async slot. + + Returns + ------- + function or coroutine + A wrapped version of the original slot, with added thread/loop handling for + cross-thread invocation. + + Notes + ----- + - If the slot is synchronous and the emitter (or caller) is in another thread, + TSignal queues a function call to the slot’s thread/event loop. + - If the slot is asynchronous (`async def`), TSignal ensures that the coroutine + is scheduled on the correct event loop. + - The threading affinity and event loop references are automatically assigned + by `@t_with_signals` or `@t_with_worker` when the class instance is created. + + Examples + -------- + @t_with_signals + class Receiver: + @t_slot + def on_data_received(self, data): + print("Synchronous slot called in a thread-safe manner.") + + @t_slot + async def on_data_received_async(self, data): + await asyncio.sleep(1) + print("Asynchronous slot called in a thread-safe manner.") + """ is_coroutine = asyncio.iscoroutinefunction(func) @@ -430,8 +857,55 @@ def callback(): return wrap -def t_with_signals(cls, *, loop=None): - """Decorator for classes using signals""" +def t_with_signals(cls=None, *, loop=None, weak_default=True): + """ + Class decorator that enables the use of TSignal-based signals and TSlot-based slots. + When applied, it assigns an event loop and a thread affinity to each instance, + providing automatic threading support for signals and slots. + + Parameters + ---------- + cls : class, optional + The class to be decorated. If not provided, returns a decorator that can be + applied to a class. + loop : asyncio.AbstractEventLoop, optional + An event loop to be assigned to the instances of the decorated class. If omitted, + TSignal attempts to retrieve the current running loop. If none is found, it raises + an error or creates a new event loop in some contexts. + weak_default : bool, optional + Determines the default value for `weak` connections on signals from instances of + this class. If `True`, any signal `connect` call without a specified `weak` argument + will store a weak reference to the receiver. Defaults to `True`. + + Returns + ------- + class + The decorated class, now enabled with signal/slot features. + + Notes + ----- + - This decorator modifies the class’s `__init__` method to automatically assign + `_tsignal_thread`, `_tsignal_loop`, `_tsignal_affinity`, and `_tsignal_weak_default`. + - Typically, you’ll write: + ```python + @t_with_signals + class MyClass: + @t_signal + def some_event(self): + pass + ``` + Then create an instance: `obj = MyClass()`, and connect signals as needed. + - The `weak_default` argument can be overridden on a per-connection basis + by specifying `weak=True` or `weak=False` in `connect`. + + Example + ------- + @t_with_signals(loop=some_asyncio_loop, weak_default=False) + class MySender: + @t_signal + def message_sent(self): + pass + """ def wrap(cls): """Wrap class with signals""" @@ -455,6 +929,7 @@ def __init__(self, *args, **kwargs): self._tsignal_thread = threading.current_thread() self._tsignal_affinity = self._tsignal_thread self._tsignal_loop = current_loop + self._tsignal_weak_default = weak_default # Call the original __init__ original_init(self, *args, **kwargs) diff --git a/tests/__init__.py b/tests/__init__.py index e69de29..7cac770 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -0,0 +1 @@ +# pylint: disable=missing-module-docstring diff --git a/tests/integration/__init__.py b/tests/integration/__init__.py index e69de29..7cac770 100644 --- a/tests/integration/__init__.py +++ b/tests/integration/__init__.py @@ -0,0 +1 @@ +# pylint: disable=missing-module-docstring diff --git a/tests/integration/test_thread_safety.py b/tests/integration/test_thread_safety.py new file mode 100644 index 0000000..500618e --- /dev/null +++ b/tests/integration/test_thread_safety.py @@ -0,0 +1,141 @@ +# tests/integration/test_thread_safety.py + +# pylint: disable=unused-argument + +""" +Test cases for thread safety of TSignal. +""" + +import unittest +import threading +import gc +from tsignal.core import t_with_signals, t_signal + + +@t_with_signals +class SafeSender: + """ + A class that sends events. + """ + + @t_signal + def event(self, value): + """ + Event signal. + """ + + +class SafeReceiver: + """ + A class that receives events. + """ + + def __init__(self, name=None): + self.called = 0 + self.name = name + + def on_event(self, value): + """ + Event handler. + """ + + self.called += 1 + + +class TestThreadSafe(unittest.IsolatedAsyncioTestCase): + """ + Test cases for thread safety of TSignal. + """ + + async def test_thread_safety(self): + """ + Test thread safety of TSignal. + """ + + sender = SafeSender() + receiver = SafeReceiver("strong_ref") + + # regular connection + sender.event.connect(receiver, receiver.on_event, weak=False) + + # weak reference connection + weak_receiver = SafeReceiver("weak_ref") + sender.event.connect(weak_receiver, weak_receiver.on_event, weak=True) + + # additional receivers + extra_receivers = [SafeReceiver(f"extra_{i}") for i in range(10)] + for r in extra_receivers: + sender.event.connect(r, r.on_event, weak=False) + + # background thread to emit events + def emit_task(): + """ + Background thread to emit events. + """ + + for i in range(1000): + sender.event.emit(i) + + # thread to connect/disconnect repeatedly + def connect_disconnect_task(): + """ + Thread to connect/disconnect repeatedly. + """ + + # randomly connect/disconnect one of extra_receivers + for i in range(500): + idx = i % len(extra_receivers) + r = extra_receivers[idx] + + if i % 2 == 0: + sender.event.connect(r, r.on_event, weak=False) + else: + sender.event.disconnect(r, r.on_event) + + # thread to try to GC weak_receiver + def gc_task(): + """ + Thread to try to GC weak_receiver. + """ + + nonlocal weak_receiver + for i in range(100): + if i == 50: + # release weak_receiver reference and try to GC + del weak_receiver + gc.collect() + else: + # randomly emit events + sender.event.emit(i) + + threads = [] + # multiple threads to perform various tasks + threads.append(threading.Thread(target=emit_task)) + threads.append(threading.Thread(target=connect_disconnect_task)) + threads.append(threading.Thread(target=gc_task)) + + # start threads + for t in threads: + t.start() + + # wait for all threads to finish + for t in threads: + t.join() + + # check: strong_ref receiver should have been called + self.assertTrue( + receiver.called > 0, + f"Strong ref receiver should have been called. Called={receiver.called}", + ) + + # some extra_receivers may have been connected/disconnected repeatedly + # if at least one of them has been called, it's normal + called_counts = [r.called for r in extra_receivers] + self.assertTrue( + any(c > 0 for c in called_counts), + "At least one extra receiver should have been called.", + ) + + # weak_receiver can be GCed. If it is GCed, the receiver will not be called anymore. + # weak_receiver itself is GCed, so it is not accessible. In this case, it is simply checked that it works without errors. + # Here, it is simply checked that the code does not raise an exception and terminates normally. diff --git a/tests/integration/test_threading.py b/tests/integration/test_threading.py index 7e92c52..8f8cb42 100644 --- a/tests/integration/test_threading.py +++ b/tests/integration/test_threading.py @@ -1,12 +1,12 @@ # tests/integration/test_threading.py +# pylint: disable=unused-argument +# pylint: disable=unnecessary-lambda + """ Test cases for threading. """ -# pylint: disable=unused-argument -# pylint: disable=unnecessary-lambda - import asyncio import threading import time diff --git a/tests/integration/test_with_signal.py b/tests/integration/test_with_signal.py index ecfad7e..7243b6b 100644 --- a/tests/integration/test_with_signal.py +++ b/tests/integration/test_with_signal.py @@ -1,5 +1,7 @@ # tests/integration/test_with_signal.py +# pylint: disable=duplicate-code + """ Test cases for the with-signal pattern. """ diff --git a/tests/integration/test_worker.py b/tests/integration/test_worker.py index 57b580c..27d5220 100644 --- a/tests/integration/test_worker.py +++ b/tests/integration/test_worker.py @@ -1,13 +1,13 @@ # tests/integration/test_worker.py -""" -Test cases for the worker pattern. -""" - # pylint: disable=no-member # pylint: disable=redefined-outer-name # pylint: disable=unused-variable +""" +Test cases for the worker pattern. +""" + import asyncio import logging import pytest diff --git a/tests/integration/test_worker_queue.py b/tests/integration/test_worker_queue.py index 0b45c2d..8717bb3 100644 --- a/tests/integration/test_worker_queue.py +++ b/tests/integration/test_worker_queue.py @@ -1,12 +1,12 @@ # tests/integration/test_worker_queue.py +# pylint: disable=no-member +# pylint: disable=redefined-outer-name + """ Test cases for the worker-queue pattern. """ -# pylint: disable=no-member -# pylint: disable=redefined-outer-name - import asyncio import logging import pytest diff --git a/tests/integration/test_worker_signal.py b/tests/integration/test_worker_signal.py index 47950b5..a01659a 100644 --- a/tests/integration/test_worker_signal.py +++ b/tests/integration/test_worker_signal.py @@ -1,14 +1,14 @@ # tests/integration/test_worker_signal.py -""" -Test cases for the worker-signal pattern. -""" - # pylint: disable=redefined-outer-name # pylint: disable=unnecessary-lambda # pylint: disable=unnecessary-lambda-assignment # pylint: disable=no-member +""" +Test cases for the worker-signal pattern. +""" + import asyncio import logging import pytest diff --git a/tests/performance/test_memory.py b/tests/performance/test_memory.py index e9be375..e3436ab 100644 --- a/tests/performance/test_memory.py +++ b/tests/performance/test_memory.py @@ -1,13 +1,14 @@ # tests/performance/test_memory.py -""" -Test cases for memory usage. -""" - # pylint: disable=no-member # pylint: disable=redefined-outer-name # pylint: disable=unused-variable + +""" +Test cases for memory usage. +""" + import pytest from tsignal import t_with_signals, t_signal, t_slot diff --git a/tests/performance/test_stress.py b/tests/performance/test_stress.py index 6eea387..c572510 100644 --- a/tests/performance/test_stress.py +++ b/tests/performance/test_stress.py @@ -1,13 +1,13 @@ # tests/performance/test_stress.py -""" -Test cases for stress testing. -""" - # pylint: disable=no-member # pylint: disable=redefined-outer-name # pylint: disable=unused-variable +""" +Test cases for stress testing. +""" + import asyncio import logging import pytest diff --git a/tests/unit/__init__.py b/tests/unit/__init__.py index e69de29..6300cd5 100644 --- a/tests/unit/__init__.py +++ b/tests/unit/__init__.py @@ -0,0 +1,2 @@ +# pylint: disable=missing-module-docstring +# pylint: disable=duplicate-code diff --git a/tests/unit/test_property.py b/tests/unit/test_property.py index 4e26340..112d58e 100644 --- a/tests/unit/test_property.py +++ b/tests/unit/test_property.py @@ -1,13 +1,13 @@ # tests/unit/test_property.py -""" -Test cases for the property pattern. -""" - # pylint: disable=no-member # pylint: disable=unnecessary-lambda # pylint: disable=useless-with-lock +""" +Test cases for the property pattern. +""" + import asyncio import threading import logging diff --git a/tests/unit/test_signal.py b/tests/unit/test_signal.py index e318869..ddbeffa 100644 --- a/tests/unit/test_signal.py +++ b/tests/unit/test_signal.py @@ -1,13 +1,13 @@ # tests/unit/test_signal.py -""" -Test cases for the signal pattern. -""" - # pylint: disable=unused-argument # pylint: disable=unused-variable # pylint: disable=too-many-locals +""" +Test cases for the signal pattern. +""" + import asyncio import logging import pytest @@ -92,7 +92,7 @@ async def test_signal_disconnect_specific_slot(sender, receiver): # Only async slot should remain remaining = sender.value_changed.connections[0] - assert remaining[1] == receiver.on_value_changed + assert remaining.get_slot_to_call() == receiver.on_value_changed @pytest.mark.asyncio @@ -136,8 +136,8 @@ async def test_signal_disconnect_specific_receiver_and_slot(sender, receiver): assert len(sender.value_changed.connections) == 1 # Only sync slot should remain - remaining = sender.value_changed.connections[0] - assert remaining[1] == receiver.on_value_changed_sync + conn = sender.value_changed.connections[0] + assert conn.get_slot_to_call() == receiver.on_value_changed_sync @pytest.mark.asyncio @@ -294,14 +294,18 @@ def collect_value(self, value): # The connection type of signal_receiver's method is DIRECT_CONNECTION # because it has the same thread affinity as the signal - receiver, slot, conn_type, is_coro = signal.connections[-1] - actual_type = _determine_connection_type(conn_type, receiver, signal.owner, is_coro) + conn = signal.connections[-1] + actual_type = _determine_connection_type( + conn.conn_type, conn.get_receiver(), signal.owner, conn.is_coro_slot + ) assert actual_type == TConnectionType.DIRECT_CONNECTION # The connection type of regular class's method is DIRECT_CONNECTION # because it has the same thread affinity as the signal - receiver, slot, conn_type, is_coro = signal.connections[-1] - actual_type = _determine_connection_type(conn_type, receiver, signal.owner, is_coro) + conn = signal.connections[-1] + actual_type = _determine_connection_type( + conn.conn_type, conn.get_receiver(), signal.owner, conn.is_coro_slot + ) assert actual_type == TConnectionType.DIRECT_CONNECTION signal.emit(42) @@ -369,45 +373,112 @@ async def async_handler(self, value): threaded_obj = ThreadedClass() worker_obj = WorkerClass() - # Test sync function connections signal = regular_with_signal_obj.test_signal signal.connect(regular_handler) - receiver, slot, conn_type, is_coro = signal.connections[-1] - actual_type = _determine_connection_type(conn_type, receiver, signal.owner, is_coro) + + # Test sync function connections + conn = signal.connections[-1] + actual_type = _determine_connection_type( + conn.conn_type, conn.get_receiver(), signal.owner, conn.is_coro_slot + ) assert actual_type == TConnectionType.DIRECT_CONNECTION # Test async function connections signal.connect(async_handler) - receiver, slot, conn_type, is_coro = signal.connections[-1] - actual_type = _determine_connection_type(conn_type, receiver, signal.owner, is_coro) + conn = signal.connections[-1] + actual_type = _determine_connection_type( + conn.conn_type, conn.get_receiver(), signal.owner, conn.is_coro_slot + ) assert actual_type == TConnectionType.QUEUED_CONNECTION # Test regular class method signal.connect(regular_obj.handler) - receiver, slot, conn_type, is_coro = signal.connections[-1] - actual_type = _determine_connection_type(conn_type, receiver, signal.owner, is_coro) + conn = signal.connections[-1] + actual_type = _determine_connection_type( + conn.conn_type, conn.get_receiver(), signal.owner, conn.is_coro_slot + ) assert actual_type == TConnectionType.DIRECT_CONNECTION # Test threaded class with sync method signal.connect(threaded_obj, threaded_obj.sync_handler) - receiver, slot, conn_type, is_coro = signal.connections[-1] - actual_type = _determine_connection_type(conn_type, receiver, signal.owner, is_coro) + conn = signal.connections[-1] + actual_type = _determine_connection_type( + conn.conn_type, conn.get_receiver(), signal.owner, conn.is_coro_slot + ) assert actual_type == TConnectionType.DIRECT_CONNECTION # Test threaded class with async method signal.connect(threaded_obj, threaded_obj.async_handler) - receiver, slot, conn_type, is_coro = signal.connections[-1] - actual_type = _determine_connection_type(conn_type, receiver, signal.owner, is_coro) + conn = signal.connections[-1] + actual_type = _determine_connection_type( + conn.conn_type, conn.get_receiver(), signal.owner, conn.is_coro_slot + ) assert actual_type == TConnectionType.QUEUED_CONNECTION # Test worker class with sync method signal.connect(worker_obj.sync_handler) - receiver, slot, conn_type, is_coro = signal.connections[-1] - actual_type = _determine_connection_type(conn_type, receiver, signal.owner, is_coro) + conn = signal.connections[-1] + actual_type = _determine_connection_type( + conn.conn_type, conn.get_receiver(), signal.owner, conn.is_coro_slot + ) assert actual_type == TConnectionType.QUEUED_CONNECTION # Test worker class with async method signal.connect(worker_obj.async_handler) - receiver, slot, conn_type, is_coro = signal.connections[-1] - actual_type = _determine_connection_type(conn_type, receiver, signal.owner, is_coro) + conn = signal.connections[-1] + actual_type = _determine_connection_type( + conn.conn_type, conn.get_receiver(), signal.owner, conn.is_coro_slot + ) assert actual_type == TConnectionType.QUEUED_CONNECTION + + +async def test_one_shot(): + """ + Verifies that one_shot connections are triggered exactly once, + then removed automatically upon the first call. + """ + + @t_with_signals + class OneShotSender: + """ + A class that sends one-shot events. + """ + + @t_signal + def one_shot_event(self, value): + """ + One-shot event signal. + """ + + class OneShotReceiver: + """ + A class that receives one-shot events. + """ + + def __init__(self): + self.called_count = 0 + + def on_event(self, value): + """ + Event handler. + """ + + self.called_count += 1 + + sender = OneShotSender() + receiver = OneShotReceiver() + + sender.one_shot_event.connect(receiver, receiver.on_event, one_shot=True) + + sender.one_shot_event.emit(123) + # Ensure all processing is complete + await asyncio.sleep(1) + + # Already called once, so second emit should not trigger on_event + sender.one_shot_event.emit(456) + await asyncio.sleep(1) + + # Check if it was called only once + assert ( + receiver.called_count == 1 + ), "Receiver should only be called once for a one_shot connection" diff --git a/tests/unit/test_slot.py b/tests/unit/test_slot.py index b678711..4204858 100644 --- a/tests/unit/test_slot.py +++ b/tests/unit/test_slot.py @@ -1,11 +1,12 @@ # tests/unit/test_slot.py +# pylint: disable=duplicate-code +# pylint: disable=no-member + """ Test cases for the slot pattern. """ -# pylint: disable=no-member - import asyncio import threading import time diff --git a/tests/unit/test_utils.py b/tests/unit/test_utils.py index b276486..5175b51 100644 --- a/tests/unit/test_utils.py +++ b/tests/unit/test_utils.py @@ -1,11 +1,11 @@ # tests/unit/test_utils.py -"""Test utils""" - # pylint: disable=no-member # pylint: disable=unused-argument # pylint: disable=redefined-outer-name +"""Test utils""" + from unittest.mock import MagicMock import logging import pytest diff --git a/tests/unit/test_weak.py b/tests/unit/test_weak.py new file mode 100644 index 0000000..caa1bc2 --- /dev/null +++ b/tests/unit/test_weak.py @@ -0,0 +1,187 @@ +# tests/unit/test_weak.py + +""" +Test cases for weak reference connections. +""" + +# pylint: disable=unused-argument +# pylint: disable=redundant-unittest-assert + +import unittest +import gc +import asyncio +import weakref +from tsignal.core import t_with_signals, t_signal + + +class WeakRefReceiver: + """ + A class that receives weak reference events. + """ + + def __init__(self): + self.called = False + + def on_signal(self, value): + """ + Event handler. + """ + + self.called = True + print(f"WeakRefReceiver got value: {value}") + + +@t_with_signals(weak_default=True) +class WeakRefSender: + """ + A class that sends weak reference events. + """ + + @t_signal + def event(self): + """ + Event signal. + """ + + +class StrongRefReceiver: + """ + A class that receives strong reference events. + """ + + def __init__(self): + """ + Initialize the receiver. + """ + + self.called = False + + def on_signal(self, value): + """ + Event handler. + """ + + self.called = True + print(f"StrongRefReceiver got value: {value}") + + +@t_with_signals(weak_default=True) +class MixedSender: + """ + A class that sends mixed reference events. + """ + + @t_signal + def event(self, value): + """ + Event signal. + """ + + +class TestWeakRefConnections(unittest.IsolatedAsyncioTestCase): + """ + Test cases for weak reference connections. + """ + + async def test_weak_default_connection(self): + """ + Test weak default connection. + """ + + sender = WeakRefSender() + receiver = WeakRefReceiver() + + # connect without specifying weak, should use weak_default=True + sender.event.connect(receiver, receiver.on_signal) + + sender.event.emit(42) + self.assertTrue(receiver.called, "Receiver should be called when alive") + + # Delete receiver and force GC + del receiver + gc.collect() + + # After GC, the connection should be removed automatically + # Emit again and ensure no error and no print from receiver + sender.event.emit(100) + # If receiver was alive or connection remained, it would print or set called to True + # But we no longer have access to receiver here + # Just ensure no exception - implicit check + self.assertTrue( + True, "No exception emitted, weak ref disconnected automatically" + ) + + async def test_override_weak_false(self): + """ + Test override weak=False. + """ + + sender = MixedSender() + receiver = StrongRefReceiver() + + # Even though weak_default=True, we explicitly set weak=False + sender.event.connect(receiver, receiver.on_signal, weak=False) + + sender.event.emit(10) + self.assertTrue(receiver.called, "Receiver called with strong ref") + + # Reset called + receiver.called = False + + # Delete receiver and force GC + receiver_ref = weakref.ref(receiver) + del receiver + gc.collect() + + # Check if receiver is GCed + # Originally: self.assertIsNone(receiver_ref(), "Receiver should be GCed") + # Update the expectation: Since weak=False means strong ref remains, receiver won't GC. + self.assertIsNotNone( + receiver_ref(), "Receiver should NOT be GCed due to strong ref" + ) + + # Emit again, should still have a reference + sender.event.emit(200) + # Even if we can't call receiver (it was del), the reference in slot keeps it alive, + # but possibly as an inaccessible object. + # Just checking no exception raised and that receiver_ref is not None. + # This confirms the slot strong reference scenario. + self.assertTrue(True, "No exception raised, strong ref scenario is consistent") + + async def test_explicit_weak_true(self): + """ + Test explicit weak=True. + """ + + sender = MixedSender() + receiver = StrongRefReceiver() + + # weak_default=True anyway, but let's be explicit + sender.event.connect(receiver, receiver.on_signal, weak=True) + + sender.event.emit(20) + self.assertTrue(receiver.called, "Explicit weak=True call") + + receiver.called = False + + # Create a weak reference to the receiver + receiver_ref = weakref.ref(receiver) + self.assertIsNotNone(receiver_ref(), "Receiver should be alive before deletion") + + # Delete strong reference and force GC + del receiver + gc.collect() + + # Check if the receiver has been collected + self.assertIsNone( + receiver_ref(), "Receiver should be GCed after weakref disconnection" + ) + + # Receiver gone, emit again + # Should not call anything, no crash + sender.event.emit(30) + self.assertTrue(True, "No exception and no call because weak disconnect") + + +if __name__ == "__main__": + asyncio.run(unittest.main())