diff --git a/.gitignore b/.gitignore index d396881..4ff6e42 100644 --- a/.gitignore +++ b/.gitignore @@ -20,3 +20,6 @@ contracts/contract.abi.json /src/utils/__pycache__ /src/models/__pycache__ .coverage + +# Ignore docker compose override file +docker-compose.override.yml diff --git a/Dockerfile b/Dockerfile index a7b1126..f06fa08 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,7 +1,7 @@ # Dockerfile to create a clean, lightweight Docker Image for the Service Quality Oracle # Use Python 3.9 slim as the base image for a lightweight container -FROM python:3.9-slim +FROM python:3.11-slim # Add metadata labels LABEL description="Service Quality Oracle" \ diff --git a/README.md b/README.md index 79de8d3..d2b4461 100644 --- a/README.md +++ b/README.md @@ -49,17 +49,21 @@ Please refer to the [ELIGIBILITY_CRITERIA.md](./ELIGIBILITY_CRITERIA.md) file to The application follows a clear data flow, managed by a daily scheduler: -1. **Scheduler (`scheduler.py`)**: This is the main entry point. It runs on a schedule (e.g., daily), manages the application lifecycle, and triggers the oracle run. It is also responsible for catching up on any missed runs. +1. **Scheduler (`scheduler.py`)**: This is the main entry point. It runs on a schedule (e.g., daily), manages the application lifecycle, and triggers the oracle run. It is also responsible for catching up on any missed runs. -2. **Orchestrator (`service_quality_oracle.py`)**: For each run, this module orchestrates the end-to-end process by coordinating the other components. +2. **Orchestrator (`service_quality_oracle.py`)**: For each run, this module orchestrates the end-to-end process by coordinating the other components. -3. **Data Fetching (`bigquery_provider.py`)**: The orchestrator calls this provider to execute a configurable SQL query against Google BigQuery, fetching the raw indexer performance data. +3. **Data Fetching (`bigquery_provider.py`)**: The orchestrator calls this provider to execute a configurable SQL query against Google BigQuery, fetching the raw indexer performance data. -4. **Data Processing (`eligibility_pipeline.py`)**: The raw data is passed to this module, which processes it, filters for eligible and ineligible indexers, and generates CSV artifacts for auditing and record-keeping. +4. **Data Processing (`eligibility_pipeline.py`)**: The raw data is passed to this module, which processes it, filters for eligible and ineligible indexers, and generates CSV artifacts for auditing and record-keeping. -5. **Blockchain Submission (`blockchain_client.py`)**: The orchestrator takes the final list of eligible indexers and passes it to this client, which handles the complexities of batching, signing, and sending the transaction to the blockchain via RPC providers with built-in failover. +5. **Blockchain Submission (`blockchain_client.py`)**: The orchestrator takes the final list of eligible indexers and passes it to this client, which handles the complexities of batching, signing, and sending the transaction to the blockchain via RPC providers with built-in failover. -6. **Notifications (`slack_notifier.py`)**: Throughout the process, status updates (success, failure, warnings) are sent to Slack. +6. **Notifications (`slack_notifier.py`)**: Throughout the process, status updates (success, failure, warnings) are sent to Slack. 
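+
+As an illustration of this flow, the sketch below walks through one run end-to-end. It is a simplified, hypothetical example: the helper functions are placeholders for the modules listed above, and the field names, eligibility threshold, and analysis window are made up for illustration rather than taken from the real implementation.
+
+```python
+"""Illustrative sketch of a single oracle run (not the actual implementation)."""
+
+from datetime import date, timedelta
+from typing import Dict, List, Tuple
+
+
+def fetch_indexer_data(start: date, end: date) -> List[Dict]:
+    # Stand-in for bigquery_provider.py: returns raw indexer performance rows.
+    return [{"indexer": "0x1234567890abcdef1234567890abcdef12345678", "online_days": 30}]
+
+
+def process_eligibility(rows: List[Dict]) -> Tuple[List[str], List[str]]:
+    # Stand-in for eligibility_pipeline.py: splits indexers into eligible and
+    # ineligible groups (the real pipeline also writes CSV artifacts).
+    eligible = [r["indexer"] for r in rows if r["online_days"] >= 5]
+    ineligible = [r["indexer"] for r in rows if r["online_days"] < 5]
+    return eligible, ineligible
+
+
+def submit_to_blockchain(eligible: List[str]) -> List[str]:
+    # Stand-in for blockchain_client.py: batches addresses, signs and sends
+    # transactions with RPC failover, and returns transaction links.
+    return [f"https://example-explorer.io/tx/0x{i:064x}" for i, _ in enumerate(eligible)]
+
+
+def notify_slack(eligible: List[str], tx_links: List[str]) -> None:
+    # Stand-in for slack_notifier.py: posts a run summary.
+    print(f"Allowed {len(eligible)} indexers across {len(tx_links)} transaction(s)")
+
+
+def run_oracle_once(run_date: date) -> None:
+    # Mirrors, at a high level, the orchestration in service_quality_oracle.py.
+    start = run_date - timedelta(days=28)  # analysis window length is illustrative
+    rows = fetch_indexer_data(start, run_date)
+    eligible, _ineligible = process_eligibility(rows)
+    tx_links = submit_to_blockchain(eligible)
+    notify_slack(eligible, tx_links)
+
+
+if __name__ == "__main__":
+    run_oracle_once(date.today())
+```
+
+In the real code, a failure at any stage is reported to Slack and recorded by an application-level circuit breaker (see the Architecture section below).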
+
+## Architecture
+
+For a more detailed explanation of key architectural decisions, such as the RPC provider failover and circuit breaker logic, please see the [Technical Design Document](./docs/technical-design.md).
 
 ## CI/CD Pipeline
 
@@ -128,7 +132,6 @@ bandit -r src/
 ## TODO List (only outstanding TODOs)
 
 ### 1. Testing
-- [ ] Create integration tests for the entire pipeline
 - [ ] Security review of code and dependencies
 
 ### 2. Documentation
diff --git a/docker-compose.yml b/docker-compose.yml
index b3e0605..8e32c6d 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -51,7 +51,7 @@ services:
           memory: 512M
 
     # Restart policy
-    restart: unless-stopped
+    restart: "on-failure"
 
     # Healthcheck to ensure the container is running
    healthcheck:
diff --git a/docs/technical-design.md b/docs/technical-design.md
new file mode 100644
index 0000000..cce7b00
--- /dev/null
+++ b/docs/technical-design.md
@@ -0,0 +1,56 @@
+# Technical Design & Architecture
+
+This document outlines key architectural decisions and data flows within the Service Quality Oracle.
+
+## RPC Provider Failover and Circuit Breaker Logic
+
+The application is designed to be resilient to transient network issues and RPC provider failures. It uses a multi-layered approach involving internal retries, provider rotation, and an application-level circuit breaker to prevent catastrophic failures and infinite restart loops.
+
+The following diagram illustrates the sequence of events when all RPC providers fail, leading to a single recorded failure by the circuit breaker.
+
+```mermaid
+sequenceDiagram
+    %% Setup column titles
+    participant main_oracle as service_quality_oracle.py
+    participant blockchain_client as blockchain_client.py
+    participant circuit_breaker as circuit_breaker.py
+    participant slack_notifier as slack_notifier.py
+
+    %% Attempt function call
+    main_oracle->>blockchain_client: batch_allow_indexers_issuance_eligibility()
+
+    %% Describe failure loop inside the blockchain_client module
+    activate blockchain_client
+    alt RPC Loop (for each provider)
+
+        %% Attempt RPC call
+        blockchain_client->>blockchain_client: _execute_rpc_call() with provider A
+        note right of blockchain_client: Fails after 5 retries
+
+        %% Log failure
+        blockchain_client-->>blockchain_client: raises ConnectionError
+        note right of blockchain_client: Catches error, logs rotation
+
+        %% Retry RPC call
+        blockchain_client->>blockchain_client: _execute_rpc_call() with provider B
+        note right of blockchain_client: Fails after 5 retries
+
+        %% Log final failure
+        blockchain_client-->>blockchain_client: raises ConnectionError
+        note right of blockchain_client: All providers tried and failed
+    end
+
+    %% Raise error back to main_oracle and exit the blockchain_client module
+    blockchain_client-->>main_oracle: raises Final ConnectionError
+    deactivate blockchain_client
+
+    %% Take note of the failure in the circuit breaker, which can break the restart loop if triggered enough times in a short duration
+    main_oracle->>circuit_breaker: record_failure()
+
+    %% Notify of the RPC failure in Slack
+    main_oracle->>slack_notifier: send_failure_notification()
+
+    %% Document restart process
+    note right of main_oracle: sys.exit(1)
+    note right of main_oracle: Docker will restart. 
CircuitBreaker can halt via sys.exit(0) +``` diff --git a/requirements.txt b/requirements.txt index c28fa5b..57264f1 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,7 +2,7 @@ # Requires Python 3.9+ # Configuration management -tomli==2.2.1 # TOML support for Python < 3.11 +tomli==2.2.1 # Scheduling and resilience schedule==1.2.2 @@ -10,35 +10,34 @@ pytz==2025.2 tenacity==8.5.0 # Google Cloud BigQuery for data processing -google-cloud-bigquery==3.26.0 -bigframes==1.42.0 +google-cloud-bigquery==3.34.0 +bigframes==2.8.0 # Data processing and validation pandas==2.2.3 pandera==0.20.4 -numpy>=2.0.0 # Added as pandas dependency +numpy==2.3.1 # Blockchain integration - Latest compatible versions web3==7.12.0 -eth-account>=0.13.0 -eth-typing>=5.2.0 +eth-account==0.13.7 +eth-typing==5.2.1 # GraphQL and subgraph integration (for future subgraph functionality) gql==3.5.2 # HTTP and API requests==2.32.3 -aiohttp>=3.9.0 # For async HTTP requests (used by web3) +aiohttp==3.12.13 # Development/Testing -pytest>=8.0.0 -pytest-cov>=6.0.0 -pytest-mock>=3.0.0 -pytest-snapshot>=0.9.0 -mypy>=1.0.0 -types-pytz # Type stubs for pytz -types-requests # Type stubs for requests +pytest==8.4.1 +pytest-cov==6.2.1 +pytest-mock==3.14.1 +pytest-snapshot==0.9.0 +mypy==1.16.1 +types-pytz==2025.2.0.20250516 +types-requests==2.32.4.20250611 # Linting and formatting -ruff>=0.6.0 -pip==25.1 +ruff==0.12.0 diff --git a/scripts/test_slack_notifications.py b/scripts/test_slack_notifications.py index 6d0f3c2..0693b18 100755 --- a/scripts/test_slack_notifications.py +++ b/scripts/test_slack_notifications.py @@ -1,7 +1,10 @@ #!/usr/bin/env python3 """ Test script for Slack notifications. -This script tests the Slack notification functionality without running the full oracle. + +This script provides a simple way to test all Slack notification types without +running the full oracle pipeline. It requires the `SLACK_WEBHOOK_URL` environment +variable to be set. """ import logging @@ -20,135 +23,144 @@ def test_info_notification(notifier: SlackNotifier) -> bool: """ - Test sending an informational notification to Slack. + Test sending an informational notification. Args: - notifier: Configured SlackNotifier instance + notifier: A configured SlackNotifier instance. Returns: - True if test passes, False otherwise + True if the test passes, False otherwise. """ - # Send test info notification with sample message logger.info("Testing info notification...") - success = notifier.send_info_notification("Test info notification", "Test Notification") - logger.info(f"Info notification: {'PASSED' if success else 'FAILED'}") - return success + try: + notifier.send_info_notification( + title="Test Script Info", + message="This is a test informational notification.", + ) + logger.info("Info notification: PASSED") + return True + except Exception as e: + logger.error(f"Info notification: FAILED - {e}") + return False def test_success_notification(notifier: SlackNotifier) -> bool: """ - Test sending a success notification to Slack. + Test sending a success notification. Args: - notifier: Configured SlackNotifier instance + notifier: A configured SlackNotifier instance. Returns: - True if test passes, False otherwise + True if the test passes, False otherwise. 
""" - # Send test success notification with sample indexer data and transaction links logger.info("Testing success notification...") - - test_indexers = [ - "0x1234567890abcdef1234567890abcdef12345678", - "0xabcdef1234567890abcdef1234567890abcdef12", - "0x9876543210fedcba9876543210fedcba98765432", - ] - - test_transaction_links = [ - "https://sepolia.arbiscan.io/tx/0x1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef12", - "https://sepolia.arbiscan.io/tx/0xabcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890ab", - ] - - success = notifier.send_success_notification( - eligible_indexers=test_indexers, - total_processed=len(test_indexers), - execution_time=1, - transaction_links=test_transaction_links, - batch_count=2, - ) - - logger.info(f"Success notification: {'PASSED' if success else 'FAILED'}") - return success + try: + test_indexers = [ + "0x1234567890abcdef1234567890abcdef12345678", + "0xabcdef1234567890abcdef1234567890abcdef12", + "0x9876543210fedcba9876543210fedcba98765432", + ] + test_transaction_links = [ + "https://sepolia.arbiscan.io/tx/0x1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef12", + "https://sepolia.arbiscan.io/tx/0xabcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890ab", + ] + + notifier.send_success_notification( + eligible_indexers=test_indexers, + total_processed=len(test_indexers), + execution_time=123.45, + transaction_links=test_transaction_links, + batch_count=len(test_transaction_links), + ) + logger.info("Success notification: PASSED") + return True + except Exception as e: + logger.error(f"Success notification: FAILED - {e}") + return False def test_failure_notification(notifier: SlackNotifier) -> bool: """ - Test sending a failure notification to Slack. + Test sending a failure notification. Args: - notifier: Configured SlackNotifier instance + notifier: A configured SlackNotifier instance. Returns: - True if test passes, False otherwise + True if the test passes, False otherwise. """ - # Send test failure notification with sample error and partial transaction info logger.info("Testing failure notification...") - - partial_transactions = [ - "https://sepolia.arbiscan.io/tx/0x1111111111111111111111111111111111111111111111111111111111111111", - ] - - success = notifier.send_failure_notification( - error_message="Test error message for verification", - stage="Test Blockchain Submission", - execution_time=1, - partial_transaction_links=partial_transactions, - indexers_processed=150, - ) - - logger.info(f"Failure notification: {'PASSED' if success else 'FAILED'}") - return success + try: + partial_transactions = [ + "https://sepolia.arbiscan.io/tx/0x1111111111111111111111111111111111111111111111111111111111111111" + ] + + notifier.send_failure_notification( + error_message="This is a test error to verify failure notifications. Everything is fine.", + stage="Test Blockchain Submission", + execution_time=1, + partial_transaction_links=partial_transactions, + indexers_processed=150, + ) + logger.info("Failure notification: PASSED") + return True + except Exception as e: + logger.error(f"Failure notification: FAILED - {e}") + return False def run_all_tests() -> bool: """ - Run all tests and return True if all tests pass, False otherwise. + Run all Slack notification tests and report the results. Returns: - True if all tests pass, False otherwise + True if all tests pass, False otherwise. 
""" - # Get the Slack webhook URL from the environment variable webhook_url = os.environ.get("SLACK_WEBHOOK_URL") if not webhook_url: - logger.error("SLACK_WEBHOOK_URL environment variable not set") + logger.error("SLACK_WEBHOOK_URL environment variable not set. Cannot run tests.") return False - # Create a Slack notifier instance using the webhook URL notifier = create_slack_notifier(webhook_url) if not notifier: - logger.error("Failed to create Slack notifier") + logger.error("Failed to create Slack notifier. Check webhook URL or network.") return False - # Define the list of tests to run - tests = [ - test_info_notification, - test_success_notification, - test_failure_notification, - ] + logger.info("Starting Slack Notification Tests ---") + + tests_to_run = { + "Info Notification": test_info_notification, + "Success Notification": test_success_notification, + "Failure Notification": test_failure_notification, + } - # Run each test and return False if any test fails - for test in tests: - if not test(notifier): - return False + results = {} + for name, test_func in tests_to_run.items(): + results[name] = test_func(notifier) - # If all tests pass, return True - return True + logger.info("--- Test Results Summary ---") + all_passed = True + for name, result in results.items(): + status = "PASSED" if result else "FAILED" + logger.info(f"- {name}: {status}") + if not result: + all_passed = False + logger.info("----------------------------") + + return all_passed def main(): - """ - Main function to orchestrate Slack notification testing. - """ - # Display test header information - logger.info("Service Quality Oracle - Slack Notification Test") + """Main entry point for the Slack notification test script.""" + logger.info("===== Service Quality Oracle - Slack Notification Test Script =====") if run_all_tests(): logger.info("All tests completed successfully!") - logger.info("Check Slack channel to verify notifications were received.") + logger.info("Please check the Slack channel to verify notifications were received correctly.") sys.exit(0) - else: - logger.error("Some tests failed!") + logger.error("Some notification tests failed. 
Please review the logs above.") sys.exit(1) diff --git a/src/models/blockchain_client.py b/src/models/blockchain_client.py index d760e21..3c47e19 100644 --- a/src/models/blockchain_client.py +++ b/src/models/blockchain_client.py @@ -13,6 +13,7 @@ from pathlib import Path from typing import Any, Callable, Dict, List, Optional, Tuple, cast +from eth_account.datastructures import SignedTransaction from requests.exceptions import ConnectionError, HTTPError, Timeout from web3 import Web3 from web3.contract import Contract @@ -174,9 +175,12 @@ def do_call(): return do_call() - # If we get an exception, log the error and switch to the next RPC provider + # If we get an exception after all retries, log the error and switch to the next RPC provider except RPC_FAILOVER_EXCEPTIONS as e: - logger.warning(f"RPC call failed with provider at index {self.current_rpc_index}: {e}") + current_provider = self.rpc_providers[self.current_rpc_index] + logger.warning( + f"RPC call failed with provider at index {self.current_rpc_index} ({current_provider}): {e}" + ) self._get_next_rpc_provider() # If we have tried all RPC providers, log the error and raise an exception @@ -400,7 +404,7 @@ def _build_and_sign_transaction( raise - def _send_signed_transaction(self, signed_tx: Any) -> str: + def _send_signed_transaction(self, signed_tx: SignedTransaction) -> str: """ Send a signed transaction and wait for the receipt. @@ -412,22 +416,23 @@ def _send_signed_transaction(self, signed_tx: Any) -> str: """ # Try to send the transaction and wait for the receipt try: - tx_hash = self._execute_rpc_call(self.w3.eth.send_raw_transaction, signed_tx.rawTransaction) - tx_hash_hex = tx_hash.hex() - logger.info(f"Transaction sent with hash: {tx_hash_hex}") + # Send the signed transaction + tx_hash = self._execute_rpc_call(self.w3.eth.send_raw_transaction, signed_tx.raw_transaction) + logger.info(f"Transaction sent with hash: 0x{tx_hash.hex()}") + # Wait for the transaction receipt receipt = self._execute_rpc_call( self.w3.eth.wait_for_transaction_receipt, tx_hash, self.tx_timeout_seconds ) # If the transaction was successful, log the success and return the hash if receipt["status"] == 1: - logger.info(f"Transaction successful: {self.block_explorer_url}/tx/{tx_hash_hex}") - return tx_hash_hex + logger.info(f"Transaction successful: {self.block_explorer_url}/tx/0x{tx_hash.hex()}") + return tx_hash.hex() # If the transaction failed, handle the error else: - error_msg = f"Transaction failed: {self.block_explorer_url}/tx/{tx_hash_hex}" + error_msg = f"Transaction failed: {self.block_explorer_url}/tx/0x{tx_hash.hex()}" logger.error(error_msg) raise Exception(error_msg) @@ -435,7 +440,7 @@ def _send_signed_transaction(self, signed_tx: Any) -> str: except Exception as e: error_msg = f"Error sending transaction or waiting for receipt: {str(e)}" logger.error(error_msg) - raise Exception(error_msg) from e + raise Exception(error_msg) # This part should be unreachable, but it's here for safety. raise Exception("Transaction failed for an unknown reason.") @@ -578,7 +583,7 @@ def batch_allow_indexers_issuance_eligibility( batch_size: int, replace: bool = False, data_bytes: bytes = b"", - ) -> List[str]: + ) -> tuple[List[str], str]: """ Batches indexer addresses and sends multiple transactions for issuance eligibility. @@ -596,12 +601,15 @@ def batch_allow_indexers_issuance_eligibility( data_bytes: Additional data for the transaction. Returns: - A list of transaction hashes for all the batches sent. 
+ A tuple containing: + - A list of transaction hashes for all the batches sent + - The RPC provider URL that was used for the transactions """ # Ensure there are indexer addresses to process if not indexer_addresses: logger.warning("No indexer addresses provided.") - return [] + current_rpc_provider = self.rpc_providers[self.current_rpc_index] + return [], current_rpc_provider logger.info( f"Starting batch transaction for {len(indexer_addresses)} indexers, with batch size {batch_size}." @@ -623,6 +631,7 @@ def batch_allow_indexers_issuance_eligibility( replace, data_bytes, ) + tx_hash = "0x" + tx_hash transaction_hashes.append(tx_hash) logger.info(f"Successfully sent batch {i // batch_size + 1}, tx_hash: {tx_hash}") @@ -631,4 +640,6 @@ def batch_allow_indexers_issuance_eligibility( logger.error(f"Failed to send batch {i // batch_size + 1}. Halting batch processing. Error: {e}") raise - return transaction_hashes + # Return transaction hashes and the current RPC provider used + current_rpc_provider = self.rpc_providers[self.current_rpc_index] + return transaction_hashes, current_rpc_provider diff --git a/src/models/service_quality_oracle.py b/src/models/service_quality_oracle.py index 3e5edc2..2b8d799 100644 --- a/src/models/service_quality_oracle.py +++ b/src/models/service_quality_oracle.py @@ -17,6 +17,7 @@ from src.models.bigquery_provider import BigQueryProvider from src.models.blockchain_client import BlockchainClient from src.models.eligibility_pipeline import EligibilityPipeline +from src.utils.circuit_breaker import CircuitBreaker from src.utils.configuration import ( credential_manager, load_config, @@ -45,6 +46,18 @@ def main(run_date_override: date = None): project_root_path = Path(__file__).resolve().parents[2] slack_notifier = None + # --- Circuit Breaker Initialization and Check --- + circuit_breaker_log = project_root_path / "data" / "circuit_breaker.log" + circuit_breaker = CircuitBreaker( + failure_threshold=3, + window_minutes=60, + log_file=circuit_breaker_log, + ) + + # If circuit_breaker.check returns False, exit cleanly (code 0) to prevent Docker container restart. + if not circuit_breaker.check(): + sys.exit(0) + try: # Configuration and credentials config = load_config() @@ -105,7 +118,7 @@ def main(run_date_override: date = None): tx_timeout_seconds=config["TX_TIMEOUT_SECONDS"], slack_notifier=slack_notifier, ) - transaction_links = blockchain_client.batch_allow_indexers_issuance_eligibility( + transaction_links, rpc_provider_used = blockchain_client.batch_allow_indexers_issuance_eligibility( indexer_addresses=eligible_indexers, private_key=config["PRIVATE_KEY"], chain_id=config["BLOCKCHAIN_CHAIN_ID"], @@ -118,6 +131,9 @@ def main(run_date_override: date = None): execution_time = time.time() - start_time logger.info(f"Oracle run completed successfully in {execution_time:.2f} seconds") + # On a fully successful run, reset the circuit breaker. + circuit_breaker.reset() + if slack_notifier: try: batch_count = len(transaction_links) if transaction_links else 0 @@ -128,11 +144,15 @@ def main(run_date_override: date = None): execution_time=execution_time, transaction_links=transaction_links, batch_count=batch_count, + rpc_provider_used=rpc_provider_used, ) except Exception as e: logger.error(f"Failed to send Slack success notification: {e}", exc_info=True) except Exception as e: + # A failure occurred; record it with the circuit breaker. 
+ circuit_breaker.record_failure() + execution_time = time.time() - start_time error_msg = f"Oracle failed at stage '{stage}': {str(e)}" logger.error(error_msg, exc_info=True) diff --git a/src/utils/circuit_breaker.py b/src/utils/circuit_breaker.py new file mode 100644 index 0000000..35e6e09 --- /dev/null +++ b/src/utils/circuit_breaker.py @@ -0,0 +1,128 @@ +""" +Application circuit breaker utility to prevent infinite restart loops. +""" + +import logging +from datetime import datetime, timedelta +from pathlib import Path +from typing import List + +logger = logging.getLogger(__name__) + + +class CircuitBreaker: + """ + A simple circuit breaker to prevent an application from restarting indefinitely + due to persistent, deterministic errors. + + It works by logging failure timestamps to a file. On startup, it checks how + many failures have occurred within a given time window. If the count exceeds a + threshold, it "opens" the circuit, signaling the application to halt. + """ + + def __init__(self, failure_threshold: int, window_minutes: int, log_file: Path): + """ + Initialize the circuit breaker. + + Args: + failure_threshold: The number of failures required to open the circuit. + window_minutes: The time window in minutes to check for failures. + log_file: The path to the file used for persisting failure timestamps. + """ + self.failure_threshold = failure_threshold + self.window_minutes = window_minutes + self.log_file = log_file + + + def _get_failure_timestamps(self) -> List[datetime]: + """ + Reads and parses all timestamps from the log file. + + Returns: + List[datetime]: A list of datetime objects representing the failure timestamps. + """ + # If the log file does not exist, return an empty list + if not self.log_file.exists(): + return [] + + # If the log file exists, read and parse all timestamps + try: + with self.log_file.open("r") as f: + timestamps = [datetime.fromisoformat(line.strip()) for line in f if line.strip()] + return timestamps + + # If there is an error reading or parsing the log file, log the error and return an empty list + except (IOError, ValueError) as e: + logger.error(f"Error reading or parsing circuit breaker log file {self.log_file}: {e}") + return [] + + + def check(self) -> bool: + """ + Check the state of the circuit. This is used to determine if the application should proceed or halt. + + Returns: + bool: True if the circuit is closed (safe to proceed), False if it is open (should halt). + """ + # Get the failure timestamps from the log file + timestamps = self._get_failure_timestamps() + + # If there are no failure timestamps, return True + if not timestamps: + return True + + # Calculate the window start time and get the recent failures + window_start = datetime.now() - timedelta(minutes=self.window_minutes) + recent_failures = [ts for ts in timestamps if ts > window_start] + + # If the number of recent failures is greater than or equal to the failure threshold, return False + if len(recent_failures) >= self.failure_threshold: + logger.critical( + f"CIRCUIT BREAKER OPEN: Found {len(recent_failures)} failures in the last " + f"{self.window_minutes} minutes (threshold is {self.failure_threshold}). Halting execution." + ) + return False + + # If the number of recent failures is less than the failure threshold, return True + logger.info("Circuit breaker is closed. Proceeding with execution.") + return True + + + def record_failure(self) -> None: + """Records a failure by appending the current timestamp to the log file. 
+ + Returns: + None + """ + # Try to log the failure + try: + # Create the parent directory if it doesn't exist + self.log_file.parent.mkdir(parents=True, exist_ok=True) + + # Append the current timestamp to the log file + with self.log_file.open("a") as f: + f.write(f"{datetime.now().isoformat()}\n") + + # Log the success + logger.warning("Circuit breaker has recorded a failure.") + + # If there is an error appending the timestamp to the log file, log the error + except IOError as e: + logger.error(f"Failed to record failure to circuit breaker log {self.log_file}: {e}") + + + def reset(self) -> None: + """Resets the circuit by deleting the log file on a successful run. + + Returns: + None + """ + # If the log file exists, delete it + if self.log_file.exists(): + try: + self.log_file.unlink() + logger.info("Circuit breaker has been reset after a successful run.") + + # If there is an error deleting the log file, log the error + except IOError as e: + logger.error(f"Failed to reset circuit breaker log {self.log_file}: {e}") diff --git a/src/utils/slack_notifier.py b/src/utils/slack_notifier.py index 6ac8d0c..4ba63c6 100644 --- a/src/utils/slack_notifier.py +++ b/src/utils/slack_notifier.py @@ -90,6 +90,7 @@ def send_success_notification( execution_time: Optional[float] = None, transaction_links: Optional[List[str]] = None, batch_count: Optional[int] = None, + rpc_provider_used: Optional[str] = None, ) -> bool: """ Send a success notification to Slack. @@ -100,6 +101,7 @@ def send_success_notification( execution_time: Execution time in seconds (optional) transaction_links: List of blockchain transaction links (optional) batch_count: Number of transaction batches sent (optional) + rpc_provider_used: The RPC provider URL that was used for the successful transaction (optional) Returns: bool: True if notification was sent successfully @@ -122,6 +124,10 @@ def send_success_notification( if batch_count: fields.append({"title": "Transaction Batches", "value": str(batch_count), "short": True}) + # Add RPC provider information if provided + if rpc_provider_used: + fields.append({"title": "RPC Provider", "value": rpc_provider_used, "short": True}) + # Add transaction links if provided if transaction_links: tx_links = "\n".join([f"Batch {i + 1}: {link}" for i, link in enumerate(transaction_links)]) diff --git a/tests/test_bigquery_provider.py b/tests/test_bigquery_provider.py index 16740ed..13f72e8 100644 --- a/tests/test_bigquery_provider.py +++ b/tests/test_bigquery_provider.py @@ -67,9 +67,7 @@ class TestInitialization: """Tests for the __init__ method.""" - def test_initialization_sets_bigquery_options_and_instance_vars( - self, provider: BigQueryProvider, mock_bpd: MagicMock - ): + def test_init_sets_bigquery_options_and_instance_vars(self, provider: BigQueryProvider, mock_bpd: MagicMock): """ Tests that BigQueryProvider initializes correctly, setting BigQuery options and instance variables. """ @@ -98,7 +96,7 @@ def test_get_indexer_eligibility_query_matches_snapshot(self, provider: BigQuery snapshot.assert_match(query, "indexer_eligibility_query.sql") - def test_get_indexer_eligibility_query_with_single_day_range(self, provider: BigQueryProvider): + def test_get_indexer_eligibility_query_handles_single_day_range(self, provider: BigQueryProvider): """ Tests that the query is constructed correctly when start and end dates are the same, covering an edge case for a single-day analysis period. 
@@ -108,7 +106,7 @@ def test_get_indexer_eligibility_query_with_single_day_range(self, provider: Big assert f"BETWEEN '{SINGLE_DATE.strftime('%Y-%m-%d')}' AND '{SINGLE_DATE.strftime('%Y-%m-%d')}'" in query - def test_get_indexer_eligibility_query_with_invalid_date_range(self, provider: BigQueryProvider): + def test_get_indexer_eligibility_query_handles_invalid_date_range(self, provider: BigQueryProvider): """ Tests that the query is constructed correctly even with a logically invalid date range (start > end), which should result in an empty set from BigQuery @@ -127,7 +125,7 @@ class TestReadGbqDataframe: """Tests for the _read_gbq_dataframe method.""" - def test_read_gbq_dataframe_success( + def test_read_gbq_dataframe_succeeds_on_happy_path( self, mock_sleep: MagicMock, provider: BigQueryProvider, mock_bpd: MagicMock ): """ @@ -148,7 +146,7 @@ def test_read_gbq_dataframe_success( @pytest.mark.parametrize("exception_to_raise", RETRYABLE_EXCEPTIONS) - def test_read_gbq_dataframe_on_retryable_error_succeeds( + def test_read_gbq_dataframe_succeeds_after_retrying_on_error( self, mock_sleep: MagicMock, exception_to_raise: Exception, provider: BigQueryProvider, mock_bpd: MagicMock ): """ @@ -170,7 +168,7 @@ def test_read_gbq_dataframe_on_retryable_error_succeeds( pd.testing.assert_frame_equal(result_df, MOCK_DATAFRAME) - def test_read_gbq_dataframe_on_persistent_error_fails( + def test_read_gbq_dataframe_fails_on_persistent_error( self, mock_sleep: MagicMock, provider: BigQueryProvider, mock_bpd: MagicMock ): """ @@ -191,7 +189,7 @@ def test_read_gbq_dataframe_on_persistent_error_fails( mock_sleep.assert_not_called() - def test_read_gbq_dataframe_on_non_retryable_error_fails_immediately( + def test_read_gbq_dataframe_fails_immediately_on_non_retryable_error( self, mock_sleep: MagicMock, provider: BigQueryProvider, mock_bpd: MagicMock ): """ @@ -214,7 +212,7 @@ class TestFetchIndexerIssuanceEligibilityData: """Tests for the main fetch_indexer_issuance_eligibility_data method.""" - def test_fetch_data_happy_path(self, provider: BigQueryProvider): + def test_fetch_indexer_issuance_eligibility_data_succeeds_on_happy_path(self, provider: BigQueryProvider): """ Tests the happy path for `fetch_indexer_issuance_eligibility_data`, ensuring it orchestrates calls correctly and returns the final DataFrame. @@ -238,7 +236,9 @@ def test_fetch_data_happy_path(self, provider: BigQueryProvider): pd.testing.assert_frame_equal(result_df, MOCK_DATAFRAME) - def test_fetch_data_with_empty_result_returns_empty_dataframe(self, provider: BigQueryProvider): + def test_fetch_indexer_issuance_eligibility_data_returns_empty_dataframe_on_empty_result( + self, provider: BigQueryProvider + ): """ Tests that the method gracefully handles and returns an empty DataFrame from BigQuery. """ @@ -262,7 +262,9 @@ def test_fetch_data_with_empty_result_returns_empty_dataframe(self, provider: Bi pd.testing.assert_frame_equal(result_df, MOCK_EMPTY_DATAFRAME) - def test_fetch_data_on_read_error_propagates_exception(self, provider: BigQueryProvider): + def test_fetch_indexer_issuance_eligibility_data_propagates_exception_on_read_error( + self, provider: BigQueryProvider + ): """ Tests that an exception from `_read_gbq_dataframe` is correctly propagated. 
""" diff --git a/tests/test_blockchain_client.py b/tests/test_blockchain_client.py index e248df2..c2de70b 100644 --- a/tests/test_blockchain_client.py +++ b/tests/test_blockchain_client.py @@ -85,7 +85,7 @@ class TestInitializationAndConnection: """Tests focusing on the client's initialization and RPC connection logic.""" - def test_successful_initialization(self, blockchain_client: BlockchainClient, mock_w3, mock_file): + def test_init_succeeds_on_happy_path(self, blockchain_client: BlockchainClient, mock_w3, mock_file): """ Tests that the BlockchainClient initializes correctly on the happy path. """ @@ -109,7 +109,7 @@ def test_successful_initialization(self, blockchain_client: BlockchainClient, mo assert client.contract is not None - def test_initialization_fails_if_abi_not_found(self, mock_w3, mock_slack): + def test_init_fails_if_abi_not_found(self, mock_w3, mock_slack): """ Tests that BlockchainClient raises an exception if the ABI file cannot be found. """ @@ -127,7 +127,7 @@ def test_initialization_fails_if_abi_not_found(self, mock_w3, mock_slack): ) - def test_rpc_failover_mechanism(self, mock_w3, mock_slack): + def test_init_failover_succeeds_if_primary_rpc_fails(self, mock_w3, mock_slack): """ Tests that the client successfully fails over to a secondary RPC if the primary fails. """ @@ -159,7 +159,7 @@ def test_rpc_failover_mechanism(self, mock_w3, mock_slack): assert client.current_rpc_index == 1 - def test_connection_error_if_all_rpcs_fail(self, mock_w3, mock_slack): + def test_init_fails_if_all_rpcs_fail(self, mock_w3, mock_slack): """ Tests that a ConnectionError is raised if the client cannot connect to any RPC provider. """ @@ -184,7 +184,7 @@ def test_connection_error_if_all_rpcs_fail(self, mock_w3, mock_slack): ) - def test_execute_rpc_call_with_failover(self, blockchain_client: BlockchainClient): + def test_execute_rpc_call_failover_succeeds_on_connection_error(self, blockchain_client: BlockchainClient): """ Tests that _execute_rpc_call fails over to the next provider if the first one is unreachable, and sends a Slack notification. @@ -216,7 +216,7 @@ def test_execute_rpc_call_with_failover(self, blockchain_client: BlockchainClien assert "Switching from previous RPC" in call_kwargs["message"] - def test_execute_rpc_call_reraises_unexpected_exceptions(self, blockchain_client: BlockchainClient): + def test_execute_rpc_call_reraises_unexpected_exception(self, blockchain_client: BlockchainClient): """ Tests that _execute_rpc_call does not attempt to failover on unexpected, non-network errors and instead re-raises them immediately. @@ -234,7 +234,7 @@ def test_execute_rpc_call_reraises_unexpected_exceptions(self, blockchain_client blockchain_client.slack_notifier.send_info_notification.assert_not_called() - def test_initialization_fails_with_empty_rpc_provider_list(self, mock_w3, mock_slack): + def test_init_fails_with_empty_rpc_list(self, mock_w3, mock_slack): """ Tests that BlockchainClient raises an exception if initialized with an empty list of RPC providers. """ @@ -256,7 +256,7 @@ class TestTransactionLogic: """Tests focusing on the helper methods for building and sending a transaction.""" - def test_setup_transaction_account_success(self, blockchain_client: BlockchainClient): + def test_setup_transaction_account_succeeds_with_valid_key(self, blockchain_client: BlockchainClient): """ Tests that _setup_transaction_account returns the correct address and formatted key for a valid private key. 
@@ -272,9 +272,7 @@ def test_setup_transaction_account_success(self, blockchain_client: BlockchainCl assert key == MOCK_PRIVATE_KEY - def test_setup_transaction_account_invalid_key_raises_key_validation_error( - self, blockchain_client: BlockchainClient - ): + def test_setup_transaction_account_fails_with_invalid_key(self, blockchain_client: BlockchainClient): """ Tests that _setup_transaction_account raises KeyValidationError for an invalid key. """ @@ -285,7 +283,7 @@ def test_setup_transaction_account_invalid_key_raises_key_validation_error( blockchain_client._setup_transaction_account("invalid-key") - def test_setup_transaction_account_unexpected_error(self, blockchain_client: BlockchainClient): + def test_setup_transaction_account_fails_on_unexpected_error(self, blockchain_client: BlockchainClient): """ Tests that _setup_transaction_account raises a generic exception for unexpected errors. """ @@ -296,7 +294,7 @@ def test_setup_transaction_account_unexpected_error(self, blockchain_client: Blo blockchain_client._setup_transaction_account("any-key") - def test_estimate_transaction_gas_success(self, blockchain_client: BlockchainClient): + def test_estimate_transaction_gas_succeeds_and_adds_buffer(self, blockchain_client: BlockchainClient): """ Tests that _estimate_transaction_gas correctly estimates gas and adds a 25% buffer. """ @@ -318,7 +316,7 @@ def test_estimate_transaction_gas_success(self, blockchain_client: BlockchainCli mock_contract_func.return_value.estimate_gas.assert_called_once_with({"from": MOCK_SENDER_ADDRESS}) - def test_estimate_transaction_gas_failure(self, blockchain_client: BlockchainClient): + def test_estimate_transaction_gas_fails_on_rpc_error(self, blockchain_client: BlockchainClient): """ Tests that _estimate_transaction_gas raises an exception if the RPC call fails. """ @@ -336,7 +334,7 @@ def test_estimate_transaction_gas_failure(self, blockchain_client: BlockchainCli ) - def test_determine_transaction_nonce_new(self, blockchain_client: BlockchainClient): + def test_determine_transaction_nonce_fetches_next_nonce_for_new_tx(self, blockchain_client: BlockchainClient): """ Tests that the next available nonce is fetched for a new transaction (replace=False). """ @@ -352,7 +350,9 @@ def test_determine_transaction_nonce_new(self, blockchain_client: BlockchainClie blockchain_client.mock_w3_instance.eth.get_transaction_count.assert_called_once_with(MOCK_SENDER_ADDRESS) - def test_determine_transaction_nonce_replace(self, blockchain_client: BlockchainClient): + def test_determine_transaction_nonce_uses_oldest_pending_for_replacement( + self, blockchain_client: BlockchainClient + ): """ Tests that the nonce of the oldest pending transaction is used for replacement (replace=True). 
""" @@ -375,7 +375,7 @@ def test_determine_transaction_nonce_replace(self, blockchain_client: Blockchain blockchain_client.mock_w3_instance.eth.get_block.assert_called_once_with("pending", full_transactions=True) - def test_determine_transaction_nonce_replace_no_pending_tx_with_nonce_gap( + def test_determine_transaction_nonce_falls_back_to_latest_on_nonce_gap( self, blockchain_client: BlockchainClient ): """ @@ -397,7 +397,7 @@ def test_determine_transaction_nonce_replace_no_pending_tx_with_nonce_gap( assert blockchain_client.mock_w3_instance.eth.get_transaction_count.call_count == 2 - def test_determine_transaction_nonce_replace_no_pending_tx_no_gap_fallback( + def test_determine_transaction_nonce_falls_back_to_standard_if_no_pending_or_gap( self, blockchain_client: BlockchainClient ): """ @@ -421,7 +421,7 @@ def test_determine_transaction_nonce_replace_no_pending_tx_no_gap_fallback( assert w3_instance.eth.get_transaction_count.call_count == 3 - def test_determine_transaction_nonce_replace_handles_errors(self, blockchain_client: BlockchainClient): + def test_determine_transaction_nonce_falls_back_on_error(self, blockchain_client: BlockchainClient): """ Tests that nonce determination falls back gracefully if checking for pending transactions fails. @@ -440,7 +440,7 @@ def test_determine_transaction_nonce_replace_handles_errors(self, blockchain_cli w3_instance.eth.get_transaction_count.assert_called() - def test_get_gas_prices_success(self, blockchain_client: BlockchainClient): + def test_get_gas_prices_succeeds_on_happy_path(self, blockchain_client: BlockchainClient): """ Tests that _get_gas_prices successfully fetches and returns the base and priority fees. """ @@ -459,7 +459,7 @@ def test_get_gas_prices_success(self, blockchain_client: BlockchainClient): assert max_priority_fee == mock_priority_fee - def test_get_gas_prices_fallback_on_base_fee_error(self, blockchain_client: BlockchainClient): + def test_get_gas_prices_falls_back_on_base_fee_error(self, blockchain_client: BlockchainClient): """ Tests that _get_gas_prices falls back to a default base fee if the RPC call fails. """ @@ -475,7 +475,7 @@ def test_get_gas_prices_fallback_on_base_fee_error(self, blockchain_client: Bloc blockchain_client.mock_w3_instance.to_wei.assert_called_once_with(10, "gwei") - def test_get_gas_prices_fallback_on_priority_fee_error(self, blockchain_client: BlockchainClient): + def test_get_gas_prices_falls_back_on_priority_fee_error(self, blockchain_client: BlockchainClient): """ Tests that _get_gas_prices falls back to a default priority fee if the RPC call fails. """ @@ -500,7 +500,7 @@ def test_get_gas_prices_fallback_on_priority_fee_error(self, blockchain_client: (True, 420, 20), # Replacement: base*4 + priority*2 ], ) - def test_build_transaction_params( + def test_build_transaction_params_builds_correctly( self, replace, expected_max_fee, @@ -528,7 +528,7 @@ def test_build_transaction_params( assert tx_params["nonce"] == 1 - def test_build_and_sign_transaction_success(self, blockchain_client: BlockchainClient): + def test_build_and_sign_transaction_succeeds_on_happy_path(self, blockchain_client: BlockchainClient): """ Tests that _build_and_sign_transaction successfully builds and signs a transaction. 
""" @@ -557,7 +557,7 @@ def test_build_and_sign_transaction_success(self, blockchain_client: BlockchainC ) - def test_build_and_sign_transaction_failure(self, blockchain_client: BlockchainClient): + def test_build_and_sign_transaction_fails_on_build_error(self, blockchain_client: BlockchainClient): """ Tests that _build_and_sign_transaction raises an exception if building fails. """ @@ -576,31 +576,35 @@ def test_build_and_sign_transaction_failure(self, blockchain_client: BlockchainC ) - def test_send_signed_transaction_success(self, blockchain_client: BlockchainClient): + def test_send_signed_transaction_succeeds_on_happy_path(self, blockchain_client: BlockchainClient): """ - Tests that a signed transaction is sent and its hash is returned on success. + Tests that _send_signed_transaction successfully sends a transaction + and waits for a successful receipt. """ # Arrange - mock_signed_tx = MagicMock() - mock_signed_tx.rawTransaction = b"raw_tx_bytes" mock_tx_hash = b"tx_hash" - blockchain_client.mock_w3_instance.eth.send_raw_transaction.return_value = mock_tx_hash - blockchain_client.mock_w3_instance.eth.wait_for_transaction_receipt.return_value = {"status": 1} + blockchain_client.w3.eth.send_raw_transaction.return_value = mock_tx_hash + blockchain_client.w3.eth.wait_for_transaction_receipt.return_value = {"status": 1} + + # Mock the SignedTransaction object with the .raw_transaction property + mock_signed_tx = MagicMock() + type(mock_signed_tx).raw_transaction = PropertyMock(return_value=b"raw_tx_bytes") # Act - tx_hash_hex = blockchain_client._send_signed_transaction(mock_signed_tx) + tx_hash = blockchain_client._send_signed_transaction(mock_signed_tx) # Assert - assert tx_hash_hex == mock_tx_hash.hex() - blockchain_client.mock_w3_instance.eth.send_raw_transaction.assert_called_once_with( - mock_signed_tx.rawTransaction - ) - blockchain_client.mock_w3_instance.eth.wait_for_transaction_receipt.assert_called_once_with( + # Check that send_raw_transaction was called with the correct bytes + blockchain_client.w3.eth.send_raw_transaction.assert_called_once_with(b"raw_tx_bytes") + # Check that wait_for_transaction_receipt was called with the returned hash + blockchain_client.w3.eth.wait_for_transaction_receipt.assert_called_once_with( mock_tx_hash, MOCK_TX_TIMEOUT_SECONDS ) + # Check that the final hash is correct + assert tx_hash == mock_tx_hash.hex() - def test_send_signed_transaction_raises_exception_when_reverted(self, blockchain_client: BlockchainClient): + def test_send_signed_transaction_fails_if_reverted(self, blockchain_client: BlockchainClient): """ Tests that an exception is raised if the transaction is reverted on-chain. """ @@ -614,12 +618,12 @@ def test_send_signed_transaction_raises_exception_when_reverted(self, blockchain # Act & Assert with pytest.raises( - Exception, match=f"Transaction failed: {MOCK_BLOCK_EXPLORER_URL}/tx/{mock_tx_hash.hex()}" + Exception, match=f"Transaction failed: {MOCK_BLOCK_EXPLORER_URL}/tx/0x{mock_tx_hash.hex()}" ): blockchain_client._send_signed_transaction(mock_signed_tx) - def test_send_signed_transaction_raises_exception_on_timeout(self, blockchain_client: BlockchainClient): + def test_send_signed_transaction_fails_on_timeout(self, blockchain_client: BlockchainClient): """ Tests that an exception is raised if waiting for the transaction receipt times out. 
""" @@ -681,7 +685,7 @@ class TestOrchestrationAndBatching: """Tests focusing on the end-to-end orchestration and batch processing logic.""" - def test_execute_complete_transaction_happy_path( + def test_execute_complete_transaction_succeeds_on_happy_path( self, blockchain_client: BlockchainClient, mocker: MockerFixture, @@ -726,7 +730,7 @@ def test_execute_complete_transaction_happy_path( mock_full_transaction_flow["send"].assert_called_once_with("signed_tx") - def test_execute_complete_transaction_missing_params(self, blockchain_client: BlockchainClient): + def test_execute_complete_transaction_fails_on_missing_params(self, blockchain_client: BlockchainClient): """ Tests that _execute_complete_transaction raises ValueError if required parameters are missing. """ @@ -738,7 +742,7 @@ def test_execute_complete_transaction_missing_params(self, blockchain_client: Bl blockchain_client._execute_complete_transaction(incomplete_params) - def test_execute_complete_transaction_invalid_function(self, blockchain_client: BlockchainClient): + def test_execute_complete_transaction_fails_on_invalid_function(self, blockchain_client: BlockchainClient): """ Tests that _execute_complete_transaction raises ValueError for a non-existent contract function. """ @@ -761,7 +765,7 @@ def test_execute_complete_transaction_invalid_function(self, blockchain_client: blockchain_client._execute_complete_transaction(params) - def test_send_transaction_to_allow_indexers_orchestration( + def test_send_transaction_to_allow_indexers_calls_execution_method( self, blockchain_client: BlockchainClient, mocker: MockerFixture ): """ @@ -792,7 +796,7 @@ def test_send_transaction_to_allow_indexers_orchestration( assert call_args["replace"] is False - def test_batch_processing_splits_correctly(self, blockchain_client: BlockchainClient): + def test_batch_allow_indexers_splits_batches_correctly(self, blockchain_client: BlockchainClient): """ Tests that the batch processing logic correctly splits a list of addresses into multiple transactions based on batch size. @@ -804,7 +808,7 @@ def test_batch_processing_splits_correctly(self, blockchain_client: BlockchainCl # Act # Use a batch size of 2, which should result in 3 calls (2, 2, 1) - tx_hashes = blockchain_client.batch_allow_indexers_issuance_eligibility( + tx_hashes, rpc_provider = blockchain_client.batch_allow_indexers_issuance_eligibility( indexer_addresses=addresses, private_key=MOCK_PRIVATE_KEY, chain_id=1, @@ -814,6 +818,7 @@ def test_batch_processing_splits_correctly(self, blockchain_client: BlockchainCl # Assert assert len(tx_hashes) == 3 + assert rpc_provider in blockchain_client.rpc_providers assert blockchain_client.send_transaction_to_allow_indexers.call_count == 3 # Check the contents of each call @@ -822,7 +827,7 @@ def test_batch_processing_splits_correctly(self, blockchain_client: BlockchainCl assert blockchain_client.send_transaction_to_allow_indexers.call_args_list[2][0][0] == addresses[4:5] - def test_batch_processing_halts_on_failure(self, blockchain_client: BlockchainClient): + def test_batch_allow_indexers_halts_on_failure(self, blockchain_client: BlockchainClient): """ Tests that the batch processing halts immediately if one of the transactions fails. 
""" @@ -849,7 +854,7 @@ def test_batch_processing_halts_on_failure(self, blockchain_client: BlockchainCl assert blockchain_client.send_transaction_to_allow_indexers.call_count == 2 - def test_batch_processing_handles_empty_list(self, blockchain_client: BlockchainClient): + def test_batch_allow_indexers_handles_empty_list(self, blockchain_client: BlockchainClient): """ Tests that batch processing handles an empty list of addresses gracefully. """ @@ -857,7 +862,7 @@ def test_batch_processing_handles_empty_list(self, blockchain_client: Blockchain blockchain_client.send_transaction_to_allow_indexers = MagicMock() # Act - tx_hashes = blockchain_client.batch_allow_indexers_issuance_eligibility( + tx_hashes, rpc_provider = blockchain_client.batch_allow_indexers_issuance_eligibility( indexer_addresses=[], private_key=MOCK_PRIVATE_KEY, chain_id=1, @@ -867,4 +872,5 @@ def test_batch_processing_handles_empty_list(self, blockchain_client: Blockchain # Assert assert tx_hashes == [] + assert rpc_provider in blockchain_client.rpc_providers blockchain_client.send_transaction_to_allow_indexers.assert_not_called() diff --git a/tests/test_circuit_breaker.py b/tests/test_circuit_breaker.py new file mode 100644 index 0000000..9a7ca00 --- /dev/null +++ b/tests/test_circuit_breaker.py @@ -0,0 +1,121 @@ +""" +Unit tests for the CircuitBreaker utility. +""" + +from datetime import datetime, timedelta +from unittest.mock import MagicMock, mock_open, patch + +import pytest + +from src.utils.circuit_breaker import CircuitBreaker + + +@pytest.fixture +def mock_path(): + """Fixture to mock the Path object for file system interactions.""" + with patch("src.utils.circuit_breaker.Path"): + mock_instance = MagicMock() + mock_instance.exists.return_value = False + mock_instance.open = mock_open() + yield mock_instance + + +@pytest.fixture +def breaker(mock_path: MagicMock) -> CircuitBreaker: + """Provides a CircuitBreaker instance with a mocked log file path.""" + return CircuitBreaker(failure_threshold=3, window_minutes=60, log_file=mock_path) + + +def test_check_returns_true_when_log_file_does_not_exist(breaker: CircuitBreaker, mock_path: MagicMock): + """ + GIVEN no circuit breaker log file exists + WHEN check() is called + THEN it should return True, allowing execution. + """ + mock_path.exists.return_value = False + assert breaker.check() is True + + +def test_check_returns_true_when_failures_are_below_threshold(breaker: CircuitBreaker, mock_path: MagicMock): + """ + GIVEN a log file with fewer failures than the threshold + WHEN check() is called + THEN it should return True. + """ + now = datetime.now() + timestamps = [(now - timedelta(minutes=i)).isoformat() for i in range(2)] # 2 failures + mock_path.exists.return_value = True + mock_path.open.return_value.__enter__.return_value.readlines.return_value = [f"{ts}\n" for ts in timestamps] + + assert breaker.check() is True + + +def test_check_returns_false_when_failures_meet_threshold(breaker: CircuitBreaker, mock_path: MagicMock): + """ + GIVEN a log file with failures meeting the threshold + WHEN check() is called + THEN it should return False, halting execution. 
+ """ + now = datetime.now() + timestamps = [(now - timedelta(minutes=i)).isoformat() for i in range(3)] # 3 failures + mock_path.exists.return_value = True + mock_path.open.return_value.__enter__.return_value = mock_open(read_data="\n".join(timestamps)).return_value + + assert breaker.check() is False + + +def test_check_ignores_old_failures(breaker: CircuitBreaker, mock_path: MagicMock): + """ + GIVEN a log file with old and recent failures + WHEN check() is called + THEN it should only count recent failures and return True. + """ + now = datetime.now() + timestamps = [ + (now - timedelta(minutes=10)).isoformat(), # Recent + (now - timedelta(minutes=20)).isoformat(), # Recent + (now - timedelta(minutes=70)).isoformat(), # Old + (now - timedelta(minutes=80)).isoformat(), # Old + ] + mock_path.exists.return_value = True + mock_path.open.return_value.__enter__.return_value = mock_open(read_data="\n".join(timestamps)).return_value + + assert breaker.check() is True + + +def test_record_failure_appends_timestamp_to_log(breaker: CircuitBreaker, mock_path: MagicMock): + """ + GIVEN a circuit breaker + WHEN record_failure() is called + THEN it should create the parent directory and append a timestamp to the log file. + """ + breaker.record_failure() + + mock_path.parent.mkdir.assert_called_once_with(parents=True, exist_ok=True) + mock_path.open.assert_called_once_with("a") + # Check that something was written, without being too specific about the exact timestamp + handle = mock_path.open() + handle.write.assert_called_once() + assert len(handle.write.call_args[0][0]) > 10 + + +def test_reset_deletes_log_file(breaker: CircuitBreaker, mock_path: MagicMock): + """ + GIVEN a circuit breaker log file exists + WHEN reset() is called + THEN it should delete the log file. + """ + mock_path.exists.return_value = True + breaker.reset() + mock_path.unlink.assert_called_once() + + +def test_reset_does_nothing_if_log_file_does_not_exist(breaker: CircuitBreaker, mock_path: MagicMock): + """ + GIVEN no circuit breaker log file exists + WHEN reset() is called + THEN it should not attempt to delete anything. 
+ """ + mock_path.exists.return_value = False + breaker.reset() + mock_path.unlink.assert_not_called() diff --git a/tests/test_configuration.py b/tests/test_configuration.py index 115a140..1233e74 100644 --- a/tests/test_configuration.py +++ b/tests/test_configuration.py @@ -152,7 +152,7 @@ class TestConfigLoader: """Tests for the ConfigLoader class.""" - def test_successful_loading_and_substitution(self, temp_config_file: str, mock_env): + def test_load_config_succeeds_with_env_var_substitution(self, temp_config_file: str, mock_env): """ GIVEN a valid config file and set environment variables WHEN the config is loaded @@ -173,7 +173,7 @@ def test_successful_loading_and_substitution(self, temp_config_file: str, mock_e assert config["MIN_ONLINE_DAYS"] == 5 # Should be converted to int - def test_optional_integer_fields_default_to_none(self, temp_config_file: str, mock_env): + def test_load_config_defaults_optional_integers_to_none(self, temp_config_file: str, mock_env): """ GIVEN a config file where optional integer fields are missing WHEN the config is loaded @@ -191,7 +191,7 @@ def test_optional_integer_fields_default_to_none(self, temp_config_file: str, mo assert config["BATCH_SIZE"] is None - def test_raises_error_if_config_missing(self): + def test_load_config_fails_if_file_missing(self): """ GIVEN an invalid file path WHEN the config is loaded @@ -201,7 +201,7 @@ def test_raises_error_if_config_missing(self): ConfigLoader(config_path="/a/fake/path/config.toml").get_flat_config() - def test_raises_error_if_toml_is_malformed(self, tmp_path: Path): + def test_load_config_fails_if_toml_is_malformed(self, tmp_path: Path): """ GIVEN a malformed TOML file WHEN the config is loaded @@ -216,7 +216,7 @@ def test_raises_error_if_toml_is_malformed(self, tmp_path: Path): ConfigLoader(config_path=str(config_path)).get_flat_config() - def test_raises_error_if_env_var_missing(self, temp_config_file: str): + def test_load_config_fails_if_env_var_is_missing(self, temp_config_file: str): """ GIVEN a config referencing an unset environment variable WHEN the config is loaded @@ -228,7 +228,7 @@ def test_raises_error_if_env_var_missing(self, temp_config_file: str): ConfigLoader(config_path=temp_config_file).get_flat_config() - def test_raises_error_for_invalid_integer_value(self, tmp_path: Path): + def test_load_config_fails_on_invalid_integer(self, tmp_path: Path): """ GIVEN a config with a non-integer value for a numeric field WHEN the config is loaded @@ -244,7 +244,7 @@ def test_raises_error_for_invalid_integer_value(self, tmp_path: Path): loader.get_flat_config() - def test_get_default_config_path_docker(self, monkeypatch): + def test_get_default_config_path_returns_docker_path(self, monkeypatch): """ GIVEN the app is running in a Docker-like environment WHEN the default config path is retrieved @@ -272,7 +272,9 @@ def mock_exists(path_obj): ["src/utils", "src/utils/deep/nested"], ids=["from-nested-dir", "from-deeply-nested-dir"], ) - def test_get_default_config_path_local_dev(self, monkeypatch, tmp_path: Path, start_dir_str: str): + def test_get_default_config_path_finds_root_config_in_local_dev( + self, monkeypatch, tmp_path: Path, start_dir_str: str + ): """ GIVEN the app is in a local dev environment (no /app/config.toml) WHEN the default config path is retrieved from a nested directory @@ -306,7 +308,7 @@ def mock_exists_local(path_obj): assert found_path == str(config_in_root_path) - def test_get_default_config_path_not_found(self, monkeypatch): + def 
test_get_default_config_path_fails_if_not_found(self, monkeypatch): """ GIVEN that no config.toml exists in the path hierarchy WHEN the default config path is retrieved @@ -321,7 +323,7 @@ def test_get_default_config_path_not_found(self, monkeypatch): ConfigLoader()._get_default_config_path() - def test_get_missing_env_vars(self, monkeypatch, temp_config_file: str): + def test_get_missing_env_vars_returns_missing_vars(self, monkeypatch, temp_config_file: str): """ GIVEN a config file with environment variable placeholders WHEN get_missing_env_vars is called without the env vars set @@ -348,7 +350,7 @@ def test_get_missing_env_vars(self, monkeypatch, temp_config_file: str): (["http://test.com"], ["http://test.com"]), ], ) - def test_parse_rpc_urls(self, rpc_input, expected_output): + def test_parse_rpc_urls_handles_various_formats(self, rpc_input, expected_output): """ GIVEN various RPC URL list formats (including invalid types) WHEN _parse_rpc_urls is called @@ -364,7 +366,7 @@ def test_parse_rpc_urls(self, rpc_input, expected_output): assert result == expected_output - def test_empty_string_for_integer_field_is_none(self, tmp_path: Path): + def test_load_config_parses_empty_integer_as_none(self, tmp_path: Path): """ GIVEN a config with an empty string for a numeric field WHEN the config is loaded @@ -382,7 +384,7 @@ def test_empty_string_for_integer_field_is_none(self, tmp_path: Path): assert config["MIN_ONLINE_DAYS"] is None - def test_null_value_for_integer_field_is_none(self, tmp_path: Path): + def test_load_config_parses_null_integer_as_none(self, tmp_path: Path): """ GIVEN a config with a null value for a numeric field WHEN the config is loaded @@ -404,7 +406,7 @@ class TestConfigValidation: """Tests for config validation logic.""" - def test_validate_config_success(self, full_valid_config: dict): + def test_validate_config_succeeds_on_valid_config(self, full_valid_config: dict): """ GIVEN a complete, valid config dictionary WHEN _validate_config is called @@ -414,7 +416,7 @@ def test_validate_config_success(self, full_valid_config: dict): _validate_config(full_valid_config) # Should not raise - def test_validate_config_handles_zero_values(self, full_valid_config: dict): + def test_validate_config_succeeds_with_zero_values(self, full_valid_config: dict): """ GIVEN a config where a required numeric field is 0 WHEN _validate_config is called @@ -431,7 +433,7 @@ def test_validate_config_handles_zero_values(self, full_valid_config: dict): pytest.fail(f"Validation incorrectly failed for a field with value 0: {e}") - def test_validate_config_missing_required_field(self): + def test_validate_config_fails_on_missing_field(self): """ GIVEN a config dictionary missing required fields WHEN _validate_config is called @@ -445,7 +447,7 @@ def test_validate_config_missing_required_field(self): _validate_config(config) - def test_validate_config_invalid_time_format(self, full_valid_config: dict): + def test_validate_config_fails_on_invalid_time_format(self, full_valid_config: dict): """ GIVEN a config with an invalid SCHEDULED_RUN_TIME format WHEN _validate_config is called @@ -460,7 +462,7 @@ def test_validate_config_invalid_time_format(self, full_valid_config: dict): _validate_config(config) - def test_validate_config_invalid_time_type(self, full_valid_config: dict): + def test_validate_config_fails_on_invalid_time_type(self, full_valid_config: dict): """ GIVEN a config with a non-string value for SCHEDULED_RUN_TIME WHEN _validate_config is called @@ -475,7 +477,7 @@ def 
test_validate_config_invalid_time_type(self, full_valid_config: dict): _validate_config(config) - def test_validate_all_required_env_vars_success(self, mock_env): + def test_validate_all_required_env_vars_succeeds_when_all_set(self, mock_env): """ GIVEN all required environment variables are set WHEN validate_all_required_env_vars is called @@ -490,7 +492,7 @@ def test_validate_all_required_env_vars_success(self, mock_env): mock_loader.return_value.get_missing_env_vars.assert_called_once() - def test_validate_all_required_env_vars_failure(self): + def test_validate_all_required_env_vars_fails_when_missing(self): """ GIVEN that required environment variables are missing WHEN validate_all_required_env_vars is called @@ -524,7 +526,7 @@ class TestCredentialManager: ("{not valid json}", "Invalid credentials JSON"), ], ) - def test_parse_and_validate_credentials_json_raises_for_invalid(self, creds_json, expected_error_msg): + def test_parse_and_validate_credentials_json_fails_on_invalid_json(self, creds_json, expected_error_msg): """ GIVEN an invalid or incomplete JSON string for credentials WHEN _parse_and_validate_credentials_json is called @@ -547,10 +549,10 @@ def test_parse_and_validate_credentials_json_raises_for_invalid(self, creds_json "Incomplete authorized_user", ), ('{"type": "unsupported"}', "Unsupported credential type"), - ("{not valid json}", "Error processing inline credentials: Invalid credentials JSON"), + ("{not valid json}", "Invalid credentials JSON"), ], ) - def test_setup_google_credentials_raises_for_invalid_json(self, mock_env, creds_json, expected_error_msg): + def test_setup_google_credentials_fails_on_invalid_json(self, mock_env, creds_json, expected_error_msg): """ GIVEN an invalid or incomplete JSON string in the environment variable WHEN setup_google_credentials is called @@ -562,7 +564,7 @@ def test_setup_google_credentials_raises_for_invalid_json(self, mock_env, creds_ manager.setup_google_credentials() - def test_setup_google_credentials_handles_service_account_json( + def test_setup_google_credentials_succeeds_with_service_account_json( self, mock_env, mock_google_auth, mock_service_account_json ): """ @@ -585,9 +587,7 @@ def test_setup_google_credentials_handles_service_account_json( assert call_args[0] == parsed_json - def test_setup_service_account_raises_value_error_on_sdk_failure( - self, mock_env, mock_google_auth, mock_service_account_json - ): + def test_setup_service_account_fails_on_sdk_error(self, mock_env, mock_google_auth, mock_service_account_json): """ GIVEN the Google SDK fails to create credentials from service account info WHEN _setup_service_account_credentials_from_dict is called @@ -605,7 +605,7 @@ def test_setup_service_account_raises_value_error_on_sdk_failure( manager._setup_service_account_credentials_from_dict(creds_data) - def test_setup_google_credentials_handles_authorized_user_json( + def test_setup_google_credentials_succeeds_with_authorized_user_json( self, mock_env, mock_google_auth, mock_auth_user_json ): """ @@ -630,7 +630,7 @@ def test_setup_google_credentials_handles_authorized_user_json( ) - def test_setup_authorized_user_raises_on_sdk_failure(self, mock_env, mock_google_auth, mock_auth_user_json): + def test_setup_authorized_user_propagates_sdk_error(self, mock_env, mock_google_auth, mock_auth_user_json): """ GIVEN the Google SDK fails to create credentials WHEN _setup_user_credentials_from_dict is called @@ -650,7 +650,7 @@ def test_setup_authorized_user_raises_on_sdk_failure(self, mock_env, mock_google 
@patch("src.utils.configuration.CredentialManager._parse_and_validate_credentials_json") - def test_setup_google_credentials_clears_dictionary_on_success( + def test_setup_google_credentials_clears_creds_dict_on_success( self, mock_parse_and_validate, mock_env, mock_google_auth ): """ @@ -677,7 +677,7 @@ def test_setup_google_credentials_clears_dictionary_on_success( @patch("src.utils.configuration.CredentialManager._parse_and_validate_credentials_json") - def test_setup_google_credentials_clears_dictionary_on_failure( + def test_setup_google_credentials_clears_creds_dict_on_failure( self, mock_parse_and_validate, mock_env, mock_service_account_json ): """ @@ -706,7 +706,7 @@ def test_setup_google_credentials_clears_dictionary_on_failure( mock_data_with_clear.clear.assert_called_once() - def test_setup_google_credentials_handles_invalid_file_path(self, mock_env, caplog): + def test_setup_google_credentials_logs_warning_for_invalid_path(self, mock_env, caplog): """ GIVEN the environment variable points to a file that does not exist WHEN setup_google_credentials is called @@ -723,7 +723,7 @@ def test_setup_google_credentials_handles_invalid_file_path(self, mock_env, capl assert "is not valid JSON or a file path" in caplog.text - def test_setup_google_credentials_not_set(self, mock_env, caplog): + def test_setup_google_credentials_logs_warning_when_not_set(self, mock_env, caplog): """ GIVEN the GOOGLE_APPLICATION_CREDENTIALS environment variable is not set WHEN setup_google_credentials is called @@ -745,7 +745,7 @@ class TestLoadConfig: @patch("src.utils.configuration._validate_config") @patch("src.utils.configuration.ConfigLoader") - def test_load_config_happy_path(self, mock_loader_cls, mock_validate, mock_env): + def test_load_config_orchestrates_loading_and_validation(self, mock_loader_cls, mock_validate, mock_env): """ GIVEN a valid configuration environment WHEN load_config is called diff --git a/tests/test_eligibility_pipeline.py b/tests/test_eligibility_pipeline.py index aed0780..1953812 100644 --- a/tests/test_eligibility_pipeline.py +++ b/tests/test_eligibility_pipeline.py @@ -151,7 +151,7 @@ def _assert_output_files( "data_with_non_numeric_values", ], ) -def test_process_correctly_filters_and_saves_data( +def test_process_filters_and_saves_data_correctly( pipeline: EligibilityPipeline, input_data_fixture: str, expected_eligible: List[str], @@ -177,7 +177,7 @@ def test_process_correctly_filters_and_saves_data( _assert_output_files(pipeline, current_date_val, input_data, expected_eligible, expected_ineligible) -def test_process_raises_valueerror_on_invalid_structure(pipeline: EligibilityPipeline): +def test_process_fails_on_invalid_dataframe_structure(pipeline: EligibilityPipeline): """ Tests that `process` correctly raises a ValueError when the input DataFrame is missing required columns. @@ -190,7 +190,7 @@ def test_process_raises_valueerror_on_invalid_structure(pipeline: EligibilityPip pipeline.process(invalid_df, current_date=date.today()) -def test_process_with_none_input_raises_attribute_error(pipeline: EligibilityPipeline): +def test_process_fails_on_none_input(pipeline: EligibilityPipeline): """ Tests that `process` raises an AttributeError when the input is not a DataFrame, as it will fail when trying to access attributes like `columns`. 
@@ -216,7 +216,7 @@ def test_process_with_none_input_raises_attribute_error(pipeline: EligibilityPip ], ids=["standard_cleanup", "zero_max_age", "all_recent_are_kept", "negative_max_age_keeps_all"], ) -def test_clean_old_date_directories_removes_old_and_preserves_new( +def test_clean_old_date_directories_removes_correct_directories( pipeline: EligibilityPipeline, max_age, days_to_create, expected_to_exist, expected_to_be_deleted ): """ @@ -243,7 +243,7 @@ def test_clean_old_date_directories_removes_old_and_preserves_new( assert not dirs_to_create[day].exists(), f"Directory for {day} days ago should have been deleted." -def test_clean_old_date_directories_ignores_malformed_dirs_and_files(pipeline: EligibilityPipeline): +def test_clean_old_date_directories_ignores_malformed_names(pipeline: EligibilityPipeline): """ Tests that `clean_old_date_directories` ignores directories with names that are not in date format and also ignores loose files. @@ -270,7 +270,7 @@ def test_clean_old_date_directories_ignores_malformed_dirs_and_files(pipeline: E assert some_file.exists() -def test_clean_old_date_directories_runs_without_error_if_output_dir_missing( +def test_clean_old_date_directories_handles_missing_output_dir( pipeline: EligibilityPipeline, caplog: pytest.LogCaptureFixture ): """ @@ -286,7 +286,7 @@ def test_clean_old_date_directories_runs_without_error_if_output_dir_missing( # --- Tests for get_date_output_directory() --- -def test_get_date_output_directory_returns_correct_format(pipeline: EligibilityPipeline): +def test_get_date_output_directory_returns_correct_path_format(pipeline: EligibilityPipeline): """ Tests that `get_date_output_directory` returns a correctly formatted path for a given date. @@ -314,7 +314,7 @@ def test_get_date_output_directory_returns_correct_format(pipeline: EligibilityP ], ids=["valid_structure", "missing_one_column", "empty_with_missing_column"], ) -def test_validate_dataframe_structure( +def test_validate_dataframe_structure_validates_correctly( pipeline: EligibilityPipeline, df_data: dict, required_cols: List[str], should_raise: bool ): """ @@ -335,7 +335,7 @@ def test_validate_dataframe_structure( # --- Tests for get_directory_size_info() --- -def test_get_directory_size_info_with_content(pipeline: EligibilityPipeline): +def test_get_directory_size_info_succeeds_with_content(pipeline: EligibilityPipeline): """ Tests `get_directory_size_info` for a directory with files and subdirectories. """ @@ -358,7 +358,7 @@ def test_get_directory_size_info_with_content(pipeline: EligibilityPipeline): assert info["path"] == str(output_dir) -def test_get_directory_size_info_for_empty_directory(pipeline: EligibilityPipeline): +def test_get_directory_size_info_succeeds_for_empty_directory(pipeline: EligibilityPipeline): """ Tests `get_directory_size_info` for an empty directory. """ @@ -376,7 +376,7 @@ def test_get_directory_size_info_for_empty_directory(pipeline: EligibilityPipeli assert info["file_count"] == 0 -def test_get_directory_size_info_for_non_existent_directory(pipeline: EligibilityPipeline): +def test_get_directory_size_info_succeeds_for_non_existent_directory(pipeline: EligibilityPipeline): """ Tests `get_directory_size_info` for a directory that does not exist. 
""" @@ -395,7 +395,7 @@ def test_get_directory_size_info_for_non_existent_directory(pipeline: Eligibilit assert info["file_count"] == 0 -def test_get_directory_size_info_with_megabyte_content(pipeline: EligibilityPipeline): +def test_get_directory_size_info_calculates_megabytes_correctly(pipeline: EligibilityPipeline): """ Tests `get_directory_size_info` correctly calculates size in megabytes. """ diff --git a/tests/test_key_validator.py b/tests/test_key_validator.py index 8cdb38a..aceeea8 100644 --- a/tests/test_key_validator.py +++ b/tests/test_key_validator.py @@ -26,7 +26,7 @@ VALID_KEY_TEST_CASES, ids=[case[0] for case in VALID_KEY_TEST_CASES], ) -def test_valid_keys_are_formatted_correctly(test_id, input_key, expected): +def test_validate_and_format_private_key_succeeds_on_valid_keys(test_id, input_key, expected): """ Test that various valid private key formats are correctly validated and formatted. This single test covers multiple valid input scenarios. @@ -56,7 +56,7 @@ def test_valid_keys_are_formatted_correctly(test_id, input_key, expected): INVALID_FORMAT_TEST_CASES, ids=[case[0] for case in INVALID_FORMAT_TEST_CASES], ) -def test_invalid_key_format_raises_error(test_id, invalid_key): +def test_validate_and_format_private_key_fails_on_invalid_format(test_id, invalid_key): """ Test that keys with an invalid format (incorrect length or non-hex characters) raise a KeyValidationError with a specific message. @@ -85,7 +85,7 @@ def test_invalid_key_format_raises_error(test_id, invalid_key): INVALID_INPUT_TYPE_CASES, ids=[case[0] for case in INVALID_INPUT_TYPE_CASES], ) -def test_invalid_input_type_raises_error(test_id, invalid_input): +def test_validate_and_format_private_key_fails_on_invalid_input_type(test_id, invalid_input): """ Test that invalid input types (e.g., None, non-string) or an empty string raise a KeyValidationError with a specific message. diff --git a/tests/test_retry_decorator.py b/tests/test_retry_decorator.py index 91c10f0..2ac9c45 100644 --- a/tests/test_retry_decorator.py +++ b/tests/test_retry_decorator.py @@ -17,7 +17,7 @@ class CustomError(Exception): pass -def test_decorated_function_called_once_on_success(): +def test_retry_with_backoff_calls_function_once_on_success(): """ Tests that the decorated function is called only once if it succeeds on the first attempt. """ @@ -36,7 +36,7 @@ def decorated_func(): mock_func.assert_called_once() -def test_retries_on_exception_and_reraises(): +def test_retry_with_backoff_retries_and_reraises_on_exception(): """ Tests that the decorator retries on a specified exception, up to the max number of attempts, and then re-raises the exception. @@ -59,7 +59,7 @@ def decorated_func(): assert mock_func.call_count == max_attempts -def test_exception_is_suppressed_with_reraise_false(): +def test_retry_with_backoff_suppresses_exception_with_reraise_false(): """ Tests that the final exception is wrapped in a RetryError when reraise is False. """ @@ -76,7 +76,7 @@ def decorated_func(): decorated_func() -def test_succeeds_after_initial_failures(): +def test_retry_with_backoff_succeeds_after_initial_failures(): """ Tests that the decorator stops retrying and returns the result as soon as the decorated function succeeds. @@ -99,7 +99,7 @@ def decorated_func(): assert mock_func.call_count == 3 -def test_backoff_timing(monkeypatch): +def test_retry_with_backoff_engages_exponential_backoff_timing(monkeypatch): """ Tests that there is a measurable delay between retries, confirming that the exponential backoff is being engaged. 
diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py index 3c816fe..07a6f9c 100644 --- a/tests/test_scheduler.py +++ b/tests/test_scheduler.py @@ -13,11 +13,6 @@ from src.models.scheduler import Scheduler from src.utils.configuration import ConfigurationError -# Because the Scheduler imports the oracle module at the top level, we need to mock it -# before the scheduler is imported for any test. -sys.modules["src.models.service_quality_oracle"] = MagicMock() - - MOCK_CONFIG = { "SLACK_WEBHOOK_URL": "http://fake.slack.com", "SCHEDULED_RUN_TIME": "10:00", @@ -26,6 +21,18 @@ MOCK_CONFIG_NO_SLACK = {"SCHEDULED_RUN_TIME": "10:00", "SLACK_WEBHOOK_URL": None} +@pytest.fixture(autouse=True) +def mock_oracle_module(): + """ + Fixture to mock the entire service_quality_oracle module. + This prevents the real module from being loaded during scheduler tests, + isolating them and preventing side effects. `autouse=True` ensures it runs + for every test in this file. + """ + with patch.dict(sys.modules, {"src.models.service_quality_oracle": MagicMock()}): + yield + + @pytest.fixture def mock_dependencies(): """A comprehensive fixture to mock all external dependencies of the scheduler.""" @@ -84,7 +91,7 @@ class TestSchedulerInitialization: """Tests for the __init__ and initialize methods.""" - def test_initialize_succeeds_on_happy_path(self, mock_dependencies: SimpleNamespace): + def test_init_succeeds_on_happy_path(self, mock_dependencies: SimpleNamespace): """Tests that the scheduler initializes correctly, scheduling the job and performing checks.""" with patch.object(Scheduler, "check_missed_runs") as mock_check_missed: scheduler = Scheduler() @@ -104,7 +111,7 @@ def test_initialize_succeeds_on_happy_path(self, mock_dependencies: SimpleNamesp mock_dependencies.open.assert_any_call("/app/healthcheck", "w") - def test_initialize_handles_configuration_error_and_exits(self, mock_dependencies: SimpleNamespace): + def test_init_handles_config_error_and_exits(self, mock_dependencies: SimpleNamespace): """Tests that sys.exit is called if initialization fails due to a configuration error.""" mock_dependencies.validate.side_effect = ConfigurationError("Missing env var") mock_dependencies.os.environ.get.return_value = MOCK_CONFIG["SLACK_WEBHOOK_URL"] @@ -116,7 +123,7 @@ def test_initialize_handles_configuration_error_and_exits(self, mock_dependencie mock_dependencies.exit.assert_called_once_with(1) - def test_initialize_handles_generic_exception_and_exits(self, mock_dependencies: SimpleNamespace): + def test_init_handles_generic_exception_and_exits(self, mock_dependencies: SimpleNamespace): """Tests that sys.exit is called for any non-ConfigurationError exception during init.""" mock_dependencies.load_config.side_effect = Exception("A wild error appears!") mock_dependencies.os.environ.get.return_value = MOCK_CONFIG["SLACK_WEBHOOK_URL"] @@ -128,7 +135,7 @@ def test_initialize_handles_generic_exception_and_exits(self, mock_dependencies: mock_dependencies.exit.assert_called_once_with(1) - def test_initialize_runs_oracle_when_run_on_startup_is_true(self, mock_dependencies: SimpleNamespace): + def test_init_runs_oracle_on_startup_if_flag_is_true(self, mock_dependencies: SimpleNamespace): """Tests that the oracle is executed immediately if RUN_ON_STARTUP is true.""" mock_dependencies.os.environ.get.side_effect = lambda key, default: ( "true" if key == "RUN_ON_STARTUP" else "false" @@ -139,7 +146,7 @@ def test_initialize_runs_oracle_when_run_on_startup_is_true(self, mock_dependenc 
mock_run_oracle.assert_called_once_with() - def test_initialize_skips_startup_run_when_flag_is_false(self, mock_dependencies: SimpleNamespace): + def test_init_skips_oracle_on_startup_if_flag_is_false(self, mock_dependencies: SimpleNamespace): """Tests that the oracle is NOT executed on startup if RUN_ON_STARTUP is 'false'.""" mock_dependencies.os.environ.get.return_value = "false" with patch.object(Scheduler, "run_oracle") as mock_run_oracle: @@ -148,7 +155,7 @@ def test_initialize_skips_startup_run_when_flag_is_false(self, mock_dependencies mock_run_oracle.assert_not_called() - def test_initialize_handles_disabled_slack(self, mock_dependencies: SimpleNamespace): + def test_init_handles_disabled_slack(self, mock_dependencies: SimpleNamespace): """Tests that initialization proceeds without Slack if the webhook is missing.""" mock_dependencies.load_config.return_value = MOCK_CONFIG_NO_SLACK mock_dependencies.create_slack.return_value = None @@ -174,7 +181,7 @@ class TestSchedulerStateManagement: ids=["recent_date", "file_not_exists", "date_is_capped", "corrupted_file"], ) @patch("src.models.scheduler.datetime") - def test_get_last_run_date( + def test_get_last_run_date_handles_various_scenarios( self, mock_datetime, file_content, @@ -202,7 +209,7 @@ def test_get_last_run_date( @patch("src.models.scheduler.datetime") - def test_get_last_run_date_logs_warning_when_capping( + def test_get_last_run_date_logs_warning_on_capping( self, mock_datetime, scheduler: Scheduler, mock_dependencies: SimpleNamespace ): """Tests that a warning is logged when the last run date is capped.""" @@ -216,7 +223,7 @@ def test_get_last_run_date_logs_warning_when_capping( @patch("src.models.scheduler.datetime") - def test_get_last_run_date_logs_error_on_corrupted_file( + def test_get_last_run_date_logs_error_on_corrupt_file( self, mock_datetime, scheduler: Scheduler, mock_dependencies: SimpleNamespace ): """Tests that an error is logged when the last run date file is corrupt.""" @@ -229,7 +236,9 @@ def test_get_last_run_date_logs_error_on_corrupted_file( mock_dependencies.logger.error.assert_called_once() - def test_save_last_run_date_writes_to_file(self, scheduler: Scheduler, mock_dependencies: SimpleNamespace): + def test_save_last_run_date_writes_correctly_to_file( + self, scheduler: Scheduler, mock_dependencies: SimpleNamespace + ): """Tests that `save_last_run_date` correctly writes the formatted date string to a file.""" run_date = date(2023, 10, 27) expected_dir = "/app/data" @@ -248,7 +257,7 @@ def test_save_last_run_date_writes_to_file(self, scheduler: Scheduler, mock_depe mock_dependencies.open().write.assert_called_once_with("2023-10-27") - def test_save_last_run_date_logs_error_on_io_failure( + def test_save_last_run_date_logs_error_on_io_error( self, scheduler: Scheduler, mock_dependencies: SimpleNamespace ): """Tests that an error is logged if writing the last run date fails.""" @@ -257,7 +266,7 @@ def test_save_last_run_date_logs_error_on_io_failure( mock_dependencies.logger.error.assert_called_with("Error saving last run date: Permission denied") - def test_update_healthcheck_writes_timestamp_and_message( + def test_update_healthcheck_writes_correct_content( self, scheduler: Scheduler, mock_dependencies: SimpleNamespace ): """Tests that `update_healthcheck` writes a timestamp and message to the healthcheck file.""" @@ -269,7 +278,7 @@ def test_update_healthcheck_writes_timestamp_and_message( assert "testing" in file_handle.write.call_args_list[1].args[0] - def 
test_update_healthcheck_logs_warning_on_io_failure( + def test_update_healthcheck_logs_warning_on_io_error( self, scheduler: Scheduler, mock_dependencies: SimpleNamespace ): """Tests that a warning is logged if the healthcheck file cannot be updated.""" @@ -291,7 +300,9 @@ class TestSchedulerTasks: ], ids=["missed_run", "recent_run", "no_history"], ) - def test_check_missed_runs(self, last_run_delta, should_run_oracle, should_notify_slack, scheduler: Scheduler): + def test_check_missed_runs_handles_various_scenarios( + self, last_run_delta, should_run_oracle, should_notify_slack, scheduler: Scheduler + ): """Tests `check_missed_runs` for various scenarios.""" today = datetime(2023, 10, 27).date() last_run_date = (today - last_run_delta) if last_run_delta else None @@ -312,7 +323,7 @@ def test_check_missed_runs(self, last_run_delta, should_run_oracle, should_notif scheduler.slack_notifier.send_info_notification.assert_not_called() - def test_check_missed_runs_does_not_notify_if_slack_is_disabled(self, scheduler: Scheduler): + def test_check_missed_runs_skips_notification_if_slack_disabled(self, scheduler: Scheduler): """Tests that no Slack notification is sent for missed runs if Slack is disabled.""" scheduler.slack_notifier = None scheduler.get_last_run_date = MagicMock(return_value=datetime(2023, 10, 27).date() - timedelta(days=3)) @@ -331,7 +342,7 @@ def test_check_missed_runs_does_not_notify_if_slack_is_disabled(self, scheduler: ], ids=["with_no_override", "with_override"], ) - def test_run_oracle_calls_main_and_updates_state( + def test_run_oracle_calls_main_and_updates_state_on_success( self, run_date_override, expected_date_in_call, scheduler: Scheduler, mock_dependencies: SimpleNamespace ): """Tests that `run_oracle` calls the main oracle function and saves state upon success.""" @@ -364,7 +375,9 @@ class TestSchedulerRunLoop: """Tests for the main `run` loop of the scheduler.""" - def test_run_loop_calls_run_pending_and_sleeps(self, scheduler: Scheduler, mock_dependencies: SimpleNamespace): + def test_run_loop_calls_run_pending_and_sleeps_correctly( + self, scheduler: Scheduler, mock_dependencies: SimpleNamespace + ): """Tests that the run loop correctly calls schedule and sleeps.""" mock_dependencies.schedule.run_pending.side_effect = [None, None, KeyboardInterrupt] scheduler.update_healthcheck = MagicMock() @@ -392,7 +405,9 @@ def test_run_loop_handles_keyboard_interrupt_gracefully( mock_dependencies.exit.assert_not_called() - def test_run_loop_handles_unexpected_exception(self, scheduler: Scheduler, mock_dependencies: SimpleNamespace): + def test_run_loop_handles_unexpected_exception_and_exits( + self, scheduler: Scheduler, mock_dependencies: SimpleNamespace + ): """Tests that a generic exception is caught, a notification is sent, and the program exits.""" test_exception = Exception("Critical failure") mock_dependencies.schedule.run_pending.side_effect = test_exception diff --git a/tests/test_service_quality_oracle.py b/tests/test_service_quality_oracle.py index 193244c..9a15deb 100644 --- a/tests/test_service_quality_oracle.py +++ b/tests/test_service_quality_oracle.py @@ -51,6 +51,7 @@ def oracle_context(): patch("src.models.bigquery_provider.BigQueryProvider") as mock_bq_provider_cls, patch("src.models.eligibility_pipeline.EligibilityPipeline") as mock_pipeline_cls, patch("src.models.blockchain_client.BlockchainClient") as mock_client_cls, + patch("src.utils.circuit_breaker.CircuitBreaker") as mock_circuit_breaker_cls, patch("src.models.service_quality_oracle.Path") as 
mock_path_cls, patch("logging.Logger.error") as mock_logger_error, ): @@ -60,6 +61,10 @@ def oracle_context(): mock_path_instance.resolve.return_value.parents.__getitem__.return_value = mock_project_root mock_path_cls.return_value = mock_path_instance + # Configure mock CircuitBreaker to always return True on check + mock_breaker_instance = mock_circuit_breaker_cls.return_value + mock_breaker_instance.check.return_value = True + # Configure instance return values for mocked classes mock_bq_provider = mock_bq_provider_cls.return_value mock_bq_provider.fetch_indexer_issuance_eligibility_data.return_value = pd.DataFrame() @@ -68,7 +73,13 @@ def oracle_context(): mock_pipeline.process.return_value = (["0xEligible"], ["0xIneligible"]) mock_client = mock_client_cls.return_value - mock_client.batch_allow_indexers_issuance_eligibility.return_value = ["http://tx-link"] + mock_client.batch_allow_indexers_issuance_eligibility.return_value = ( + ["http://tx-link"], + "https://test-rpc.com", + ) + + # Configure Slack notifier + mock_slack_notifier = mock_create_slack.return_value # Reload module so that patched objects are used inside it if "src.models.service_quality_oracle" in sys.modules: @@ -81,19 +92,20 @@ def oracle_context(): "main": sqo.main, "setup_creds": mock_setup_creds, "load_config": mock_load_config, - "slack": {"create": mock_create_slack, "notifier": mock_create_slack.return_value}, + "slack": {"create": mock_create_slack, "notifier": mock_slack_notifier}, "bq_provider_cls": mock_bq_provider_cls, "bq_provider": mock_bq_provider, "pipeline_cls": mock_pipeline_cls, "pipeline": mock_pipeline, "client_cls": mock_client_cls, "client": mock_client, + "circuit_breaker": mock_breaker_instance, "project_root": mock_project_root, "logger_error": mock_logger_error, } -def test_main_successful_run(oracle_context): +def test_main_succeeds_on_happy_path(oracle_context): """Test the primary successful execution path of the oracle.""" ctx = oracle_context ctx["main"]() @@ -119,6 +131,8 @@ def test_main_successful_run(oracle_context): replace=True, ) + ctx["circuit_breaker"].reset.assert_called_once() + ctx["circuit_breaker"].record_failure.assert_not_called() ctx["slack"]["notifier"].send_success_notification.assert_called_once() @@ -134,7 +148,7 @@ def test_main_successful_run(oracle_context): ("client", "Blockchain Submission"), ], ) -def test_main_handles_failures_gracefully(oracle_context, failing_component, expected_stage): +def test_main_handles_failures_at_each_stage(oracle_context, failing_component, expected_stage): """Test that failures at different stages are caught, logged, and cause a system exit.""" ctx = oracle_context error = Exception(f"{failing_component} error") @@ -156,6 +170,7 @@ def test_main_handles_failures_gracefully(oracle_context, failing_component, exp assert excinfo.value.code == 1, "The application should exit with status code 1 on failure." + ctx["circuit_breaker"].record_failure.assert_called_once() ctx["logger_error"].assert_any_call(f"Oracle failed at stage '{expected_stage}': {error}", exc_info=True) # If config loading or Slack notifier creation fails, no notification can be sent. 
@@ -168,7 +183,7 @@ def test_main_handles_failures_gracefully(oracle_context, failing_component, exp assert call_args["error_message"] == str(error) -def test_main_with_date_override(oracle_context): +def test_main_uses_date_override_correctly(oracle_context): """Test that providing a date override correctly adjusts the analysis window.""" ctx = oracle_context override = date(2023, 10, 27) @@ -181,7 +196,7 @@ def test_main_with_date_override(oracle_context): assert args == (start_expected, override) -def test_main_with_no_eligible_indexers(oracle_context): +def test_main_succeeds_with_no_eligible_indexers(oracle_context): """Test the execution path when the pipeline finds no eligible indexers.""" ctx = oracle_context ctx["pipeline"].process.return_value = ([], ["0xIneligible"]) @@ -196,10 +211,11 @@ def test_main_with_no_eligible_indexers(oracle_context): batch_size=MOCK_CONFIG["BATCH_SIZE"], replace=True, ) + ctx["circuit_breaker"].reset.assert_called_once() ctx["slack"]["notifier"].send_success_notification.assert_called_once() -def test_main_no_slack_configured(oracle_context): +def test_main_succeeds_when_slack_is_not_configured(oracle_context): """Test that the oracle runs without sending notifications if Slack is not configured.""" ctx = oracle_context ctx["slack"]["create"].return_value = None @@ -208,11 +224,12 @@ def test_main_no_slack_configured(oracle_context): ctx["load_config"].assert_called_once() ctx["client"].batch_allow_indexers_issuance_eligibility.assert_called_once() + ctx["circuit_breaker"].reset.assert_called_once() ctx["slack"]["notifier"].send_success_notification.assert_not_called() ctx["slack"]["notifier"].send_failure_notification.assert_not_called() -def test_main_failure_notification_fails(oracle_context): +def test_main_exits_gracefully_if_failure_notification_fails(oracle_context): """Test that the oracle exits gracefully if sending the failure notification also fails.""" ctx = oracle_context ctx["pipeline"].process.side_effect = Exception("Pipeline error") @@ -222,6 +239,7 @@ def test_main_failure_notification_fails(oracle_context): ctx["main"]() assert excinfo.value.code == 1 + ctx["circuit_breaker"].record_failure.assert_called_once() ctx["logger_error"].assert_any_call( "Oracle failed at stage 'Data Processing and Artifact Generation': Pipeline error", exc_info=True, @@ -229,7 +247,7 @@ def test_main_failure_notification_fails(oracle_context): ctx["logger_error"].assert_any_call("Failed to send Slack failure notification: Slack is down", exc_info=True) -def test_main_success_notification_fails(oracle_context): +def test_main_logs_error_but_succeeds_if_success_notification_fails(oracle_context): """Test that a failure in sending the success notification is logged but does not cause an exit.""" ctx = oracle_context error = Exception("Slack API error on success") @@ -237,6 +255,7 @@ def test_main_success_notification_fails(oracle_context): ctx["main"]() + ctx["circuit_breaker"].reset.assert_called_once() ctx["logger_error"].assert_called_once_with( f"Failed to send Slack success notification: {error}", exc_info=True ) diff --git a/tests/test_slack_notifier.py b/tests/test_slack_notifier.py index b645157..abafe86 100644 --- a/tests/test_slack_notifier.py +++ b/tests/test_slack_notifier.py @@ -23,13 +23,13 @@ def mock_requests(): # 1. 
Initialization and Factory Tests -def test_successful_initialization(): +def test_init_succeeds_with_webhook_url(): """Tests that the SlackNotifier can be initialized with a webhook URL.""" notifier = SlackNotifier(MOCK_WEBHOOK_URL) assert notifier.webhook_url == MOCK_WEBHOOK_URL -def test_create_slack_notifier_with_url(): +def test_create_slack_notifier_returns_instance_with_url(): """Tests that the factory function returns a Notifier instance when a URL is provided.""" notifier = create_slack_notifier(MOCK_WEBHOOK_URL) assert isinstance(notifier, SlackNotifier) @@ -37,7 +37,7 @@ def test_create_slack_notifier_with_url(): @pytest.mark.parametrize("url", [None, "", " "]) -def test_create_slack_notifier_without_url(url: str): +def test_create_slack_notifier_returns_none_without_url(url: str): """Tests that the factory function returns None if the URL is missing or empty.""" notifier = create_slack_notifier(url) assert notifier is None @@ -46,7 +46,7 @@ def test_create_slack_notifier_without_url(url: str): # 2. Sending Logic Tests -def test_send_message_success(mock_requests: MagicMock): +def test_send_message_succeeds_on_happy_path(mock_requests: MagicMock): """Tests a successful message send.""" notifier = SlackNotifier(MOCK_WEBHOOK_URL) mock_requests.return_value.status_code = 200 @@ -60,7 +60,7 @@ def test_send_message_success(mock_requests: MagicMock): ) -def test_send_message_retry_on_failure(mock_requests: MagicMock): +def test_send_message_retries_on_request_failure(mock_requests: MagicMock): """Tests that the retry decorator is engaged on a request failure.""" notifier = SlackNotifier(MOCK_WEBHOOK_URL) # The decorator is configured with max_attempts=8 @@ -79,7 +79,7 @@ def test_send_message_retry_on_failure(mock_requests: MagicMock): # 3. Payload Construction Tests -def test_send_success_notification_payload(mock_requests: MagicMock): +def test_send_success_notification_builds_correct_payload(mock_requests: MagicMock): """Tests that the success notification has the correct structure.""" notifier = SlackNotifier(MOCK_WEBHOOK_URL) notifier.send_success_notification( @@ -106,7 +106,7 @@ def test_send_success_notification_payload(mock_requests: MagicMock): assert "Batch 1: http://etherscan.io/tx/1" in fields["Transactions"] -def test_send_failure_notification_payload(mock_requests: MagicMock): +def test_send_failure_notification_builds_correct_payload(mock_requests: MagicMock): """Tests that the failure notification has the correct structure.""" notifier = SlackNotifier(MOCK_WEBHOOK_URL) notifier.send_failure_notification(error_message="Something broke", stage="Test Stage") @@ -123,7 +123,7 @@ def test_send_failure_notification_payload(mock_requests: MagicMock): assert "Something broke" in fields["Error"] -def test_send_info_notification_payload(mock_requests: MagicMock): +def test_send_info_notification_builds_correct_payload(mock_requests: MagicMock): """Tests that the info notification has the correct structure.""" notifier = SlackNotifier(MOCK_WEBHOOK_URL) notifier.send_info_notification(message="Just an FYI", title="Friendly Reminder")