
chore(SQO): Create unit tests. #5


Merged: 27 commits, Jun 21, 2025

Commits (27)
3a527ce
Detailed TODO list for tests
MoonBoi9001 Jun 13, 2025
8c8c1cc
Create test suite for the blockchain client module
MoonBoi9001 Jun 13, 2025
c5c02fd
Create test suite for the bigquery data provider test module
MoonBoi9001 Jun 13, 2025
6879a0b
rename bigquery_data_access_provider to bigquery_provider and update …
MoonBoi9001 Jun 13, 2025
7b97246
Create test suite for the eligibility pipeline
MoonBoi9001 Jun 13, 2025
b2999dc
Create test suite for the scheduler
MoonBoi9001 Jun 13, 2025
fde990e
Create test suite for the core oracle orchestrator module.py
MoonBoi9001 Jun 13, 2025
07d0f98
Create test suite for the config module
MoonBoi9001 Jun 13, 2025
002db10
Create test suite for key validator
MoonBoi9001 Jun 13, 2025
46e7fbe
Create test suite for retry decorator
MoonBoi9001 Jun 13, 2025
9e94588
Create test suite for retry decorator
MoonBoi9001 Jun 13, 2025
00a2054
Create test suite for slack notifier
MoonBoi9001 Jun 13, 2025
d9e0a8a
Update test_retry_decorator.py
MoonBoi9001 Jun 13, 2025
0d87722
Update test_retry_decorator.py
MoonBoi9001 Jun 13, 2025
5654029
Update service_quality_oracle.py
MoonBoi9001 Jun 13, 2025
86e5873
Create __init__.py
MoonBoi9001 Jun 13, 2025
ae3d9c4
Update blockchain_client.py
MoonBoi9001 Jun 13, 2025
056155d
fixes from tests
MoonBoi9001 Jun 14, 2025
52fd2dc
full test suite update. All now pass.
MoonBoi9001 Jun 14, 2025
5eae5a9
Improve BigQuery Provider test module
MoonBoi9001 Jun 16, 2025
9595d82
Improve Blockchain Client test module
MoonBoi9001 Jun 18, 2025
42b93c0
Improve configuration and key validator test modules
MoonBoi9001 Jun 18, 2025
d83f8a4
Update ELIGIBILITY_CRITERIA.md
MoonBoi9001 Jun 19, 2025
35441b6
Improve configuration and eligibility pipeline tests
MoonBoi9001 Jun 20, 2025
f55d801
Improve unit tests for the scheduler module
MoonBoi9001 Jun 20, 2025
f882b1c
Improve unit tests for the service quality oracle core module
MoonBoi9001 Jun 20, 2025
d9ea35c
Ruff
MoonBoi9001 Jun 21, 2025
6 changes: 5 additions & 1 deletion .gitignore
@@ -15,4 +15,8 @@ subgraph/
contracts/contract.abi.json

# Ignore Ruff
.ruff_cache/
.ruff_cache/
/tests/__pycache__
/src/utils/__pycache__
/src/models/__pycache__
.coverage
24 changes: 12 additions & 12 deletions ELIGIBILITY_CRITERIA.md
@@ -2,13 +2,13 @@

We will announce changes to the eligibility criteria in the table below. Once the change goes live then it will be reflected in the eligibility criteria section of this document.

| Upcoming Requirement | Justification | Date Introduced (YYYY-MM-DD)|
|----------------------|---------------|-----------------------------|
| Upcoming Requirement | Justification | Date Updated/Introduced (YYYY-MM-DD)|
|----------------------|---------------|-------------------------------------|
| **Requirement 1:** | This is a placeholder for future criteria, watch this space to stay informed. We will also announce any upcoming requirements via our existing official channels. | YYYY-MM-DD |

> **Note**:
> **Note**:
>
> When announcing new eligibility criteria we will allow a window for indexers to prepare their infrastructure before any new criteria goes live, refer to the date introduced column to see when new criteria will merge.
> When announcing new eligibility criteria we will allow a window for indexers to prepare their infrastructure before any new/updated criteria goes live, refer to the `Date Updated/Introduced (YYYY-MM-DD)` column to see when upcoming eligibility criteria will merge.

# Eligibility Criteria

@@ -22,9 +22,9 @@ The Service Quality Oracle determines which indexers are eligible to receive ind
3. The query was served <50,000 blocks behind chainhead.
4. The subgraph had at least 500 GRT in curation signal at the time that the query was served.

> **Note**:
> **Note**:
>
> All four quality criteria must be satisfied simultaneously for a query to count towards the daily requirement.
> All four quality criteria must be satisfied simultaneously for a query to count towards the daily requirement.
>
> The above query criteria must be satisfied on 10+ subgraphs per day, for 5+ days in any given 28 day rolling window.
>
@@ -34,9 +34,9 @@ The Service Quality Oracle determines which indexers are eligible to receive ind



| Requirement | Justification |
|-------------|---------------|
| **Query Status:** The query must have a `200 OK` HTTP response status indicating query success | Indexer infrastructure needs to be capable of serving successful queries to benefit data consumers. |
| **Query Latency:** The query response must be delivered to the gateway in `< 5,000 ms` | Fast query responses are important to data consumers. |
| **Query Freshness:** The query must be served from a subgraph that is `< 50,000 blocks` behind chainhead | Data needs to be fresh to be useful to data consumers. |
| **Subgraph Signal:** The subgraph needs to have `≥ 500 GRT` in curation signal at the time when the query was served. | Indexers are encouraged to serve data on subgraphs that have curation signal. This also creates an economic barrier against those that prefer to game the system. |
| Requirement | Justification | Date Updated/Introduced (YYYY-MM-DD)|
|-------------|---------------|-------------------------------------|
| **Query Status:** The query must have a `200 OK` HTTP response status indicating query success | Indexer infrastructure needs to be capable of serving successful queries to benefit data consumers. | TBD (at genesis of the SQO) |
| **Query Latency:** The query response must be delivered to the gateway in `< 5,000 ms` | Fast query responses are important to data consumers. | TBD (at genesis of the SQO) |
| **Query Freshness:** The query must be served from a subgraph that is `< 50,000 blocks` behind chainhead | Data needs to be fresh to be useful to data consumers. | TBD (at genesis of the SQO) |
| **Subgraph Signal:** The subgraph needs to have `≥ 500 GRT` in curation signal at the time when the query was served. | Indexers are encouraged to serve data on subgraphs that have curation signal. This also creates an economic barrier against those that prefer to game the system. | TBD (at genesis of the SQO) |
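
The thresholds above reduce to a fairly simple aggregation once per-query filtering is done. The sketch below is illustrative only — the column names, and the assumption that the input holds one row per query that already met all four quality criteria inside the 28-day rolling window, are mine rather than the oracle's actual BigQuery schema — but it shows how the "10+ subgraphs per day, 5+ days per 28-day window" rule can be evaluated:

```python
import pandas as pd

def eligible_indexers(good_queries: pd.DataFrame) -> list[str]:
    """Hypothetical columns: indexer, day, subgraph. Assumes the frame only
    contains queries that already met the four quality criteria within the
    28-day rolling window."""
    # Distinct subgraphs served per indexer per day
    daily = (
        good_queries.groupby(["indexer", "day"])["subgraph"]
        .nunique()
        .reset_index(name="subgraphs_served")
    )
    # A day counts only if 10 or more distinct subgraphs met the bar
    qualifying_days = daily[daily["subgraphs_served"] >= 10]
    # An indexer qualifies with 5 or more such days in the window
    days_per_indexer = qualifying_days.groupby("indexer")["day"].nunique()
    return sorted(days_per_indexer[days_per_indexer >= 5].index)
```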
2 changes: 1 addition & 1 deletion README.md
@@ -53,7 +53,7 @@ The application follows a clear data flow, managed by a daily scheduler:

2. **Orchestrator (`service_quality_oracle.py`)**: For each run, this module orchestrates the end-to-end process by coordinating the other components.

3. **Data Fetching (`bigquery_data_access_provider.py`)**: The orchestrator calls this provider to execute a configurable SQL query against Google BigQuery, fetching the raw indexer performance data.
3. **Data Fetching (`bigquery_provider.py`)**: The orchestrator calls this provider to execute a configurable SQL query against Google BigQuery, fetching the raw indexer performance data.

4. **Data Processing (`eligibility_pipeline.py`)**: The raw data is passed to this module, which processes it, filters for eligible and ineligible indexers, and generates CSV artifacts for auditing and record-keeping.
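
The renamed provider slots into the same flow. As a rough, hypothetical sketch of the wiring (the real orchestration lives in `service_quality_oracle.py`; the stand-in callables below are not the actual constructors or signatures):

```python
from datetime import date, timedelta

def run_once(fetch_data, process_data, submit_eligible, analysis_period_days: int) -> None:
    """fetch_data, process_data and submit_eligible stand in for the BigQuery
    provider, eligibility pipeline and blockchain client described above."""
    run_date = date.today()
    start_date = run_date - timedelta(days=analysis_period_days)
    raw_df = fetch_data(start_date, run_date)              # data fetching from BigQuery
    eligible, ineligible = process_data(raw_df, run_date)  # eligibility filtering + CSV artifacts
    submit_eligible(eligible)                              # subsequent on-chain submission step
```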

7 changes: 7 additions & 0 deletions pyproject.toml
@@ -55,3 +55,10 @@ max-complexity = 10
ignore_missing_imports = true
no_strict_optional = true
explicit_package_bases = true

[tool.pytest.ini_options]
minversion = "6.0"
addopts = "--cov=src --cov-report=term-missing -v"
testpaths = ["tests"]
python_files = "test_*.py"
pythonpath = ["."]
1 change: 1 addition & 0 deletions requirements.txt
@@ -34,6 +34,7 @@ aiohttp>=3.9.0 # For async HTTP requests (used by web3)
pytest>=8.0.0
pytest-cov>=6.0.0
pytest-mock>=3.0.0
pytest-snapshot>=0.9.0
mypy>=1.0.0
types-pytz # Type stubs for pytz
types-requests # Type stubs for requests
Empty file added src/models/__init__.py
Empty file.
src/models/bigquery_provider.py (renamed from src/models/bigquery_data_access_provider.py)
@@ -185,6 +185,7 @@ def fetch_indexer_issuance_eligibility_data(self, start_date: date, end_date: da
- eligible_for_indexing_rewards: Whether the indexer is eligible for indexing rewards.
"""
# Construct the query
query = self._get_indexer_eligibility_query(start_date, end_date)
query = self._get_indexer_eligibility_query(start_date=start_date, end_date=end_date)

# Return the results df
return self._read_gbq_dataframe(query)
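
Passing the dates as keywords makes the call site unambiguous and easy to assert against. A hedged sketch of a unit test for this wiring — the provider's constructor isn't shown here, so the real method is invoked against a plain mock standing in for `self`:

```python
from datetime import date
from unittest.mock import MagicMock

from src.models.bigquery_provider import BigQueryProvider

def test_fetch_wires_dates_through_as_keywords():
    provider = MagicMock()
    provider._get_indexer_eligibility_query.return_value = "SELECT 1"
    provider._read_gbq_dataframe.return_value = "sentinel-dataframe"

    # Call the real (unbound) method with the mock as `self`
    result = BigQueryProvider.fetch_indexer_issuance_eligibility_data(
        provider, date(2025, 1, 1), date(2025, 1, 28)
    )

    provider._get_indexer_eligibility_query.assert_called_once_with(
        start_date=date(2025, 1, 1), end_date=date(2025, 1, 28)
    )
    provider._read_gbq_dataframe.assert_called_once_with("SELECT 1")
    assert result == "sentinel-dataframe"
```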
37 changes: 7 additions & 30 deletions src/models/blockchain_client.py
@@ -316,15 +316,15 @@ def _determine_transaction_nonce(self, sender_address: ChecksumAddress, replace:
return nonce


def _get_gas_prices(self, replace: bool) -> Tuple[int, int]:
def _get_gas_prices(self) -> Tuple[int, int]:
"""Get base fee and max priority fee for transaction."""
# Get current gas prices with detailed logging
try:
latest_block_data = self._execute_rpc_call(self.w3.eth.get_block, "latest")
latest_block = cast(BlockData, latest_block_data)
base_fee_hex = latest_block["baseFeePerGas"]
base_fee = int(base_fee_hex)
logger.info(f"Latest block base fee: {base_fee/1e9:.2f} gwei")
base_fee = int(base_fee_hex) if isinstance(base_fee_hex, int) else int(str(base_fee_hex), 16)
logger.info(f"Latest block base fee: {base_fee / 1e9:.2f} gwei")

# If the base fee cannot be retrieved, use a fallback value
except Exception as e:
@@ -334,7 +334,7 @@ def _get_gas_prices(self, replace: bool) -> Tuple[int, int]:
# Try to get the max priority fee
try:
max_priority_fee = self._execute_rpc_call(lambda: self.w3.eth.max_priority_fee)
logger.info(f"Max priority fee: {max_priority_fee/1e9:.2f} gwei")
logger.info(f"Max priority fee: {max_priority_fee / 1e9:.2f} gwei")

# If the max priority fee cannot be retrieved, use a fallback value
except Exception as e:
@@ -364,15 +364,15 @@ def _build_transaction_params(
max_priority_fee_per_gas = max_priority_fee * 2
tx_params["maxFeePerGas"] = max_fee_per_gas
tx_params["maxPriorityFeePerGas"] = max_priority_fee_per_gas
logger.info(f"High gas for replacement: {max_fee_per_gas/1e9:.2f} gwei")
logger.info(f"High gas for replacement: {max_fee_per_gas / 1e9:.2f} gwei")

# If we are not replacing a pending transaction, use a lower gas price
else:
max_fee_per_gas = base_fee * 2 + max_priority_fee
max_priority_fee_per_gas = max_priority_fee
tx_params["maxFeePerGas"] = max_fee_per_gas
tx_params["maxPriorityFeePerGas"] = max_priority_fee_per_gas
logger.info(f"Standard gas: {max_fee_per_gas/1e9:.2f} gwei")
logger.info(f"Standard gas: {max_fee_per_gas / 1e9:.2f} gwei")

logger.info(f"Transaction parameters: nonce={nonce}, gas={gas_limit}, chain_id={chain_id}")
return tx_params
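
Removing the `replace` parameter (the replacement-gas logic lives in `_build_transaction_params`) and normalising `baseFeePerGas` means `_get_gas_prices` now accepts either an already-decoded integer or a hex string. A hedged test sketch for that coercion; it assumes, as the visible hunk suggests but does not show, that the method returns the base fee and priority fee unchanged when both RPC calls succeed:

```python
from unittest.mock import MagicMock

from src.models.blockchain_client import BlockchainClient

def _mock_client_with_block(block: dict) -> MagicMock:
    """Mock standing in for `self`; routes _execute_rpc_call like the real client."""
    client = MagicMock()
    client._execute_rpc_call.side_effect = (
        lambda fn, *args, **kwargs: block if args == ("latest",) else fn()
    )
    client.w3.eth.max_priority_fee = 2_000_000_000
    return client

def test_base_fee_accepts_int_and_hex_string():
    for raw_base_fee in (1_000_000_000, "0x3b9aca00"):  # both mean 1 gwei
        client = _mock_client_with_block({"baseFeePerGas": raw_base_fee})
        base_fee, max_priority_fee = BlockchainClient._get_gas_prices(client)
        assert base_fee == 1_000_000_000          # assumes no further adjustment on success
        assert max_priority_fee == 2_000_000_000  # assumes no further adjustment on success
```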
@@ -400,29 +400,6 @@ def _build_and_sign_transaction(
raise


def _handle_transaction_error(self, error_msg: str) -> None:
"""Handle and log specific transaction error types."""
# If the error message contains "insufficient funds", log the error
if "insufficient funds" in error_msg.lower():
logger.error("Insufficient funds to pay for gas")

# If the error message contains "nonce too low", log the error
elif "nonce too low" in error_msg.lower():
logger.error("Nonce is too low - transaction may have already been sent")

# If the error message contains "nonce too high", log the error
elif "nonce too high" in error_msg.lower():
logger.error("Nonce is too high - there may be pending transactions")

# If the error message contains "gas", log the error
elif "gas" in error_msg.lower():
logger.error("Gas-related issue - transaction may consume too much gas")

# If the error message contains "400", log the error
elif "400" in error_msg:
logger.error("HTTP 400 Bad Request - RPC provider rejected the request")


def _send_signed_transaction(self, signed_tx: Any) -> str:
"""
Send a signed transaction and wait for the receipt.
@@ -531,7 +508,7 @@ def _execute_complete_transaction(self, params: Dict) -> str:
nonce = self._determine_transaction_nonce(sender_address, replace)

# 5. Get gas prices
base_fee, max_priority_fee = self._get_gas_prices(replace)
base_fee, max_priority_fee = self._get_gas_prices()

# 6. Build transaction parameters
tx_params = self._build_transaction_params(
24 changes: 16 additions & 8 deletions src/models/eligibility_pipeline.py
@@ -48,16 +48,20 @@ def process(self, input_data_from_bigquery: pd.DataFrame, current_date: date) ->
required_cols = ["indexer", "eligible_for_indexing_rewards"]
self.validate_dataframe_structure(input_data_from_bigquery, required_cols)

# Make a copy to avoid modifying the original DataFrame and prevent SettingWithCopyWarning
processed_df = input_data_from_bigquery.copy()

# Coerce eligibility column to numeric, treating errors (e.g., non-numeric values) as NaN, then fill with 0
processed_df["eligible_for_indexing_rewards"] = pd.to_numeric(
processed_df["eligible_for_indexing_rewards"], errors="coerce"
).fillna(0)

# 2. Filter data into eligible and ineligible groups
eligible_df = input_data_from_bigquery[
input_data_from_bigquery["eligible_for_indexing_rewards"] == 1
].copy()
eligible_df = processed_df[processed_df["eligible_for_indexing_rewards"] == 1].copy()

ineligible_df = input_data_from_bigquery[
input_data_from_bigquery["eligible_for_indexing_rewards"] == 0
].copy()
ineligible_df = processed_df[processed_df["eligible_for_indexing_rewards"] != 1].copy()

# 3. Generate and save files
# 3. Generate and save files, ensuring the original data is used for the raw artifact
output_date_dir = self.get_date_output_directory(current_date)
self._generate_files(input_data_from_bigquery, eligible_df, ineligible_df, output_date_dir)
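
Coercing the eligibility flag before filtering means string values like `"1"` still count as eligible, while anything non-numeric is treated as ineligible rather than silently dropped. A small sketch of the behaviour — it mirrors only the coercion/filter step on a raw frame; the pipeline's constructor and file outputs are not reproduced:

```python
import pandas as pd

raw = pd.DataFrame(
    {
        "indexer": ["0xaaa", "0xbbb", "0xccc", "0xddd"],
        "eligible_for_indexing_rewards": [1, "1", "not-a-number", 0],
    }
)

# Mirror of the pipeline's coercion step
flags = pd.to_numeric(raw["eligible_for_indexing_rewards"], errors="coerce").fillna(0)
eligible_df = raw[flags == 1]
ineligible_df = raw[flags != 1]

assert list(eligible_df["indexer"]) == ["0xaaa", "0xbbb"]
assert list(ineligible_df["indexer"]) == ["0xccc", "0xddd"]
```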

@@ -106,6 +110,10 @@ def clean_old_date_directories(self, max_age_before_deletion: int) -> None:
Args:
max_age_before_deletion: Maximum age in days before deleting data output
"""
if max_age_before_deletion < 0:
logger.info("Negative max_age_before_deletion provided; no directories will be removed.")
return

today = date.today()

# Check if the output directory exists
@@ -175,7 +183,7 @@ def validate_dataframe_structure(self, df: pd.DataFrame, required_columns: List[
# If any required columns are missing, raise an error
if missing_columns:
raise ValueError(
f"DataFrame missing required columns: {missing_columns}. " f"Found columns: {list(df.columns)}"
f"DataFrame missing required columns: {missing_columns}. Found columns: {list(df.columns)}"
)

# If all required columns are present, return True
6 changes: 6 additions & 0 deletions src/models/scheduler.py
@@ -89,6 +89,7 @@ def update_healthcheck(self, message=None):
before_sleep=lambda retry_state: logger.warning(
f"Retry attempt {retry_state.attempt_number} after error: {retry_state.outcome.exception()}"
),
reraise=True,
)
def run_oracle(self, run_date_override=None):
"""
@@ -190,6 +191,11 @@ def initialize(self):

except Exception as e:
logger.error(f"Failed to initialize scheduler: {e}", exc_info=True)
if not self.slack_notifier:
webhook_url = os.environ.get("SLACK_WEBHOOK_URL")
if webhook_url:
self.slack_notifier = create_slack_notifier(webhook_url)

if self.slack_notifier:
self.slack_notifier.send_failure_notification(
error_message=str(e), stage="Scheduler Initialization", execution_time=0
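
Two behavioural changes are bundled here: `reraise=True` makes `run_oracle` surface the original exception once its retries are exhausted (rather than a `tenacity.RetryError`), and `initialize` now tries to build a Slack notifier from `SLACK_WEBHOOK_URL` before reporting an initialization failure. A minimal sketch of how the retry change can be pinned down in a test — the decorator arguments below mirror the idea rather than the scheduler's exact configuration:

```python
import pytest
from tenacity import retry, stop_after_attempt, wait_fixed

@retry(stop=stop_after_attempt(3), wait=wait_fixed(0), reraise=True)
def always_fails():
    raise RuntimeError("boom")

def test_original_exception_is_reraised_after_retries():
    # Without reraise=True this call would raise tenacity.RetryError instead
    with pytest.raises(RuntimeError, match="boom"):
        always_fails()
```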
44 changes: 26 additions & 18 deletions src/models/service_quality_oracle.py
@@ -13,15 +13,14 @@
from datetime import date, timedelta
from pathlib import Path

# Add project root to path
project_root_path = Path(__file__).resolve().parents[2]
sys.path.insert(0, str(project_root_path))

# Import data access utilities with absolute import
from src.models.bigquery_data_access_provider import BigQueryProvider
from src.models.bigquery_provider import BigQueryProvider
from src.models.blockchain_client import BlockchainClient
from src.models.eligibility_pipeline import EligibilityPipeline
from src.utils.configuration import credential_manager, load_config
from src.utils.configuration import (
credential_manager,
load_config,
)
from src.utils.slack_notifier import create_slack_notifier

# Set up basic logging
@@ -42,19 +41,22 @@ def main(run_date_override: date = None):
run_date_override: If provided, use this date for the run instead of today.
"""
start_time = time.time()
slack_notifier = None
stage = "Initialization"
project_root_path = Path(__file__).resolve().parents[2]
slack_notifier = None

try:
# Configuration and credentials
credential_manager.setup_google_credentials()
config = load_config()
slack_notifier = create_slack_notifier(config.get("SLACK_WEBHOOK_URL"))

if slack_notifier:
logger.info("Slack notifications enabled")
else:
logger.info("Slack notifications disabled (no webhook URL configured)")

credential_manager.setup_google_credentials()

# Define the date for the current run
current_run_date = run_date_override or date.today()
start_date = current_run_date - timedelta(days=config["BIGQUERY_ANALYSIS_PERIOD_DAYS"])
@@ -117,15 +119,18 @@
logger.info(f"Oracle run completed successfully in {execution_time:.2f} seconds")

if slack_notifier:
batch_count = len(transaction_links) if transaction_links else 0
total_processed = len(eligible_indexers)
slack_notifier.send_success_notification(
eligible_indexers=eligible_indexers,
total_processed=total_processed,
execution_time=execution_time,
transaction_links=transaction_links,
batch_count=batch_count,
)
try:
batch_count = len(transaction_links) if transaction_links else 0
total_processed = len(eligible_indexers)
slack_notifier.send_success_notification(
eligible_indexers=eligible_indexers,
total_processed=total_processed,
execution_time=execution_time,
transaction_links=transaction_links,
batch_count=batch_count,
)
except Exception as e:
logger.error(f"Failed to send Slack success notification: {e}", exc_info=True)

except Exception as e:
execution_time = time.time() - start_time
@@ -138,7 +143,10 @@ def main(run_date_override: date = None):
error_message=str(e), stage=stage, execution_time=execution_time
)
except Exception as slack_e:
logger.error(f"Failed to send Slack failure notification: {slack_e}", exc_info=True)
logger.error(
f"Failed to send Slack failure notification: {slack_e}",
exc_info=True,
)

sys.exit(1)
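
Isolating the success notification means a Slack outage after a successful on-chain submission is logged but no longer fails the run (and no longer reaches the `sys.exit(1)` path). The pattern, reduced to a hedged standalone sketch — `notify_success_safely` is a hypothetical helper, not a function introduced by this PR:

```python
import logging
from unittest.mock import MagicMock

logger = logging.getLogger(__name__)

def notify_success_safely(notifier, **kwargs) -> None:
    """Mirror of the new behaviour: notification errors are logged, never raised."""
    if notifier is None:
        return
    try:
        notifier.send_success_notification(**kwargs)
    except Exception as e:
        logger.error(f"Failed to send Slack success notification: {e}", exc_info=True)

def test_slack_failure_does_not_propagate():
    flaky = MagicMock()
    flaky.send_success_notification.side_effect = RuntimeError("Slack is down")
    notify_success_safely(flaky, eligible_indexers=[], total_processed=0,
                          execution_time=1.0, transaction_links=[], batch_count=0)
    flaky.send_success_notification.assert_called_once()
```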

24 changes: 8 additions & 16 deletions src/utils/configuration.py
@@ -11,6 +11,10 @@
from pathlib import Path
from typing import Any, Optional

import google.auth
from google.oauth2 import service_account
from google.oauth2.credentials import Credentials

# Handle Python version compatibility for TOML loading
if sys.version_info >= (3, 11):
import tomllib
@@ -266,7 +270,7 @@ def _validate_config(config: dict[str, Any]) -> dict[str, Any]:
"ETHERSCAN_API_KEY",
"ARBITRUM_API_KEY",
]
missing = [field for field in required if not config.get(field)]
missing = [field for field in required if config.get(field) is None or config.get(field) in ("", [])]
if missing:
raise ConfigurationError(
"Missing required configuration fields in config.toml or environment variables:",
@@ -350,9 +354,6 @@ def _parse_and_validate_credentials_json(self, creds_env: str) -> dict:

def _setup_user_credentials_from_dict(self, creds_data: dict) -> None:
"""Set up user account credentials directly from a dictionary."""
import google.auth
from google.oauth2.credentials import Credentials

# Try to set up the credentials
try:
credentials = Credentials(
Expand All @@ -367,17 +368,13 @@ def _setup_user_credentials_from_dict(self, creds_data: dict) -> None:
google.auth._default._CREDENTIALS = credentials # type: ignore[attr-defined]
logger.info("Successfully loaded user account credentials from environment variable")

# Clear credentials from memory
finally:
if "creds_data" in locals():
creds_data.clear()
except Exception as e:
# Re-raise SDK exceptions for the caller to handle.
raise e


def _setup_service_account_credentials_from_dict(self, creds_data: dict) -> None:
"""Set up service account credentials directly from a dictionary."""
import google.auth
from google.oauth2 import service_account

# Try to set up the credentials
try:
# Create credentials object directly from dict
@@ -391,11 +388,6 @@ def _setup_service_account_credentials_from_dict(self, creds_data: dict) -> None
except Exception as e:
raise ValueError(f"Invalid service account credentials: {e}") from e

# Clear the original credentials dict from memory if it exists
finally:
if "creds_data" in locals():
creds_data.clear()


def setup_google_credentials(self) -> None:
"""
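
The tightened `_validate_config` check treats `None`, empty strings and empty lists as missing while still accepting legitimate falsy values. A quick sketch of the distinction — `find_missing` mirrors the updated comprehension, and `BATCH_SIZE` is a hypothetical field used only to illustrate a valid zero value:

```python
def find_missing(config: dict, required: list[str]) -> list[str]:
    # None, "" and [] count as missing; 0 and False do not
    return [
        field for field in required
        if config.get(field) is None or config.get(field) in ("", [])
    ]

config = {
    "ETHERSCAN_API_KEY": "",       # present but empty -> missing
    "ARBITRUM_API_KEY": "abc123",  # valid
    "BATCH_SIZE": 0,               # falsy but valid -> not missing
}
assert find_missing(config, ["ETHERSCAN_API_KEY", "ARBITRUM_API_KEY", "BATCH_SIZE"]) == ["ETHERSCAN_API_KEY"]
```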