diff --git a/.github/workflows/tests.yml b/.github/workflows/ci.yml similarity index 64% rename from .github/workflows/tests.yml rename to .github/workflows/ci.yml index da8a8e5..9a89c75 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/ci.yml @@ -1,4 +1,5 @@ -name: Tests +# .github/workflows/ci.yml +name: CI on: push: @@ -11,7 +12,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - python-version: ['3.8', '3.9', '3.10', '3.11', '3.12'] + python-version: ['3.10', '3.11', '3.12'] steps: - uses: actions/checkout@v2 @@ -24,9 +25,18 @@ jobs: - name: Install dependencies run: | python -m pip install --upgrade pip - python -m pip install pytest pytest-cov pytest-asyncio pip install -e ".[dev]" + pip install pylint mypy - name: Run tests run: | pytest --cov=tsignal + + - name: Code quality checks + run: | + pylint src/tsignal + mypy src/tsignal + + - name: Performance tests + run: | + pytest -v -m performance \ No newline at end of file diff --git a/CHANGELOG.md b/CHANGELOG.md index 4324cf8..e7741e0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,54 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [0.2.0] - 2024-03-19 + +### Changed +- Updated minimum Python version requirement to 3.10 + - This change was necessary to ensure reliable worker thread functionality + - Python 3.10+ provides improved async features and type handling + - Better support for async context management and error handling +- Updated documentation to reflect new Python version requirement +- Enhanced worker thread implementation with Python 3.10+ features + +### Added +- Performance tests for stress testing and memory usage analysis + - Includes `test_stress.py` for heavy signal load testing + - Includes `test_memory.py` for memory profiling + +### Removed +- Support for Python versions below 3.10 + +### Note +Core features are now implemented and stable: +- Robust signal-slot mechanism +- Thread-safe operations +- Async/await support +- Worker thread pattern +- Comprehensive documentation +- Full test coverage + +Next steps before 1.0.0: +- Additional stress testing +- Memory leak verification +- Production environment validation +- Enhanced CI/CD pipeline +- Extended documentation + +## [0.1.1] - 2024-12-01 + +### Changed +- Refactored signal connection logic to support direct function connections +- Improved error handling for invalid connections +- Enhanced logging for signal emissions and connections + +### Fixed +- Resolved issues with disconnecting slots during signal emissions +- Fixed bugs related to async slot processing and connection management + +### Removed +- Deprecated unused constants and methods from the core module + ## [0.1.0] - 2024-01-26 ### Added diff --git a/README.md b/README.md index f1f6149..fb59e0e 100644 --- a/README.md +++ b/README.md @@ -70,6 +70,7 @@ asyncio.run(main()) ``` ## Features +- Requires Python 3.10+ - Easy-to-use signal-slot mechanism with decorators - Support for both synchronous and asynchronous slots - Thread-safe signal emissions @@ -78,7 +79,7 @@ asyncio.run(main()) ## Installation -Currently, this package is under development. You can install it directly from the repository: +TSignal requires Python 3.10 or higher. You can install it directly from the repository: ```bash git clone https://github.com/tsignal/tsignal-python.git @@ -132,3 +133,88 @@ Please see [Contributing Guidelines](CONTRIBUTING.md) for details on how to cont ## License This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details. + +## Connecting Signals and Slots + +### Classic Object-Member Connection +```python +@t_with_signals +class Counter: + @t_signal + def count_changed(self): + pass + +@t_with_signals +class Display: + @t_slot + def on_count_changed(self, value): + print(f"Count is now: {value}") + +counter = Counter() +display = Display() +counter.count_changed.connect(display, display.on_count_changed) +``` + +### Function Connection +```python +# Connect to a simple function +def print_value(value): + print(f"Value: {value}") + +counter.count_changed.connect(print_value) + +# Connect to a lambda +counter.count_changed.connect(lambda x: print(f"Lambda received: {x}")) + +# Connect to an object method without @t_slot +class Handler: + def process_value(self, value): + print(f"Processing: {value}") + +handler = Handler() +counter.count_changed.connect(handler.process_value) +``` + +## Worker Thread Pattern + +TSignal provides a worker thread pattern that combines thread management with signal/slot communication and task queuing: + +```python +from tsignal import t_with_worker + +@t_with_worker +class DataProcessor: + async def initialize(self, config=None): + # Setup worker (called in worker thread) + self.config = config or {} + + async def process_data(self, data): + # Heavy processing in worker thread + result = await heavy_computation(data) + self.processing_done.emit(result) + + async def finalize(self): + # Cleanup worker (called before thread stops) + await self.cleanup() + + @t_signal + def processing_done(self): + pass + +# Usage +processor = DataProcessor() +processor.start(config={'threads': 4}) # Starts worker thread + +# Queue task in worker thread +await processor.queue_task(processor.process_data(some_data)) + +# Stop worker +processor.stop() # Graceful shutdown +``` + +The worker pattern provides: +- Dedicated worker thread with event loop +- Built-in signal/slot support +- Async task queue +- Graceful initialization/shutdown +- Thread-safe communication diff --git a/docs/api.md b/docs/api.md index 331ed8a..0ba50b9 100644 --- a/docs/api.md +++ b/docs/api.md @@ -36,6 +36,43 @@ async def on_async_signal(self, *args, **kwargs): pass ``` +### `@t_with_worker` +Class decorator that creates a worker thread with signal support and task queue. + +**Requirements:** +- Class must implement async `initialize(self, *args, **kwargs)` method +- Class must implement async `finalize(self)` method + +**Added Methods:** +##### `start(*args, **kwargs) -> None` +Starts the worker thread and calls initialize with given arguments. + +##### `stop() -> None` +Stops the worker thread gracefully, calling finalize. + +##### `async queue_task(coro) -> None` +Queues a coroutine for execution in the worker thread. + +**Example:** +```python +@t_with_worker +class Worker: + async def initialize(self): + print("Worker initialized") + + async def finalize(self): + print("Worker cleanup") + + async def process(self): + await asyncio.sleep(1) + print("Processing done") + +worker = Worker() +worker.start() +await worker.queue_task(worker.process()) +worker.stop() +``` + ## Classes ### `TSignal` @@ -43,13 +80,44 @@ Base class for signals. #### Methods -##### `connect(receiver: object, slot: Callable, connection_type: Optional[TConnectionType] = None) -> None` +##### `connect(receiver_or_slot: Union[object, Callable], slot: Optional[Callable] = None) -> None` Connects the signal to a slot. **Parameters:** -- `receiver`: Object that contains the slot -- `slot`: Callable that will receive the signal -- `connection_type`: Optional connection type (DirectConnection or QueuedConnection) +- When connecting to a QObject slot: + - `receiver_or_slot`: The receiver object + - `slot`: The slot method of the receiver + +- When connecting to a function/lambda: + - `receiver_or_slot`: The callable (function, lambda, or method) + - `slot`: None + +**Connection Behavior:** +1. Object Method with Signal Support: + ```python + @t_with_signals + class Receiver: + def on_signal(self, value): + print(value) + + receiver = Receiver() + signal.connect(receiver.on_signal) # Automatically sets up receiver + ``` + +2. Regular Object Method: + ```python + class RegularClass: + def on_signal(self, value): + print(value) + + obj = RegularClass() + signal.connect(obj.on_signal) # Treated as direct connection + ``` + +The connection type is automatically determined: +- Methods from objects with `@t_with_signals` are set up with their object as receiver +- Regular object methods are treated as direct connections +- Async methods always use queued connections ##### `disconnect(receiver: Optional[object] = None, slot: Optional[Callable] = None) -> int` Disconnects one or more slots from the signal. @@ -104,18 +172,8 @@ Emits the signal with the given arguments. Enum defining connection types. #### Values: -- `DirectConnection`: Slot is called directly in the emitting thread -- `QueuedConnection`: Slot is queued in the receiver's event loop - -## Constants - -### `TSignalConstants` -Constants used by the TSignal system. - -#### Values: -- `FROM_EMIT`: Key for emission context -- `THREAD`: Key for thread storage -- `LOOP`: Key for event loop storage +- `DIRECT_CONNECTION`: Slot is called directly in the emitting thread +- `QUEUED_CONNECTION`: Slot is queued in the receiver's event loop ## Usage Examples @@ -164,14 +222,14 @@ asyncio.run(main()) sender.value_changed.connect( receiver, receiver.on_value_changed, - connection_type=TConnectionType.DirectConnection + connection_type=TConnectionType.DIRECT_CONNECTION ) # Force queued connection sender.value_changed.connect( receiver, receiver.on_value_changed, - connection_type=TConnectionType.QueuedConnection + connection_type=TConnectionType.QUEUED_CONNECTION ) ``` diff --git a/docs/logging.md b/docs/logging.md index bc3ae46..5035991 100644 --- a/docs/logging.md +++ b/docs/logging.md @@ -1,5 +1,8 @@ # Logging Guidelines +## Requirements +TSignal requires Python 3.10 or higher. + TSignal uses Python's standard logging module with the following levels: - DEBUG: Detailed information about signal-slot connections and emissions diff --git a/docs/testing.md b/docs/testing.md index 461c2f7..bc61580 100644 --- a/docs/testing.md +++ b/docs/testing.md @@ -1,7 +1,7 @@ # Testing Guide ## Overview -TSignal uses pytest for testing. Our test suite includes unit tests, integration tests, and supports async testing. +TSignal requires Python 3.10 or higher and uses pytest for testing. Our test suite includes unit tests, integration tests, performance tests, and supports async testing. ## Test Structure ``` @@ -13,10 +13,14 @@ tests/ │ ├── test_signal.py │ ├── test_slot.py │ └── test_with_signals.py -└── integration/ # Integration tests +├── integration/ # Integration tests +│ ├── __init__.py +│ ├── test_async.py +│ └── test_threading.py +└── performance/ # Performance and stress tests ├── __init__.py - ├── test_async.py - └── test_threading.py + ├── test_stress.py + └── test_memory.py ``` ## Running Tests @@ -43,8 +47,23 @@ pytest tests/unit/test_signal.py -k "test_signal_disconnect_all" # Run tests by marker pytest -v -m asyncio +pytest -v -m performance # Run performance tests only +``` + +### Performance Tests +Performance tests include stress testing and memory usage analysis. These tests are marked with the `@pytest.mark.performance` decorator. + +```bash +# Run only performance tests +pytest -v -m performance + +# Run specific performance test +pytest tests/performance/test_stress.py +pytest tests/performance/test_memory.py ``` +Note: Performance tests might take longer to run and consume more resources than regular tests. + ### Debug Mode To enable debug logging during tests: ```bash @@ -65,3 +84,12 @@ pytest --cov=tsignal # Generate HTML coverage report pytest --cov=tsignal --cov-report=html ``` + +## Async Testing Configuration + +The project uses `pytest-asyncio` with `asyncio_mode = "auto"` to handle async fixtures and tests. This configuration allows for more flexible handling of async/sync code interactions, especially in worker-related tests where we need to manage both synchronous and asynchronous operations. + +Key points: +- Async fixtures can yield values directly +- Both sync and async tests can use the same fixtures +- Worker thread initialization and cleanup are handled automatically diff --git a/docs/usage.md b/docs/usage.md index 1e4a553..f7d549e 100644 --- a/docs/usage.md +++ b/docs/usage.md @@ -1,5 +1,8 @@ # Usage Guide +## Requirements +TSignal requires Python 3.10 or higher. + ## Table of Contents 1. [Basic Concepts](#basic-concepts) 2. [Signals](#signals) @@ -7,6 +10,7 @@ 4. [Connection Types](#connection-types) 5. [Threading and Async](#threading-and-async) 6. [Best Practices](#best-practices) +7. [Worker Thread Pattern](#worker-thread-pattern) ## Basic Concepts TSignal implements the signal-slot pattern, which allows for loose coupling between components. The core concepts are: @@ -71,18 +75,18 @@ class DataProcessor: ## Connection Types TSignal supports two types of connections: -### DirectConnection +### DIRECT_CONNECTION - Signal and slot execute in the same thread - Slot is called immediately when signal is emitted ```python -signal.connect(receiver, slot, connection_type=TConnectionType.DirectConnection) +signal.connect(receiver, slot, connection_type=TConnectionType.DIRECT_CONNECTION) ``` -### QueuedConnection +### QUEUED_CONNECTION - Signal and slot can execute in different threads - Slot execution is queued in receiver's event loop ```python -signal.connect(receiver, slot, connection_type=TConnectionType.QueuedConnection) +signal.connect(receiver, slot, connection_type=TConnectionType.QUEUED_CONNECTION) ``` Connection type is automatically determined based on: @@ -277,3 +281,140 @@ This understanding of signal disconnection behavior is crucial for: - Ensuring proper resource cleanup - Managing complex async operations - Handling thread synchronization correctly + +## Signal Connection Types + +### Object-Member Connection +Traditional signal-slot connection between objects: +```python +@t_with_signals +class Sender: + @t_signal + def value_changed(self): + pass + +@t_with_signals +class Receiver: + @t_slot + def on_value_changed(self, value): + print(f"Value: {value}") + +sender.value_changed.connect(receiver, receiver.on_value_changed) +``` + +### Function Connection +Connect signals directly to functions or lambdas: +```python +# Standalone function +def handle_value(value): + print(f"Value: {value}") +sender.value_changed.connect(handle_value) + +# Lambda function +sender.value_changed.connect(lambda x: print(f"Value: {x}")) +``` + +### Method Connection +Connect to object methods without @t_slot decorator: +```python +class Handler: + def process(self, value): + print(f"Processing: {value}") + +handler = Handler() +sender.value_changed.connect(handler.process) +``` + +### Connection Behavior Notes +- Object-member connections are automatically disconnected when the receiver is destroyed +- Function connections remain active until explicitly disconnected +- Method connections behave like function connections and need manual cleanup + +## Worker Thread Pattern + +### Overview +The worker pattern provides a convenient way to run operations in a background thread with built-in signal/slot support and task queuing. + +### Basic Worker +```python +@t_with_worker +class ImageProcessor: + async def initialize(self, cache_size=100): + """Called when worker starts""" + self.cache = {} + self.cache_size = cache_size + + async def finalize(self): + """Called before worker stops""" + self.cache.clear() + + @t_signal + def processing_complete(self): + """Signal emitted when processing is done""" + pass + + async def process_image(self, image_data): + """Task to be executed in worker thread""" + result = await self.heavy_processing(image_data) + self.processing_complete.emit(result) + +# Usage +processor = ImageProcessor() +processor.start(cache_size=200) + +# Queue tasks +await processor.queue_task(processor.process_image(data1)) +await processor.queue_task(processor.process_image(data2)) + +# Cleanup +processor.stop() +``` + +### Worker Features + +#### Thread Safety +- All signal emissions are thread-safe +- Task queue is thread-safe +- Worker has its own event loop + +#### Task Queue +- Tasks are executed sequentially +- Tasks must be coroutines +- Queue is processed in worker thread + +#### Lifecycle Management +```python +@t_with_worker +class Worker: + async def initialize(self, *args, **kwargs): + # Setup code + self.resources = await setup_resources() + + async def finalize(self): + # Cleanup code + await self.resources.cleanup() + + async def some_task(self): + # Will run in worker thread + await self.resources.process() + +worker = Worker() +try: + worker.start() + await worker.queue_task(worker.some_task()) +finally: + worker.stop() # Ensures cleanup +``` + +#### Combining with Signals +Workers can use signals to communicate results: +```python +@t_with_worker +class DataProcessor: + def __init__(self): + super().__init__() + self.results = [] + + @t_signal + def processing_done(self): + """Emitted when batch is diff --git a/docs/windows-asyncio-iocp-termination-issue.md b/docs/windows-asyncio-iocp-termination-issue.md new file mode 100644 index 0000000..e08eedd --- /dev/null +++ b/docs/windows-asyncio-iocp-termination-issue.md @@ -0,0 +1,73 @@ +# Windows IOCP and asyncio Event Loop Termination Issue + +## Problem Overview +A chronic issue that occurs when terminating asyncio event loops in Windows environments. This primarily occurs with `ProactorEventLoop`, which is based on Windows' IOCP (Input/Output Completion Port). + +## Key Symptoms +- Event loop fails to terminate completely even when all tasks are completed and no pending work exists +- Event loop continues to show as running even after calling `loop.stop()` +- Process remains in background without full termination + +## Root Cause +- Windows IOCP kernel objects not being properly cleaned up +- Incomplete integration between Python asyncio and Windows IOCP +- Multiple reports in Python bug tracker ([bpo-23057](https://bugs.python.org/issue23057), [bpo-45097](https://bugs.python.org/issue45097)) + +## Code Example +Here's a typical scenario where this issue occurs: + +```python +import asyncio +import threading + +class WorkerClass: + def start_worker(self): + self.worker_thread = threading.Thread( + target=run_worker, + name="WorkerThread", + daemon=True # Set as daemon thread to work around the issue + ) + self.worker_thread.start() + + def stop_worker(self): + if self.worker_loop: + self.worker_loop.call_soon_threadsafe(self._worker_loop.stop) + if self.worker_thread: + self.worker_thread.join() +``` + +## Solution +The most common workaround is to set the worker thread as a daemon thread: + +### Benefits of Daemon Thread Approach +- Allows forced thread termination on program exit +- Bypasses event loop hanging issues +- Enables clean process termination + +### Important Notes +- This issue is less prevalent on Linux and macOS +- Not a perfect solution as forced termination might lead to resource cleanup issues +- Remains an ongoing issue in the Python community + +## Alternative Solutions +While daemon threads are the most practical solution, other approaches include: + +1. Implementing careful cleanup logic: +```python +async def cleanup(): + tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()] + for task in tasks: + task.cancel() + await asyncio.gather(tasks, return_exceptions=True) + loop.stop() +``` + +2. Using signal handlers for graceful shutdown +3. Implementing timeout-based forced termination + +## Platform Specifics +- Windows: Most affected due to IOCP implementation +- Linux/macOS: Less problematic due to different event loop implementations +- The issue is specific to asyncio's integration with Windows' IOCP + + diff --git a/examples/02_basic-signal.py b/examples/01_signal_basic.py similarity index 100% rename from examples/02_basic-signal.py rename to examples/01_signal_basic.py diff --git a/examples/01_async-signal.py b/examples/02_signal_async.py similarity index 100% rename from examples/01_async-signal.py rename to examples/02_signal_async.py diff --git a/examples/03_thread-communication.py b/examples/03_thread_basic.py similarity index 100% rename from examples/03_thread-communication.py rename to examples/03_thread_basic.py diff --git a/examples/04_thread_worker.py b/examples/04_thread_worker.py new file mode 100644 index 0000000..26a8f75 --- /dev/null +++ b/examples/04_thread_worker.py @@ -0,0 +1,137 @@ +""" +Thread Worker Pattern Example + +This example demonstrates the worker thread pattern using TSignal's t_with_worker decorator: + +1. Worker Thread: + - ImageProcessor: A worker class that processes images in background + - Supports async initialization and cleanup + - Uses task queue for processing requests + - Communicates results via signals + +2. Main Thread: + - Controls worker lifecycle + - Submits processing requests + - Receives processing results + +Architecture: +- Worker runs in separate thread with its own event loop +- Task queue ensures sequential processing +- Signal/Slot connections handle thread-safe communication +""" + +import asyncio +import time +from tsignal import t_with_signals, t_signal, t_slot, t_with_worker + + +@t_with_worker +class ImageProcessor: + """Worker that processes images in background thread""" + + def __init__(self): + self.cache = {} + super().__init__() + + async def initialize(self, cache_size=100): + """Initialize worker (runs in worker thread)""" + print(f"[Worker Thread] Initializing image processor (cache_size={cache_size})") + self.cache_size = cache_size + + async def finalize(self): + """Cleanup worker (runs in worker thread)""" + print("[Worker Thread] Cleaning up image processor") + self.cache.clear() + + @t_signal + def processing_complete(self): + """Signal emitted when image processing completes""" + pass + + @t_signal + def batch_complete(self): + """Signal emitted when batch processing completes""" + pass + + async def process_image(self, image_id: str, image_data: bytes): + """Process single image (runs in worker thread)""" + print(f"[Worker Thread] Processing image {image_id}") + + # Simulate image processing + await asyncio.sleep(0.5) + result = f"Processed_{image_id}" + + # Cache result + if len(self.cache) >= self.cache_size: + self.cache.pop(next(iter(self.cache))) + self.cache[image_id] = result + + # Emit result + self.processing_complete.emit(image_id, result) + return result + + async def process_batch(self, images: list): + """Process batch of images (runs in worker thread)""" + results = [] + for img_id, img_data in images: + result = await self.process_image(img_id, img_data) + results.append(result) + self.batch_complete.emit(results) + return results + + +@t_with_signals +class ImageViewer: + """UI component that displays processed images""" + + def __init__(self): + print("[Main Thread] Creating image viewer") + self.processed_images = {} + + @t_slot + def on_image_processed(self, image_id: str, result: str): + """Handle processed image (runs in main thread)""" + print(f"[Main Thread] Received processed image {image_id}") + self.processed_images[image_id] = result + + @t_slot + def on_batch_complete(self, results: list): + """Handle completed batch (runs in main thread)""" + print(f"[Main Thread] Batch processing complete: {len(results)} images") + + +async def main(): + # Create components + processor = ImageProcessor() + viewer = ImageViewer() + + # Connect signals + processor.processing_complete.connect(viewer, viewer.on_image_processed) + processor.batch_complete.connect(viewer, viewer.on_batch_complete) + + # Start worker + print("\n=== Starting worker ===\n") + processor.start(cache_size=5) + + # Simulate image processing requests + print("\n=== Processing single images ===\n") + for i in range(3): + image_id = f"img_{i}" + image_data = b"fake_image_data" + await processor.queue_task(processor.process_image(image_id, image_data)) + + # Simulate batch processing + print("\n=== Processing batch ===\n") + batch = [(f"batch_img_{i}", b"fake_batch_data") for i in range(3)] + await processor.queue_task(processor.process_batch(batch)) + + # Wait for processing to complete + await asyncio.sleep(3) + + # Stop worker + print("\n=== Stopping worker ===\n") + processor.stop() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/pyproject.toml b/pyproject.toml index 13751db..3bd850f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,10 +4,10 @@ build-backend = "setuptools.build_meta" [project] name = "tsignal" -version = "0.1.0" +version = "0.2.0" description = "A Python Signal-Slot library inspired by Qt" readme = "README.md" -requires-python = ">=3.7" +requires-python = ">=3.10" license = {text = "MIT"} authors = [ {name = "San Kim", email = "tsignal.dev@gmail.com"} @@ -18,9 +18,6 @@ classifiers = [ "Intended Audience :: Developers", "License :: OSI Approved :: MIT License", "Programming Language :: Python :: 3", - "Programming Language :: Python :: 3.7", - "Programming Language :: Python :: 3.8", - "Programming Language :: Python :: 3.9", "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.11", "Programming Language :: Python :: 3.12", @@ -37,7 +34,8 @@ Issues = "https://github.com/tsignal/tsignal-python/issues" dev = [ "pytest>=7.0", "pytest-cov>=4.0", - "pytest-asyncio>=0.21.0" + "pytest-asyncio>=0.21.0", + "memory_profiler" ] [tool.setuptools] @@ -49,6 +47,7 @@ addopts = "--strict-markers --disable-warnings" testpaths = ["tests"] markers = [ "asyncio: mark test as an async test", + "performance: mark test as a performance test", ] asyncio_mode = "auto" asyncio_default_fixture_loop_scope = "function" @@ -57,6 +56,14 @@ asyncio_default_fixture_loop_scope = "function" source = ["tsignal"] branch = true +[tool.pylint] +disable = [ + "protected-access", + "too-few-public-methods", + "too-many-statements", + "broad-exception-caught" +] + [tool.coverage.report] exclude_lines = [ "pragma: no cover", @@ -67,6 +74,21 @@ exclude_lines = [ "pass", "raise ImportError", ] + +[tool.mypy] +# Disable checking for untyped function bodies +check_untyped_defs = false + +# Allow implicit Optional +no_implicit_optional = false + +# Disable specific errors +disable_error_code = ["annotation-unchecked", "assignment"] + +# Do not treat warnings as errors +warn_return_any = false +warn_unused_configs = false + ignore_errors = true omit = [ "tests/*", diff --git a/src/tsignal/__init__.py b/src/tsignal/__init__.py index 76b9f33..20a7000 100644 --- a/src/tsignal/__init__.py +++ b/src/tsignal/__init__.py @@ -6,10 +6,9 @@ t_with_signals, t_signal, t_slot, - TSignal, TConnectionType, - TSignalConstants, ) +from .contrib.patterns.worker.decorators import t_with_worker __version__ = "0.1.0" @@ -17,7 +16,6 @@ "t_with_signals", "t_signal", "t_slot", - "TSignal", + "t_with_worker", "TConnectionType", - "TSignalConstants", ] diff --git a/src/tsignal/contrib/patterns/__init__.py b/src/tsignal/contrib/patterns/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/tsignal/contrib/patterns/worker/__init__.py b/src/tsignal/contrib/patterns/worker/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/tsignal/contrib/patterns/worker/decorators.py b/src/tsignal/contrib/patterns/worker/decorators.py new file mode 100644 index 0000000..52cd556 --- /dev/null +++ b/src/tsignal/contrib/patterns/worker/decorators.py @@ -0,0 +1,135 @@ +""" +Decorator for the worker pattern. + +This decorator enhances a class to support a worker pattern, allowing for +asynchronous task processing in a separate thread. It ensures that the +class has the required asynchronous `initialize` and `finalize` methods, +facilitating the management of worker threads and task queues. +""" + +import asyncio +import threading +import logging + +logger = logging.getLogger(__name__) + + +def t_with_worker(cls): + """Decorator for the worker pattern.""" + if not asyncio.iscoroutinefunction(getattr(cls, "initialize", None)): + raise TypeError(f"{cls.__name__}.initialize must be an async function") + if not asyncio.iscoroutinefunction(getattr(cls, "finalize", None)): + raise TypeError(f"{cls.__name__}.finalize must be an async function") + + class WorkerClass(cls): + """Worker class for the worker pattern.""" + def __init__(self): + self._worker_loop = None + self._worker_thread = None + self._task_queue = asyncio.Queue() + # Setting up thread/loop for compatibility with t_with_signals + self._thread = threading.current_thread() + try: + self._loop = asyncio.get_event_loop() + self._stopping = asyncio.Event() + except RuntimeError: + self._loop = asyncio.new_event_loop() + asyncio.set_event_loop(self._loop) + super().__init__() + + async def queue_task(self, coro): + """Method to add a task to the queue""" + await self._task_queue.put(coro) + + async def _process_queue(self): + """Internal method to process the task queue""" + while not self._stopping.is_set(): + try: + coro = await asyncio.wait_for(self._task_queue.get(), timeout=0.1) + await coro + self._task_queue.task_done() + except asyncio.TimeoutError: + continue + except Exception as e: + logger.error("Error processing queued task: %s", e) + + def start(self, *args, **kwargs): + """Start the worker thread.""" + if self._worker_thread: + raise RuntimeError("Worker already started") + + def run(): + self._worker_loop = asyncio.new_event_loop() + asyncio.set_event_loop(self._worker_loop) + self._thread = threading.current_thread() + self._loop = self._worker_loop + try: + self._worker_loop.run_until_complete( + self.initialize(*args, **kwargs) + ) + + async def run_loop(): + try: + queue_task = self._worker_loop.create_task( + self._process_queue() + ) + + await self._stopping.wait() + + queue_task.cancel() + try: + await queue_task + except asyncio.CancelledError: + pass + + if hasattr(self, "finalize"): + await self.finalize() + + # Wait until all callbacks in the current event loop are processed + while self._worker_loop.is_running(): + pending_tasks = [ + task + for task in asyncio.all_tasks(self._worker_loop) + if task is not asyncio.current_task() + ] + if not pending_tasks: + break + + await asyncio.gather( + *pending_tasks, return_exceptions=True + ) + + finally: + if self._worker_loop and self._worker_loop.is_running(): + self._worker_loop.stop() + + self._worker_loop.create_task(run_loop()) + self._worker_loop.run_forever() + finally: + if self._worker_loop: + self._worker_loop.close() + self._worker_loop = None + + self._worker_thread = threading.Thread( + target=run, name=f"{cls.__name__}Worker", daemon=True + ) + self._worker_thread.start() + + def stop(self): + """Stop the worker thread.""" + if ( + self._worker_loop + and self._worker_thread + and self._worker_thread.is_alive() + ): + self._worker_loop.call_soon_threadsafe(self._stopping.set) + self._worker_thread.join(timeout=1) + + if self._worker_thread and self._worker_thread.is_alive(): + logger.warning( + "Worker thread %s did not stop gracefully", + self._worker_thread.name, + ) + self._worker_thread = None + + return WorkerClass diff --git a/src/tsignal/core.py b/src/tsignal/core.py index 9a51309..b935ef5 100644 --- a/src/tsignal/core.py +++ b/src/tsignal/core.py @@ -1,18 +1,26 @@ -# Standard library imports +""" +Implementation of the Signal class for tsignal. + +Provides signal-slot communication pattern for event handling, supporting both +synchronous and asynchronous operations in a thread-safe manner. +""" + import asyncio import functools import logging import threading from enum import Enum -from typing import Callable, List, Tuple +from typing import Callable, List, Tuple, Optional, Union class TConnectionType(Enum): - DirectConnection = 1 - QueuedConnection = 2 + """Connection type for signal-slot connections.""" + DIRECT_CONNECTION = 1 + QUEUED_CONNECTION = 2 -class TSignalConstants: +class _SignalConstants: + """Constants for signal-slot communication.""" FROM_EMIT = "_from_emit" THREAD = "_thread" LOOP = "_loop" @@ -22,52 +30,83 @@ class TSignalConstants: logger = logging.getLogger(__name__) +def _wrap_direct_function(func): + """Wrapper for directly connected functions""" + is_coroutine = asyncio.iscoroutinefunction(func) + + @functools.wraps(func) + def wrapper(*args, **kwargs): + """Wrapper for directly connected functions""" + # Remove FROM_EMIT + kwargs.pop(_SignalConstants.FROM_EMIT, False) + + # DIRECT_CONNECTION executes immediately regardless of thread + if is_coroutine: + try: + loop = asyncio.get_event_loop() + except RuntimeError: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + return loop.create_task(func(*args, **kwargs)) + return func(*args, **kwargs) + + return wrapper + + class TSignal: - def __init__(self, owner): - """Initialize signal""" - self.owner = owner - self.connections: List[Tuple[object, str, TConnectionType]] = [] - logger.debug(f"Signal initialized for {owner.__class__.__name__}") - - def connect(self, receiver: object, slot: Callable): - """Connect signal to a slot""" - if receiver is None: - raise AttributeError("Cannot connect to None receiver") - if not callable(slot): - raise TypeError("Slot must be callable") - - logger.debug( - f"Connecting {receiver.__class__.__name__}.{slot.__name__} " - f"to {self.owner.__class__.__name__}" - ) + """Signal class for tsignal.""" + def __init__(self): + self.connections: List[Tuple[Optional[object], Callable, TConnectionType]] = [] + + def connect( + self, receiver_or_slot: Union[object, Callable], slot: Optional[Callable] = None + ): + """Connect signal to a slot.""" + if slot is None: + if not callable(receiver_or_slot): + logger.error( + "Invalid connection attempt - receiver_or_slot is not callable: %s", + receiver_or_slot, + ) + raise TypeError("When slot is not provided, receiver must be callable") + + receiver = None + + if hasattr(receiver_or_slot, "__self__"): + obj = receiver_or_slot.__self__ + if hasattr(obj, _SignalConstants.THREAD) and hasattr( + obj, _SignalConstants.LOOP + ): + receiver = obj + slot = receiver_or_slot + else: + slot = _wrap_direct_function(receiver_or_slot) + else: + slot = _wrap_direct_function(receiver_or_slot) + else: + if receiver_or_slot is None: + logger.error("Invalid connection attempt - receiver cannot be None") + raise AttributeError("Receiver cannot be None") + receiver = receiver_or_slot + if not callable(slot): + logger.error( + "Invalid connection attempt - slot is not callable: %s", + slot, + ) + raise TypeError("Slot must be callable") is_coroutine = asyncio.iscoroutinefunction(slot) conn_type = ( - TConnectionType.QueuedConnection + TConnectionType.QUEUED_CONNECTION if is_coroutine - or threading.current_thread() - != getattr(receiver, TSignalConstants.THREAD, None) - else TConnectionType.DirectConnection + else TConnectionType.DIRECT_CONNECTION ) - self.connections.append((receiver, slot, conn_type)) - logger.debug(f"Connection established with type: {conn_type.name}") def disconnect(self, receiver: object = None, slot: Callable = None) -> int: - """ - Disconnect signal from slot(s). - - Args: - receiver: Specific receiver object to disconnect. If None, matches any receiver. - slot: Specific slot to disconnect. If None, matches any slot. - - Returns: - Number of disconnected connections - """ + """Disconnect signal from slot(s).""" if receiver is None and slot is None: - logger.debug( - f"Disconnecting all slots from {self.owner.__class__.__name__}" - ) + logger.debug("Disconnecting all slots") count = len(self.connections) self.connections.clear() return count @@ -76,82 +115,50 @@ def disconnect(self, receiver: object = None, slot: Callable = None) -> int: new_connections = [] for r, s, t in self.connections: - if (receiver is None or r == receiver) and (slot is None or s == slot): - logger.debug( - f"Disconnecting {r.__class__.__name__}.{s.__name__} " - f"from {self.owner.__class__.__name__}" - ) + # Compare original function and wrapped function for directly connected functions + if r is None and slot is not None: + if getattr(s, "__wrapped__", None) == slot or s == slot: + continue + elif (receiver is None or r == receiver) and (slot is None or s == slot): continue new_connections.append((r, s, t)) self.connections = new_connections disconnected = original_count - len(self.connections) - - if disconnected > 0: - logger.debug( - f"Disconnected {disconnected} connection(s) from {self.owner.__class__.__name__}" - ) - else: - logger.debug( - f"No matching connections found to disconnect in {self.owner.__class__.__name__}" - ) - + logger.debug("Disconnected %d connection(s)", disconnected) return disconnected def emit(self, *args, **kwargs): - logger.debug( - f"Signal emission from {self.owner.__class__.__name__} " - f"with {len(self.connections)} connections" - ) - - current_loop = asyncio.get_event_loop() - logger.debug( - f"Current event loop: {current_loop}, running: {current_loop.is_running()}" - ) - - kwargs[TSignalConstants.FROM_EMIT] = True + """Emit signal to connected slots.""" + logger.debug("Signal emission started") for receiver, slot, conn_type in self.connections: - logger.debug(f"Processing {receiver.__class__.__name__}.{slot.__name__}") - - receiver_loop = getattr(receiver, TSignalConstants.LOOP, None) - if not receiver_loop: - logger.error( - f"No event loop found for receiver {receiver.__class__.__name__}" - ) - continue - - is_coroutine = asyncio.iscoroutinefunction(slot) - logger.debug(f"Is coroutine: {is_coroutine}") - try: - if conn_type == TConnectionType.DirectConnection: - logger.debug("Executing regular function directly") + if conn_type == TConnectionType.DIRECT_CONNECTION: slot(*args, **kwargs) - else: # QueuedConnection + else: # QUEUED_CONNECTION + receiver_loop = getattr(receiver, "_loop", None) + if not receiver_loop: + logger.error("No event loop found for receiver") + continue + + is_coroutine = asyncio.iscoroutinefunction(slot) if is_coroutine: - logger.debug("Creating async task") def create_task_wrapper(s=slot): task = asyncio.create_task(s(*args, **kwargs)) - logger.debug(f"Created task: {task}") return task receiver_loop.call_soon_threadsafe(create_task_wrapper) - logger.debug("Task creation scheduled") else: - logger.debug("Scheduling regular function via QueuedConnection") def call_wrapper(s=slot): s(*args, **kwargs) receiver_loop.call_soon_threadsafe(call_wrapper) - logger.debug("Regular function scheduled") + except Exception as e: - logger.error( - f"Error in signal emission to {receiver.__class__.__name__}.{slot.__name__}: {e}", - exc_info=True, - ) + logger.error("Error in signal emission: %s", e, exc_info=True) def t_signal(func): @@ -161,7 +168,7 @@ def t_signal(func): @property def wrapper(self): if not hasattr(self, f"_{sig_name}"): - setattr(self, f"_{sig_name}", TSignal(self)) + setattr(self, f"_{sig_name}", TSignal()) return getattr(self, f"_{sig_name}") return wrapper @@ -175,23 +182,18 @@ def t_slot(func): @functools.wraps(func) async def wrapper(self, *args, **kwargs): - from_emit = kwargs.pop(TSignalConstants.FROM_EMIT, False) + """Wrapper for coroutine slots""" + from_emit = kwargs.pop(_SignalConstants.FROM_EMIT, False) - if not hasattr(self, TSignalConstants.THREAD): + if not hasattr(self, _SignalConstants.THREAD): self._thread = threading.current_thread() - logger.debug( - f"Set thread for {self.__class__.__name__}: {self._thread}" - ) - if not hasattr(self, TSignalConstants.LOOP): + if not hasattr(self, _SignalConstants.LOOP): try: self._loop = asyncio.get_running_loop() except RuntimeError: self._loop = asyncio.new_event_loop() asyncio.set_event_loop(self._loop) - logger.debug( - f"Created new event loop for {self.__class__.__name__}" - ) if not from_emit: current_thread = threading.current_thread() @@ -208,30 +210,25 @@ async def wrapper(self, *args, **kwargs): @functools.wraps(func) def wrapper(self, *args, **kwargs): - from_emit = kwargs.pop(TSignalConstants.FROM_EMIT, False) + """Wrapper for regular slots""" + from_emit = kwargs.pop(_SignalConstants.FROM_EMIT, False) - if not hasattr(self, TSignalConstants.THREAD): + if not hasattr(self, _SignalConstants.THREAD): self._thread = threading.current_thread() - logger.debug( - f"Set thread for {self.__class__.__name__}: {self._thread}" - ) - if not hasattr(self, TSignalConstants.LOOP): + if not hasattr(self, _SignalConstants.LOOP): try: self._loop = asyncio.get_running_loop() except RuntimeError: self._loop = asyncio.new_event_loop() asyncio.set_event_loop(self._loop) - logger.debug( - f"Created new event loop for {self.__class__.__name__}" - ) if not from_emit: current_thread = threading.current_thread() if current_thread != self._thread: logger.debug("Executing regular slot from different thread") self._loop.call_soon_threadsafe(lambda: func(self, *args, **kwargs)) - return + return None return func(self, *args, **kwargs) diff --git a/tests/conftest.py b/tests/conftest.py index 1f244b4..fc7914d 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,10 +1,20 @@ -import pytest +""" +Shared fixtures for tests. +""" + +# pylint: disable=no-member +# pylint: disable=redefined-outer-name +# pylint: disable=unused-variable +# pylint: disable=unused-argument +# pylint: disable=property-with-parameters + +import os +import sys import asyncio import threading -from tsignal import t_with_signals, t_signal, t_slot import logging -import sys -import os +import pytest +from tsignal import t_with_signals, t_signal, t_slot # Only creating the logger without configuration logger = logging.getLogger(__name__) @@ -12,60 +22,67 @@ @pytest.fixture(scope="function") def event_loop(): + """Create an event loop""" loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) yield loop loop.close() - @t_with_signals class Sender: + """Sender class""" @t_signal def value_changed(self, value): """Signal for value changes""" - pass def emit_value(self, value): + """Emit a value change signal""" self.value_changed.emit(value) - @t_with_signals class Receiver: + """Receiver class""" def __init__(self): super().__init__() self.received_value = None self.received_count = 0 self.id = id(self) - logger.info(f"Created Receiver[{self.id}]") + logger.info("Created Receiver[%d]", self.id) @t_slot async def on_value_changed(self, value: int): - logger.info(f"Receiver[{self.id}] on_value_changed called with value: {value}") - logger.info(f"Current thread: {threading.current_thread().name}") - logger.info(f"Current event loop: {asyncio.get_running_loop()}") + """Slot for value changes""" + logger.info("Receiver[%d] on_value_changed called with value: %d", self.id, value) + logger.info("Current thread: %s", threading.current_thread().name) + logger.info("Current event loop: %s", asyncio.get_running_loop()) self.received_value = value self.received_count += 1 logger.info( - f"Receiver[{self.id}] updated: value={self.received_value}, count={self.received_count}" + "Receiver[%d] updated: value=%d, count=%d", + self.id, self.received_value, self.received_count ) @t_slot def on_value_changed_sync(self, value: int): - print(f"Receiver[{self.id}] received value (sync): {value}") + """Sync slot for value changes""" + logger.info("Receiver[%d] on_value_changed_sync called with value: %d", self.id, value) self.received_value = value self.received_count += 1 - print( - f"Receiver[{self.id}] updated (sync): value={self.received_value}, count={self.received_count}" + logger.info( + "Receiver[%d] updated (sync): value=%d, count=%d", + self.id, self.received_value, self.received_count ) @pytest.fixture def receiver(event_loop): + """Create a receiver""" return Receiver() @pytest.fixture def sender(event_loop): + """Create a sender""" return Sender() @@ -79,6 +96,7 @@ def setup_logging(): default_level = logging.WARNING # Can enable DEBUG mode via environment variable + if os.environ.get("TSIGNAL_DEBUG"): default_level = logging.DEBUG diff --git a/tests/integration/test_async.py b/tests/integration/test_async.py index 95ac086..ba9ea19 100644 --- a/tests/integration/test_async.py +++ b/tests/integration/test_async.py @@ -1,6 +1,10 @@ -import pytest +""" +Test cases for asynchronous operations. +""" + import asyncio import logging +import pytest logger = logging.getLogger(__name__) @@ -8,30 +12,34 @@ @pytest.mark.asyncio async def test_multiple_async_slots(sender, receiver): """Test multiple async slots receiving signals""" - logger.info(f"Test starting with receiver[{receiver.id}]") + logger.info("Test starting with receiver[%s]", receiver.id) receiver2 = receiver.__class__() - logger.info(f"Created receiver2[{receiver2.id}]") + logger.info("Created receiver2[%s]", receiver2.id) - logger.info(f"Connecting receiver[{receiver.id}] to signal") + logger.info("Connecting receiver[%s] to signal", receiver.id) sender.value_changed.connect(receiver, receiver.on_value_changed) - logger.info(f"Connecting receiver2[{receiver2.id}] to signal") + logger.info("Connecting receiver2[%s] to signal", receiver2.id) sender.value_changed.connect(receiver2, receiver2.on_value_changed) logger.info("Emitting value 42") sender.emit_value(42) for i in range(5): - logger.info(f"Wait iteration {i+1}") + logger.info("Wait iteration %d", i + 1) if receiver.received_value is not None and receiver2.received_value is not None: logger.info("Both receivers have received values") break await asyncio.sleep(0.1) logger.info( - f"Final state - receiver1[{receiver.id}]: value={receiver.received_value}" + "Final state - receiver1[%s]: value=%d", + receiver.id, + receiver.received_value, ) logger.info( - f"Final state - receiver2[{receiver2.id}]: value={receiver2.received_value}" + "Final state - receiver2[%s]: value=%d", + receiver2.id, + receiver2.received_value, ) assert receiver.received_value == 42 @@ -52,5 +60,5 @@ async def test_async_slot_execution(sender, receiver): break await asyncio.sleep(0.1) - logger.info(f"Receiver value: {receiver.received_value}") + logger.info("Receiver value: %d", receiver.received_value) assert receiver.received_value == 42 diff --git a/tests/integration/test_threading.py b/tests/integration/test_threading.py index 3c8c5a5..2e02950 100644 --- a/tests/integration/test_threading.py +++ b/tests/integration/test_threading.py @@ -1,5 +1,9 @@ -import threading +""" +Test cases for threading. +""" + import asyncio +import threading from time import sleep @@ -9,6 +13,7 @@ def test_different_thread_connection(sender, receiver, event_loop): sender_done = threading.Event() def run_sender(): + """Run the sender thread""" loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) for i in range(3): @@ -18,6 +23,7 @@ def run_sender(): loop.close() async def wait_for_receiver(): + """Wait for the receiver to receive the signals""" while not sender_done.is_set() or receiver.received_count < 3: await asyncio.sleep(0.1) @@ -35,6 +41,7 @@ def test_call_slot_from_other_thread(receiver, event_loop): done = threading.Event() def other_thread(): + """Run the other thread""" loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) diff --git a/tests/integration/test_with_signal.py b/tests/integration/test_with_signal.py index ee1344f..a4056a8 100644 --- a/tests/integration/test_with_signal.py +++ b/tests/integration/test_with_signal.py @@ -1,4 +1,7 @@ -import pytest +""" +Test cases for the with-signal pattern. +""" + import asyncio from tests.conftest import Receiver @@ -25,6 +28,7 @@ def test_multiple_slots(sender, event_loop): sender.value_changed.connect(receiver2, receiver2.on_value_changed) async def test(): + """Test the multiple slot connections""" sender.emit_value(42) await asyncio.sleep(0.1) assert receiver1.received_value == 42 diff --git a/tests/integration/test_worker.py b/tests/integration/test_worker.py new file mode 100644 index 0000000..4619771 --- /dev/null +++ b/tests/integration/test_worker.py @@ -0,0 +1,123 @@ +""" +Test cases for the worker pattern. +""" + +# pylint: disable=no-member +# pylint: disable=redefined-outer-name +# pylint: disable=unused-variable + +import asyncio +import logging +import pytest +from tsignal.contrib.patterns.worker.decorators import t_with_worker + +logger = logging.getLogger(__name__) + + +@pytest.fixture +async def worker(): + """Create a worker""" + logger.info("Creating worker in fixture") + w = TestWorker() + yield w + logger.info("Cleaning up worker in fixture") + if getattr(w, "_worker_thread", None) and w._worker_thread.is_alive(): + logger.info("Stopping worker thread") + w.stop() + logger.info("Waiting for worker thread to join") + w._worker_thread.join(timeout=1) + logger.info("Worker thread cleanup complete") + + +@t_with_worker +class TestWorker: + """Test worker class""" + + def __init__(self): + logger.info("Initializing TestWorker") + self.initialize_called = False + self.finalize_called = False + self.data = [] + super().__init__() + + async def initialize(self, initial_value=None): + """Initialize the worker""" + logger.info("TestWorker.initialize called with %s", initial_value) + self.initialize_called = True + if initial_value: + self.data.append(initial_value) + + async def finalize(self): + """Finalize the worker""" + logger.info("TestWorker.finalize called") + self.finalize_called = True + + +def test_worker_requires_async_methods(): + """Test that the worker requires async methods""" + logger.info("Starting test_worker_requires_async_methods") + with pytest.raises(TypeError, match=r".*initialize must be an async function"): + + @t_with_worker + class InvalidWorker1: + """Invalid worker class""" + + def initialize(self): # not async + """Initialize the worker""" + + async def finalize(self): + """Finalize the worker""" + + logger.info("First check passed") + + with pytest.raises(TypeError, match=r".*finalize must be an async function"): + + @t_with_worker + class InvalidWorker2: + """Invalid worker class""" + + async def initialize(self): + """Initialize the worker""" + + def finalize(self): # not async + """Finalize the worker""" + + logger.info("Second check passed") + + +@pytest.mark.asyncio +async def test_worker_lifecycle(worker): + """Test the worker lifecycle""" + logger.info("Starting test_worker_lifecycle") + initial_value = "test" + + logger.info("Checking initial state") + assert worker._worker_thread is None + assert worker._worker_loop is None + assert not worker.initialize_called + assert not worker.finalize_called + + logger.info("Starting worker") + worker.start(initial_value) + + logger.info("Waiting for worker initialization") + for i in range(10): + if worker.initialize_called: + logger.info("Worker initialized after %d attempts", i + 1) + break + logger.info("Waiting attempt %d", i + 1) + await asyncio.sleep(0.1) + else: + logger.error("Worker failed to initialize") + pytest.fail("Worker did not initialize in time") + + logger.info("Checking worker state") + assert worker.initialize_called + assert worker.data == [initial_value] + + logger.info("Stopping worker") + worker.stop() + + logger.info("Checking final state") + assert worker.finalize_called + logger.info("Test complete") diff --git a/tests/integration/test_worker_queue.py b/tests/integration/test_worker_queue.py new file mode 100644 index 0000000..821bf5a --- /dev/null +++ b/tests/integration/test_worker_queue.py @@ -0,0 +1,152 @@ +""" +Test cases for the worker-queue pattern. +""" + +# pylint: disable=no-member +# pylint: disable=redefined-outer-name + +import asyncio +import logging +import pytest +from tsignal import t_signal +from tsignal.contrib.patterns.worker.decorators import t_with_worker + +logger = logging.getLogger(__name__) + + +@pytest.fixture +async def queue_worker(): + """Create a queue worker""" + logger.info("Creating QueueWorker") + w = QueueWorker() + yield w + logger.info("Cleaning up QueueWorker") + if getattr(w, "_worker_thread", None) and w._worker_thread.is_alive(): + w.stop() + w._worker_thread.join(timeout=1) + + +@t_with_worker +class QueueWorker: + """Queue worker class""" + + def __init__(self): + self.processed_items = [] + super().__init__() + + async def initialize(self): + """Initialize the worker""" + logger.info("QueueWorker initializing") + + async def finalize(self): + """Finalize the worker""" + logger.info("QueueWorker finalizing") + + async def process_item(self, item): + """Process an item""" + logger.info("Processing item: %s", item) + await asyncio.sleep(0.1) # Simulate work + self.processed_items.append(item) + + +@pytest.mark.asyncio +async def test_basic_queue_operation(queue_worker): + """Basic queue operation test""" + queue_worker.start() + await asyncio.sleep(0.1) + + await queue_worker.queue_task(queue_worker.process_item("item1")) + await queue_worker.queue_task(queue_worker.process_item("item2")) + + await asyncio.sleep(0.5) + + assert "item1" in queue_worker.processed_items + assert "item2" in queue_worker.processed_items + assert len(queue_worker.processed_items) == 2 + + +@pytest.mark.asyncio +async def test_queue_order(queue_worker): + """Test for ensuring the order of the task queue""" + queue_worker.start() + await asyncio.sleep(0.1) + + items = ["first", "second", "third"] + for item in items: + await queue_worker.queue_task(queue_worker.process_item(item)) + + await asyncio.sleep(0.5) + + assert queue_worker.processed_items == items + + +@pytest.mark.asyncio +async def test_queue_error_handling(queue_worker): + """Test for error handling in the task queue""" + + async def failing_task(): + raise ValueError("Test error") + + queue_worker.start() + await asyncio.sleep(0.1) + + # Add normal and failing tasks + await queue_worker.queue_task(queue_worker.process_item("good_item")) + await queue_worker.queue_task(failing_task()) + await queue_worker.queue_task(queue_worker.process_item("after_error")) + + await asyncio.sleep(0.5) + + # The error should not prevent the next task from being processed + assert "good_item" in queue_worker.processed_items + assert "after_error" in queue_worker.processed_items + + +@pytest.mark.asyncio +async def test_queue_cleanup_on_stop(queue_worker): + """Test for queue cleanup when worker stops""" + queue_worker.start() + await asyncio.sleep(0.1) + + # Add a long task + async def long_task(): + await asyncio.sleep(0.5) + queue_worker.processed_items.append("long_task") + + await queue_worker.queue_task(long_task()) + await asyncio.sleep(0.1) # Wait for the task to start + + # Stop the worker while the task is running + queue_worker.stop() + + # Check if the worker exited normally + assert not queue_worker._worker_thread.is_alive() + + +@pytest.mark.asyncio +async def test_mixed_signal_and_queue(queue_worker): + """Test for simultaneous use of signals and task queue""" + + # Add a signal + @t_signal + def task_completed(): + pass + + queue_worker.task_completed = task_completed.__get__(queue_worker) + signal_received = [] + queue_worker.task_completed.connect(lambda: signal_received.append(True)) + + queue_worker.start() + await asyncio.sleep(0.1) + + # Add a task and emit the signal + async def task_with_signal(): + await asyncio.sleep(0.1) + queue_worker.processed_items.append("signal_task") + queue_worker.task_completed.emit() + + await queue_worker.queue_task(task_with_signal()) + await asyncio.sleep(0.3) + + assert "signal_task" in queue_worker.processed_items + assert signal_received == [True] diff --git a/tests/integration/test_worker_signal.py b/tests/integration/test_worker_signal.py new file mode 100644 index 0000000..0b2f59d --- /dev/null +++ b/tests/integration/test_worker_signal.py @@ -0,0 +1,157 @@ +""" +Test cases for the worker-signal pattern. +""" + +# pylint: disable=redefined-outer-name +# pylint: disable=unnecessary-lambda +# pylint: disable=unnecessary-lambda-assignment +# pylint: disable=no-member + +import asyncio +import logging +import pytest +from tsignal.contrib.patterns.worker.decorators import t_with_worker +from tsignal import t_signal + +logger = logging.getLogger(__name__) + + +@pytest.fixture +async def signal_worker(): + """Create a signal worker""" + logger.info("Creating SignalWorker") + w = SignalWorker() + yield w + logger.info("Cleaning up SignalWorker") + if getattr(w, "_worker_thread", None) and w._worker_thread.is_alive(): + w.stop() + w._worker_thread.join(timeout=1) + + +@t_with_worker +class SignalWorker: + """Signal worker class""" + + def __init__(self): + self.value = None + super().__init__() + + @t_signal + def value_changed(self): + """Signal emitted when the value changes""" + + @t_signal + def worker_event(self): + """Signal emitted when a worker event occurs""" + + async def initialize(self): + """Initialize the worker""" + logger.info("SignalWorker initializing") + self.value_changed.emit("initialized") + + async def finalize(self): + """Finalize the worker""" + logger.info("SignalWorker finalizing") + self.value_changed.emit("finalized") + + def set_value(self, value): + """Set the value and emit the signal""" + logger.info("Setting value to: %s", value) + self.value = value + self.value_changed.emit(value) + + +@pytest.mark.asyncio +async def test_signal_from_initialize(signal_worker): + """Test if the signal emitted from initialize is processed correctly""" + received = [] + signal_worker.value_changed.connect(lambda v: received.append(v)) + + signal_worker.start() + await asyncio.sleep(0.1) + + assert "initialized" in received + + +@pytest.mark.asyncio +async def test_signal_from_finalize(signal_worker): + """Test if the signal emitted from finalize is processed correctly""" + received = [] + signal_worker.value_changed.connect(lambda v: received.append(v)) + + signal_worker.start() + await asyncio.sleep(0.1) + signal_worker.stop() + + assert "finalized" in received + + +@pytest.mark.asyncio +async def test_signal_from_worker_thread(signal_worker): + """Test if the signal emitted from the worker thread is processed correctly""" + received = [] + signal_worker.value_changed.connect(lambda v: received.append(v)) + + signal_worker.start() + await asyncio.sleep(0.1) + + # Emit signal from the worker thread's event loop + signal_worker._worker_loop.call_soon_threadsafe( + lambda: signal_worker.set_value("test_value") + ) + + await asyncio.sleep(0.1) + assert "test_value" in received + + +@pytest.mark.asyncio +async def test_multiple_signals(signal_worker): + """Test if multiple signals are processed independently""" + value_changes = [] + worker_events = [] + + signal_worker.value_changed.connect(lambda v: value_changes.append(v)) + signal_worker.worker_event.connect(lambda v: worker_events.append(v)) + + signal_worker.start() + await asyncio.sleep(0.1) + + # Emit value_changed signal + signal_worker._worker_loop.call_soon_threadsafe( + lambda: signal_worker.set_value("test_value") + ) + + # Emit worker_event signal + signal_worker._worker_loop.call_soon_threadsafe( + lambda: signal_worker.worker_event.emit("worker_event") + ) + + await asyncio.sleep(0.1) + + assert "test_value" in value_changes + assert "worker_event" in worker_events + assert len(worker_events) == 1 + + +@pytest.mark.asyncio +async def test_signal_disconnect(signal_worker): + """Test if signal disconnection works correctly""" + received = [] + handler = lambda v: received.append(v) + + signal_worker.value_changed.connect(handler) + signal_worker.start() + await asyncio.sleep(0.1) + + assert "initialized" in received + received.clear() + + # Disconnect signal + signal_worker.value_changed.disconnect(slot=handler) + + signal_worker._worker_loop.call_soon_threadsafe( + lambda: signal_worker.set_value("after_disconnect") + ) + + await asyncio.sleep(0.1) + assert len(received) == 0 diff --git a/tests/performance/test_memory.py b/tests/performance/test_memory.py new file mode 100644 index 0000000..8b10827 --- /dev/null +++ b/tests/performance/test_memory.py @@ -0,0 +1,42 @@ +""" +Test cases for memory usage. +""" + +# pylint: disable=no-member +# pylint: disable=redefined-outer-name +# pylint: disable=unused-variable + +from memory_profiler import profile +import pytest +from tsignal import t_with_signals, t_signal, t_slot + +@pytest.mark.performance +@profile +def test_memory_usage(): + """Test memory usage""" + # Create and delete signal/slot pairs repeatedly + for _ in range(1000): + sender = create_complex_signal_chain() + sender.signal.disconnect() + +def create_complex_signal_chain(): + """Create a complex signal chain""" + @t_with_signals + class Sender: + """Sender class""" + @t_signal + def signal(self): + """Signal method""" + + @t_with_signals + class Receiver: + """Receiver class""" + @t_slot + def slot(self, value): + """Slot method""" + + sender = Sender() + receivers = [Receiver() for _ in range(100)] + for r in receivers: + sender.signal.connect(r, r.slot) + return sender diff --git a/tests/performance/test_stress.py b/tests/performance/test_stress.py new file mode 100644 index 0000000..2a0a6ea --- /dev/null +++ b/tests/performance/test_stress.py @@ -0,0 +1,37 @@ +""" +Test cases for stress testing. +""" + +import asyncio +import pytest +from tsignal import t_with_signals, t_signal, t_slot + +# pylint: disable=no-member +# pylint: disable=redefined-outer-name +# pylint: disable=unused-variable + +@pytest.mark.asyncio +async def test_heavy_signal_load(): + """Test heavy signal load""" + @t_with_signals + class Sender: + """Sender class""" + @t_signal + def signal(self): + """Signal method""" + + @t_with_signals + class Receiver: + """Receiver class""" + @t_slot + async def slot(self): + """Slot method""" + await asyncio.sleep(0.001) + + sender = Sender() + receivers = [Receiver() for _ in range(100)] + for r in receivers: + sender.signal.connect(r, r.slot) + + for _ in range(1000): + sender.signal.emit() diff --git a/tests/unit/test_signal.py b/tests/unit/test_signal.py index 7fc35ee..c5aa5ae 100644 --- a/tests/unit/test_signal.py +++ b/tests/unit/test_signal.py @@ -1,7 +1,10 @@ -import pytest +""" +Test cases for the signal pattern. +""" + import asyncio -from tsignal.core import TSignal -from tsignal.core import t_with_signals, t_signal, t_slot +import pytest +from tsignal.core import t_with_signals , t_slot, TSignal from ..conftest import Receiver @@ -128,17 +131,18 @@ def test_signal_disconnect_nonexistent(sender, receiver): @pytest.mark.asyncio -async def test_signal_disconnect_during_emit(sender, receiver, event_loop): +async def test_signal_disconnect_during_emit(sender, receiver): """Test disconnecting slots while emission is in progress""" @t_with_signals class SlowReceiver: + """Receiver class for slow slot""" def __init__(self): - super().__init__() self.received_value = None @t_slot async def on_value_changed(self, value): + """Slot for value changed""" await asyncio.sleep(0.1) self.received_value = value @@ -154,3 +158,118 @@ async def on_value_changed(self, value): assert slow_receiver.received_value == 42 assert receiver.received_value is None + + +def test_direct_function_connection(sender): + """Test direct connection of lambda and regular functions""" + received_values = [] + + def collect_value(value): + """Slot for value changed""" + received_values.append(value) + + # Connect lambda function + sender.value_changed.connect(lambda v: received_values.append(v * 2)) + + # Connect regular function + sender.value_changed.connect(collect_value) + + # Emit signal + sender.emit_value(42) + + assert 42 in received_values # Added by collect_value + assert 84 in received_values # Added by lambda function (42 * 2) + assert len(received_values) == 2 + + +@pytest.mark.asyncio +async def test_direct_async_function_connection(sender): + """Test direct connection of async functions""" + received_values = [] + + async def async_collector(value): + """Slot for value changed""" + await asyncio.sleep(0.1) + received_values.append(value) + + # Connect async function + sender.value_changed.connect(async_collector) + + # Emit signal + sender.emit_value(42) + + # Wait for async processing + await asyncio.sleep(0.2) + + assert received_values == [42] + + +def test_direct_function_disconnect(sender): + """Test disconnection of directly connected functions""" + received_values = [] + + def collector(v): + """Slot for value changed""" + received_values.append(v) + + sender.value_changed.connect(collector) + + # First emit + sender.emit_value(42) + assert received_values == [42] + + # Disconnect + disconnected = sender.value_changed.disconnect(slot=collector) + assert disconnected == 1 + + # Second emit - should not add value since connection is disconnected + sender.emit_value(43) + assert received_values == [42] + + +def test_method_connection_with_signal_attributes(sender): + """Test connecting a method with _thread and _loop attributes automatically sets up receiver""" + received_values = [] + + @t_with_signals + class SignalReceiver: + """Receiver class for signal attributes""" + def collect_value(self, value): + """Slot for value changed""" + received_values.append(value) + + class RegularClass: + """Regular class for value changed""" + def collect_value(self, value): + """Slot for value changed""" + received_values.append(value * 2) + + # signal_receiver's method + signal_receiver = SignalReceiver() + sender.value_changed.connect(signal_receiver.collect_value) + + # regular class's method + regular_receiver = RegularClass() + sender.value_changed.connect(regular_receiver.collect_value) + + # Emit signal + sender.emit_value(42) + + # signal_receiver's method is QUEUED_CONNECTION + connection = next( + conn + for conn in sender.value_changed.connections + if conn[1].__name__ == signal_receiver.collect_value.__name__ + ) + assert connection[0] == signal_receiver # receiver is set automatically + + # regular_receiver's method is DIRECT_CONNECTION + connection = next( + conn + for conn in sender.value_changed.connections + if hasattr(conn[1], "__wrapped__") + ) + assert connection[0] is None + + assert 42 in received_values + assert 84 in received_values diff --git a/tests/unit/test_slot.py b/tests/unit/test_slot.py index d91c29a..2917563 100644 --- a/tests/unit/test_slot.py +++ b/tests/unit/test_slot.py @@ -1,5 +1,9 @@ -import pytest +""" +Test cases for the slot pattern. +""" + import asyncio +import pytest from tsignal import t_with_signals, t_slot @@ -28,8 +32,10 @@ def test_slot_exception(sender, receiver, event_loop): @t_with_signals class ExceptionReceiver: + """Receiver class for exception testing""" @t_slot async def on_value_changed(self, value): + """Slot for value changed""" raise ValueError("Test exception") exception_receiver = ExceptionReceiver()