Commit 68ba132
chore(SQO): Create unit tests. (#5)
* Detailed TODO list for tests; Ruff; update test_subgraph_data_access_provider.py
* Create test suite for the blockchain client module; Ruff
* Create test suite for the BigQuery data provider test module; Ruff
* Rename bigquery_data_access_provider to bigquery_provider and update refs; Ruff
* Create test suite for the eligibility pipeline; Ruff
* Create test suite for the scheduler; Ruff
* Create test suite for the core oracle orchestrator module; Ruff
* Create test suite for the config module
* Create test suite for the key validator
* Create test suite for the retry decorator; Ruff
* Create test suite for the Slack notifier
* Update test_retry_decorator.py
* Update service_quality_oracle.py
* Create __init__.py
* Update blockchain_client.py
* Fixes from tests
* Full test suite update (all tests now pass); update pyproject.toml
* Improve BigQuery provider test module; fix tests; update tests for higher coverage; update .gitignore
* Improve blockchain client test module; fix failing tests
* Improve configuration and key validator test modules
* Update ELIGIBILITY_CRITERIA.md
* Improve configuration and eligibility pipeline tests
* Improve unit tests for the scheduler module
* Improve unit tests for the service quality oracle core module
* Ruff
1 parent 5a6ea0a commit 68ba132

24 files changed: +3531 −133 lines

.gitignore

Lines changed: 5 additions & 1 deletion
@@ -15,4 +15,8 @@ subgraph/
 contracts/contract.abi.json

 # Ignore Ruff
-.ruff_cache/
+.ruff_cache/
+/tests/__pycache__
+/src/utils/__pycache__
+/src/models/__pycache__
+.coverage

ELIGIBILITY_CRITERIA.md

Lines changed: 12 additions & 12 deletions
@@ -2,13 +2,13 @@

 We will announce changes to the eligibility criteria in the table below. Once the change goes live then it will be reflected in the eligibility criteria section of this document.

-| Upcoming Requirement | Justification | Date Introduced (YYYY-MM-DD)|
-|----------------------|---------------|-----------------------------|
+| Upcoming Requirement | Justification | Date Updated/Introduced (YYYY-MM-DD)|
+|----------------------|---------------|-------------------------------------|
 | **Requirement 1:** | This is a placeholder for future criteria, watch this space to stay informed. We will also announce any upcoming requirements via our existing official channels. | YYYY-MM-DD |

-> **Note**: 
+> **Note**:
 >
-> When announcing new eligibility criteria we will allow a window for indexers to prepare their infrastructure before any new criteria goes live, refer to the date introduced column to see when new criteria will merge.
+> When announcing new eligibility criteria we will allow a window for indexers to prepare their infrastructure before any new/updated criteria goes live, refer to the `Date Updated/Introduced (YYYY-MM-DD)` column to see when upcoming eligibility criteria will merge.

 # Eligibility Criteria
@@ -22,9 +22,9 @@ The Service Quality Oracle determines which indexers are eligible to receive ind
 3. The query was served <50,000 blocks behind chainhead.
 4. The subgraph had at least 500 GRT in curation signal at the time that the query was served.

-> **Note**: 
+> **Note**:
 >
-> All four quality criteria must be satisfied simultaneously for a query to count towards the daily requirement. 
+> All four quality criteria must be satisfied simultaneously for a query to count towards the daily requirement.
 >
 > The above query criteria must be satisfied on 10+ subgraphs per day, for 5+ days in any given 28 day rolling window.
 >
@@ -34,9 +34,9 @@ The Service Quality Oracle determines which indexers are eligible to receive ind


-| Requirement | Justification |
-|-------------|---------------|
-| **Query Status:** The query must have a `200 OK` HTTP response status indicating query success | Indexer infrastructure needs to be capable of serving successful queries to benefit data consumers. |
-| **Query Latency:** The query response must be delivered to the gateway in `< 5,000 ms` | Fast query responses are important to data consumers. |
-| **Query Freshness:** The query must be served from a subgraph that is `< 50,000 blocks` behind chainhead | Data needs to be fresh to be useful to data consumers. |
-| **Subgraph Signal:** The subgraph needs to have `≥ 500 GRT` in curation signal at the time when the query was served. | Indexers are encouraged to serve data on subgraphs that have curation signal. This also creates an economic barrier against those that prefer to game the system. |
+| Requirement | Justification | Date Updated/Introduced (YYYY-MM-DD)|
+|-------------|---------------|-------------------------------------|
+| **Query Status:** The query must have a `200 OK` HTTP response status indicating query success | Indexer infrastructure needs to be capable of serving successful queries to benefit data consumers. | TBD (at genesis of the SQO) |
+| **Query Latency:** The query response must be delivered to the gateway in `< 5,000 ms` | Fast query responses are important to data consumers. | TBD (at genesis of the SQO) |
+| **Query Freshness:** The query must be served from a subgraph that is `< 50,000 blocks` behind chainhead | Data needs to be fresh to be useful to data consumers. | TBD (at genesis of the SQO) |
+| **Subgraph Signal:** The subgraph needs to have `≥ 500 GRT` in curation signal at the time when the query was served. | Indexers are encouraged to serve data on subgraphs that have curation signal. This also creates an economic barrier against those that prefer to game the system. | TBD (at genesis of the SQO) |

README.md

Lines changed: 1 addition & 1 deletion
@@ -53,7 +53,7 @@ The application follows a clear data flow, managed by a daily scheduler:

 2. **Orchestrator (`service_quality_oracle.py`)**: For each run, this module orchestrates the end-to-end process by coordinating the other components.

-3. **Data Fetching (`bigquery_data_access_provider.py`)**: The orchestrator calls this provider to execute a configurable SQL query against Google BigQuery, fetching the raw indexer performance data.
+3. **Data Fetching (`bigquery_provider.py`)**: The orchestrator calls this provider to execute a configurable SQL query against Google BigQuery, fetching the raw indexer performance data.

 4. **Data Processing (`eligibility_pipeline.py`)**: The raw data is passed to this module, which processes it, filters for eligible and ineligible indexers, and generates CSV artifacts for auditing and record-keeping.

pyproject.toml

Lines changed: 7 additions & 0 deletions
@@ -55,3 +55,10 @@ max-complexity = 10
 ignore_missing_imports = true
 no_strict_optional = true
 explicit_package_bases = true
+
+[tool.pytest.ini_options]
+minversion = "6.0"
+addopts = "--cov=src --cov-report=term-missing -v"
+testpaths = ["tests"]
+python_files = "test_*.py"
+pythonpath = ["."]
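
With this configuration, a bare `pytest` invocation collects every `test_*.py` file under `tests/`, runs verbosely, and reports `src/` line coverage with missing lines. Because `pythonpath = ["."]` puts the repository root on the import path, test modules can import `src.*` without an editable install, which is also what lets `service_quality_oracle.py` drop its `sys.path.insert` shim later in this commit. A minimal sketch of a module this configuration would discover (the file and test names are illustrative, not from the commit):

```python
# tests/test_sanity.py -- collected automatically because it matches test_*.py under tests/
from src.models.eligibility_pipeline import EligibilityPipeline  # resolvable thanks to pythonpath = ["."]


def test_module_imports():
    # Trivial smoke test: the package imports from the repository root without installation.
    assert EligibilityPipeline is not None
```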

requirements.txt

Lines changed: 1 addition & 0 deletions
@@ -34,6 +34,7 @@ aiohttp>=3.9.0  # For async HTTP requests (used by web3)
 pytest>=8.0.0
 pytest-cov>=6.0.0
 pytest-mock>=3.0.0
+pytest-snapshot>=0.9.0
 mypy>=1.0.0
 types-pytz  # Type stubs for pytz
 types-requests  # Type stubs for requests
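
The new `pytest-snapshot` dependency enables golden-file assertions, a natural fit for the CSV artifacts the eligibility pipeline writes. A minimal sketch, assuming pytest-snapshot's `snapshot` fixture API (the test file and CSV content are hypothetical):

```python
# tests/test_output_snapshot.py -- illustrative golden-file test using the pytest-snapshot fixture
def test_eligible_csv_matches_snapshot(snapshot):
    csv_text = "indexer,eligible_for_indexing_rewards\n0xabc,1\n"  # stand-in for pipeline output

    # A first run with `pytest --snapshot-update` records the golden file;
    # subsequent runs fail if the generated text drifts from it.
    snapshot.assert_match(csv_text, "eligible_indexers.csv")
```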

src/models/__init__.py

Whitespace-only changes.

src/models/bigquery_data_access_provider.py renamed to src/models/bigquery_provider.py

Lines changed: 2 additions & 1 deletion
@@ -185,6 +185,7 @@ def fetch_indexer_issuance_eligibility_data(self, start_date: date, end_date: da
         - eligible_for_indexing_rewards: Whether the indexer is eligible for indexing rewards.
         """
         # Construct the query
-        query = self._get_indexer_eligibility_query(start_date, end_date)
+        query = self._get_indexer_eligibility_query(start_date=start_date, end_date=end_date)
+
         # Return the results df
         return self._read_gbq_dataframe(query)
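
Passing the two dates by keyword is a small but real safety improvement: both parameters have the same type, so a transposed positional call would run silently and query an inverted range. A sketch of the idea, taken one step further with keyword-only parameters (this stricter signature is an assumption, not what the commit does):

```python
from datetime import date


def get_indexer_eligibility_query(*, start_date: date, end_date: date) -> str:
    # The bare `*` makes both dates keyword-only, so callers cannot transpose them positionally.
    return f"... WHERE day BETWEEN '{start_date.isoformat()}' AND '{end_date.isoformat()}' ..."


# get_indexer_eligibility_query(date(2025, 5, 28), date(2025, 5, 1))  # TypeError at the call site
query = get_indexer_eligibility_query(start_date=date(2025, 5, 1), end_date=date(2025, 5, 28))
```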

src/models/blockchain_client.py

Lines changed: 7 additions & 30 deletions
@@ -316,15 +316,15 @@ def _determine_transaction_nonce(self, sender_address: ChecksumAddress, replace:
         return nonce


-    def _get_gas_prices(self, replace: bool) -> Tuple[int, int]:
+    def _get_gas_prices(self) -> Tuple[int, int]:
         """Get base fee and max priority fee for transaction."""
         # Get current gas prices with detailed logging
         try:
             latest_block_data = self._execute_rpc_call(self.w3.eth.get_block, "latest")
             latest_block = cast(BlockData, latest_block_data)
             base_fee_hex = latest_block["baseFeePerGas"]
-            base_fee = int(base_fee_hex)
-            logger.info(f"Latest block base fee: {base_fee/1e9:.2f} gwei")
+            base_fee = int(base_fee_hex) if isinstance(base_fee_hex, int) else int(str(base_fee_hex), 16)
+            logger.info(f"Latest block base fee: {base_fee / 1e9:.2f} gwei")

         # If the base fee cannot be retrieved, use a fallback value
         except Exception as e:
@@ -334,7 +334,7 @@ def _get_gas_prices(self, replace: bool) -> Tuple[int, int]:
         # Try to get the max priority fee
         try:
             max_priority_fee = self._execute_rpc_call(lambda: self.w3.eth.max_priority_fee)
-            logger.info(f"Max priority fee: {max_priority_fee/1e9:.2f} gwei")
+            logger.info(f"Max priority fee: {max_priority_fee / 1e9:.2f} gwei")

         # If the max priority fee cannot be retrieved, use a fallback value
         except Exception as e:
@@ -364,15 +364,15 @@ def _build_transaction_params(
             max_priority_fee_per_gas = max_priority_fee * 2
             tx_params["maxFeePerGas"] = max_fee_per_gas
             tx_params["maxPriorityFeePerGas"] = max_priority_fee_per_gas
-            logger.info(f"High gas for replacement: {max_fee_per_gas/1e9:.2f} gwei")
+            logger.info(f"High gas for replacement: {max_fee_per_gas / 1e9:.2f} gwei")

         # If we are not replacing a pending transaction, use a lower gas price
         else:
             max_fee_per_gas = base_fee * 2 + max_priority_fee
             max_priority_fee_per_gas = max_priority_fee
             tx_params["maxFeePerGas"] = max_fee_per_gas
             tx_params["maxPriorityFeePerGas"] = max_priority_fee_per_gas
-            logger.info(f"Standard gas: {max_fee_per_gas/1e9:.2f} gwei")
+            logger.info(f"Standard gas: {max_fee_per_gas / 1e9:.2f} gwei")

         logger.info(f"Transaction parameters: nonce={nonce}, gas={gas_limit}, chain_id={chain_id}")
         return tx_params
@@ -400,29 +400,6 @@ def _build_and_sign_transaction(
             raise


-    def _handle_transaction_error(self, error_msg: str) -> None:
-        """Handle and log specific transaction error types."""
-        # If the error message contains "insufficient funds", log the error
-        if "insufficient funds" in error_msg.lower():
-            logger.error("Insufficient funds to pay for gas")
-
-        # If the error message contains "nonce too low", log the error
-        elif "nonce too low" in error_msg.lower():
-            logger.error("Nonce is too low - transaction may have already been sent")
-
-        # If the error message contains "nonce too high", log the error
-        elif "nonce too high" in error_msg.lower():
-            logger.error("Nonce is too high - there may be pending transactions")
-
-        # If the error message contains "gas", log the error
-        elif "gas" in error_msg.lower():
-            logger.error("Gas-related issue - transaction may consume too much gas")
-
-        # If the error message contains "400", log the error
-        elif "400" in error_msg:
-            logger.error("HTTP 400 Bad Request - RPC provider rejected the request")
-
     def _send_signed_transaction(self, signed_tx: Any) -> str:
         """
         Send a signed transaction and wait for the receipt.
@@ -531,7 +508,7 @@ def _execute_complete_transaction(self, params: Dict) -> str:
         nonce = self._determine_transaction_nonce(sender_address, replace)

         # 5. Get gas prices
-        base_fee, max_priority_fee = self._get_gas_prices(replace)
+        base_fee, max_priority_fee = self._get_gas_prices()

         # 6. Build transaction parameters
         tx_params = self._build_transaction_params(
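
The `isinstance` guard exists because, depending on the provider and middleware, `baseFeePerGas` may arrive already decoded as an `int` or as a `0x`-prefixed hex string; plain `int()` would raise on the latter. A standalone sketch of the same normalization (the function name is hypothetical):

```python
def normalize_base_fee(base_fee_field) -> int:
    """Accept an int (already decoded) or a '0x...' hex string (raw JSON-RPC)."""
    if isinstance(base_fee_field, int):
        return base_fee_field
    # int(s, 16) tolerates the '0x' prefix, so raw RPC strings decode cleanly
    return int(str(base_fee_field), 16)


assert normalize_base_fee(12_000_000_000) == 12_000_000_000   # already an int
assert normalize_base_fee("0x2cb417800") == 12_000_000_000    # hex string, 12 gwei
```

The same cleanup drops the now-unneeded `replace` parameter from `_get_gas_prices` and deletes the string-matching `_handle_transaction_error` helper.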

src/models/eligibility_pipeline.py

Lines changed: 16 additions & 8 deletions
@@ -48,16 +48,20 @@ def process(self, input_data_from_bigquery: pd.DataFrame, current_date: date) ->
         required_cols = ["indexer", "eligible_for_indexing_rewards"]
         self.validate_dataframe_structure(input_data_from_bigquery, required_cols)

+        # Make a copy to avoid modifying the original DataFrame and prevent SettingWithCopyWarning
+        processed_df = input_data_from_bigquery.copy()
+
+        # Coerce eligibility column to numeric, treating errors (e.g., non-numeric values) as NaN, then fill with 0
+        processed_df["eligible_for_indexing_rewards"] = pd.to_numeric(
+            processed_df["eligible_for_indexing_rewards"], errors="coerce"
+        ).fillna(0)
+
         # 2. Filter data into eligible and ineligible groups
-        eligible_df = input_data_from_bigquery[
-            input_data_from_bigquery["eligible_for_indexing_rewards"] == 1
-        ].copy()
+        eligible_df = processed_df[processed_df["eligible_for_indexing_rewards"] == 1].copy()

-        ineligible_df = input_data_from_bigquery[
-            input_data_from_bigquery["eligible_for_indexing_rewards"] == 0
-        ].copy()
+        ineligible_df = processed_df[processed_df["eligible_for_indexing_rewards"] != 1].copy()

-        # 3. Generate and save files
+        # 3. Generate and save files, ensuring the original data is used for the raw artifact
         output_date_dir = self.get_date_output_directory(current_date)
         self._generate_files(input_data_from_bigquery, eligible_df, ineligible_df, output_date_dir)
@@ -106,6 +110,10 @@ def clean_old_date_directories(self, max_age_before_deletion: int) -> None:
         Args:
             max_age_before_deletion: Maximum age in days before deleting data output
         """
+        if max_age_before_deletion < 0:
+            logger.info("Negative max_age_before_deletion provided; no directories will be removed.")
+            return
+
         today = date.today()

         # Check if the output directory exists
@@ -175,7 +183,7 @@ def validate_dataframe_structure(self, df: pd.DataFrame, required_columns: List[
         # If any required columns are missing, raise an error
         if missing_columns:
             raise ValueError(
-                f"DataFrame missing required columns: {missing_columns}. " f"Found columns: {list(df.columns)}"
+                f"DataFrame missing required columns: {missing_columns}. Found columns: {list(df.columns)}"
             )

         # If all required columns are present, return True
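
The coercion step is what makes the `== 1` / `!= 1` split total: any value that is not cleanly numeric becomes `NaN`, is filled with `0`, and lands in the ineligible bucket, whereas the old exact-match `== 1` / `== 0` pair could silently drop rows holding any other value. A small sketch of the behavior on hypothetical mixed-type input:

```python
import pandas as pd

df = pd.DataFrame({
    "indexer": ["0xaaa", "0xbbb", "0xccc", "0xddd"],
    "eligible_for_indexing_rewards": [1, "1", "yes", None],  # hypothetical mixed types
})

coerced = pd.to_numeric(df["eligible_for_indexing_rewards"], errors="coerce").fillna(0)
eligible = df[coerced == 1]    # the string "1" parses to 1 and counts as eligible
ineligible = df[coerced != 1]  # "yes" and None coerce to 0, so no row vanishes between buckets

assert list(eligible["indexer"]) == ["0xaaa", "0xbbb"]
assert list(ineligible["indexer"]) == ["0xccc", "0xddd"]
```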

src/models/scheduler.py

Lines changed: 6 additions & 0 deletions
@@ -89,6 +89,7 @@ def update_healthcheck(self, message=None):
         before_sleep=lambda retry_state: logger.warning(
             f"Retry attempt {retry_state.attempt_number} after error: {retry_state.outcome.exception()}"
         ),
+        reraise=True,
     )
     def run_oracle(self, run_date_override=None):
         """
@@ -190,6 +191,11 @@

         except Exception as e:
             logger.error(f"Failed to initialize scheduler: {e}", exc_info=True)
+            if not self.slack_notifier:
+                webhook_url = os.environ.get("SLACK_WEBHOOK_URL")
+                if webhook_url:
+                    self.slack_notifier = create_slack_notifier(webhook_url)
+
             if self.slack_notifier:
                 self.slack_notifier.send_failure_notification(
                     error_message=str(e), stage="Scheduler Initialization", execution_time=0
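
`reraise=True` changes what a caller sees once every retry attempt has failed: tenacity re-raises the original exception instead of wrapping it in `tenacity.RetryError`, so the scheduler's outer error handling (and its Slack failure notification) reports the real cause. A minimal sketch of the difference, assuming only the tenacity API:

```python
from tenacity import retry, stop_after_attempt


@retry(stop=stop_after_attempt(3), reraise=True)
def flaky():
    raise ValueError("boom")


try:
    flaky()
except ValueError as e:
    # Without reraise=True this except clause would never match:
    # tenacity would raise RetryError instead, hiding the ValueError.
    print(f"caught the original exception after 3 attempts: {e}")
```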

src/models/service_quality_oracle.py

Lines changed: 26 additions & 18 deletions
@@ -13,15 +13,14 @@
 from datetime import date, timedelta
 from pathlib import Path

-# Add project root to path
-project_root_path = Path(__file__).resolve().parents[2]
-sys.path.insert(0, str(project_root_path))
-
 # Import data access utilities with absolute import
-from src.models.bigquery_data_access_provider import BigQueryProvider
+from src.models.bigquery_provider import BigQueryProvider
 from src.models.blockchain_client import BlockchainClient
 from src.models.eligibility_pipeline import EligibilityPipeline
-from src.utils.configuration import credential_manager, load_config
+from src.utils.configuration import (
+    credential_manager,
+    load_config,
+)
 from src.utils.slack_notifier import create_slack_notifier

 # Set up basic logging
@@ -42,19 +41,22 @@ def main(run_date_override: date = None):
         run_date_override: If provided, use this date for the run instead of today.
     """
     start_time = time.time()
-    slack_notifier = None
     stage = "Initialization"
+    project_root_path = Path(__file__).resolve().parents[2]
+    slack_notifier = None

     try:
         # Configuration and credentials
-        credential_manager.setup_google_credentials()
         config = load_config()
         slack_notifier = create_slack_notifier(config.get("SLACK_WEBHOOK_URL"))
+
         if slack_notifier:
             logger.info("Slack notifications enabled")
         else:
             logger.info("Slack notifications disabled (no webhook URL configured)")

+        credential_manager.setup_google_credentials()
+
         # Define the date for the current run
         current_run_date = run_date_override or date.today()
         start_date = current_run_date - timedelta(days=config["BIGQUERY_ANALYSIS_PERIOD_DAYS"])
@@ -117,15 +119,18 @@
         logger.info(f"Oracle run completed successfully in {execution_time:.2f} seconds")

         if slack_notifier:
-            batch_count = len(transaction_links) if transaction_links else 0
-            total_processed = len(eligible_indexers)
-            slack_notifier.send_success_notification(
-                eligible_indexers=eligible_indexers,
-                total_processed=total_processed,
-                execution_time=execution_time,
-                transaction_links=transaction_links,
-                batch_count=batch_count,
-            )
+            try:
+                batch_count = len(transaction_links) if transaction_links else 0
+                total_processed = len(eligible_indexers)
+                slack_notifier.send_success_notification(
+                    eligible_indexers=eligible_indexers,
+                    total_processed=total_processed,
+                    execution_time=execution_time,
+                    transaction_links=transaction_links,
+                    batch_count=batch_count,
+                )
+            except Exception as e:
+                logger.error(f"Failed to send Slack success notification: {e}", exc_info=True)

     except Exception as e:
         execution_time = time.time() - start_time
@@ -138,7 +143,10 @@
                     error_message=str(e), stage=stage, execution_time=execution_time
                 )
             except Exception as slack_e:
-                logger.error(f"Failed to send Slack failure notification: {slack_e}", exc_info=True)
+                logger.error(
+                    f"Failed to send Slack failure notification: {slack_e}",
+                    exc_info=True,
+                )

         sys.exit(1)

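Both notification paths now share one principle: a Slack failure must never change the outcome of the oracle run itself, whether that run succeeded or failed. The pattern in isolation (a hypothetical helper, not code from the commit):

```python
import logging

logger = logging.getLogger(__name__)


def notify_safely(send_notification, **kwargs) -> None:
    """Invoke a notifier callable, logging (never propagating) its failures."""
    try:
        send_notification(**kwargs)
    except Exception as e:  # broad on purpose: notification is strictly best-effort
        logger.error(f"Failed to send notification: {e}", exc_info=True)
```
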
src/utils/configuration.py

Lines changed: 8 additions & 16 deletions
@@ -11,6 +11,10 @@
 from pathlib import Path
 from typing import Any, Optional

+import google.auth
+from google.oauth2 import service_account
+from google.oauth2.credentials import Credentials
+
 # Handle Python version compatibility for TOML loading
 if sys.version_info >= (3, 11):
     import tomllib
@@ -266,7 +270,7 @@ def _validate_config(config: dict[str, Any]) -> dict[str, Any]:
         "ETHERSCAN_API_KEY",
         "ARBITRUM_API_KEY",
     ]
-    missing = [field for field in required if not config.get(field)]
+    missing = [field for field in required if config.get(field) is None or config.get(field) in ("", [])]
     if missing:
         raise ConfigurationError(
             "Missing required configuration fields in config.toml or environment variables:",
@@ -350,9 +354,6 @@ def _parse_and_validate_credentials_json(self, creds_env: str) -> dict:

     def _setup_user_credentials_from_dict(self, creds_data: dict) -> None:
         """Set up user account credentials directly from a dictionary."""
-        import google.auth
-        from google.oauth2.credentials import Credentials
-
         # Try to set up the credentials
         try:
             credentials = Credentials(
@@ -367,17 +368,13 @@ def _setup_user_credentials_from_dict(self, creds_data: dict) -> None:
             google.auth._default._CREDENTIALS = credentials  # type: ignore[attr-defined]
             logger.info("Successfully loaded user account credentials from environment variable")

-        # Clear credentials from memory
-        finally:
-            if "creds_data" in locals():
-                creds_data.clear()
+        except Exception as e:
+            # Re-raise SDK exceptions for the caller to handle.
+            raise e


     def _setup_service_account_credentials_from_dict(self, creds_data: dict) -> None:
         """Set up service account credentials directly from a dictionary."""
-        import google.auth
-        from google.oauth2 import service_account
-
         # Try to set up the credentials
         try:
             # Create credentials object directly from dict
@@ -391,11 +388,6 @@ def _setup_service_account_credentials_from_None
         except Exception as e:
             raise ValueError(f"Invalid service account credentials: {e}") from e

-        # Clear the original credentials dict from memory if it exists
-        finally:
-            if "creds_data" in locals():
-                creds_data.clear()
-

     def setup_google_credentials(self) -> None:
         """
