Skip to content

Commit 2845036

Browse files
authored
refactor(SQO): Modularise codebase, create a single source of truth for config, add test framework, review all components. (#3)
* Rename * fix after rename * update * better spacing between functions * refactor config_loader * Create retry_decorator.py * Update slack notifier to use retry decorator * better spacing * Move _validate_required_fields to config_manager * move _load_config_and_return_validated to config manager module as load_and_validate_config * move _get_path_to_project_root to config_manager as get_project_root * move remaining configuration and setup functions to config_manager * Move function to data processor module Move _export_bigquery_data_as_csvs_and_return_lists_of_ineligible_and_eligible_indexers to data processor module as export_bigquery_data_as_csvs_and_return_indexer_lists * Move another function to data processor module Move _clean_old_date_directories to data processor module as clean_old_date_directories * add functions to data processor module * Move blockchain interaction functions to dedicated blockchain client module * Update bigquery_data_access_provider.py * Update scheduler.py * Update config_manager.py * Update service_quality_oracle.py * Update blockchain_client.py * Update helper module after moving functions to their own modules * feat: Overhaul Ruff Settings Update issuance_data_access_helper.py Update ruff_check_format_assets.sh update ruff settings Ruff selective turn off ruff ruff linting linting Update custom_formatter.py Update service_quality_oracle.py Update custom_formatter.py Update retry_decorator.py Update custom_formatter.py Update retry_decorator.py * audit cp * audit cp2 * audit cp3 * audit cp4 * Never try to run the scheduler for more than 7 days at a time on restart. 
* audit cp5 * ALL CAPS * Update config_loader.py * Update service_quality_oracle.py * Create configuration.py * Consolidate config_manager and config_loader into singular configuration module * rename functions * audit cp6 * rename data_processor to eligibility_pipeline * Minor refactor for efficiency * Update config.toml.example * audit cp7 * audit cp8 * audit cp9 * Update configuration.py * simplify logic * Ruff * Update README.md * Fix CI/CD errors, update workflow, create tests placeholders ruff Update configuration.py Fix CI error * move key validation into _setup_transaction_account
1 parent d50ab59 commit 2845036

28 files changed

+2045
-1752
lines changed

.github/workflows/ci.yml

Lines changed: 45 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -66,22 +66,58 @@ jobs:
6666
cd src
6767
python -c "
6868
import sys; sys.path.insert(0, '..')
69-
from src.utils.config_loader import load_config
69+
from src.utils.configuration import load_config
7070
from src.utils.key_validator import validate_and_format_private_key
7171
print('Core modules import successfully')
7272
"
7373
7474
- name: Validate configuration
7575
run: |
7676
python -c "
77-
import tomli
78-
with open('config.toml.example', 'rb') as f:
79-
config = tomli.load(f)
80-
required = ['bigquery', 'blockchain', 'scheduling', 'secrets']
81-
for section in required:
82-
if section not in config:
83-
raise ValueError(f'Missing section: {section}')
84-
print('Configuration valid')
77+
import sys
78+
import os
79+
80+
# Add project root to path
81+
sys.path.insert(0, '.')
82+
83+
os.environ['BLOCKCHAIN_PRIVATE_KEY'] = '0x' + 'f' * 64
84+
os.environ['SLACK_WEBHOOK_URL'] = 'https://hooks.slack.com/services/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX'
85+
os.environ['STUDIO_API_KEY'] = 'api-key'
86+
os.environ['STUDIO_DEPLOY_KEY'] = 'deploy-key'
87+
os.environ['ARBITRUM_API_KEY'] = 'api-key'
88+
os.environ['ETHERSCAN_API_KEY'] = 'api-key'
89+
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = '{}'
90+
91+
from src.utils.configuration import ConfigLoader, _validate_config
92+
93+
print('Validating config.toml.example...')
94+
95+
# Use the example file and run the full validation logic from our application
96+
loader = ConfigLoader(config_path='config.toml.example')
97+
config = loader.get_flat_config()
98+
99+
print('Patching config in-memory with dummy data for validation...')
100+
config_to_validate = config.copy()
101+
config_to_validate.update({
102+
'BIGQUERY_LOCATION_ID': 'dummy-location',
103+
'BIGQUERY_PROJECT_ID': 'dummy-project',
104+
'BIGQUERY_DATASET_ID': 'dummy-dataset',
105+
'BIGQUERY_TABLE_ID': 'dummy-table',
106+
'BLOCKCHAIN_CONTRACT_ADDRESS': '0x' + '0' * 40,
107+
'BLOCKCHAIN_FUNCTION_NAME': 'dummyFunction',
108+
'BLOCKCHAIN_CHAIN_ID': 1,
109+
'BLOCKCHAIN_RPC_URLS': ['http://dummy-rpc.com'],
110+
'SUBGRAPH_URL_PRE_PRODUCTION': 'http://dummy-subgraph.com',
111+
'SUBGRAPH_URL_PRODUCTION': 'http://dummy-subgraph.com',
112+
'SCHEDULED_RUN_TIME': '00:00',
113+
'BATCH_SIZE': 100,
114+
'MAX_AGE_BEFORE_DELETION': 100,
115+
'BIGQUERY_ANALYSIS_PERIOD_DAYS': 100,
116+
})
117+
118+
_validate_config(config_to_validate)
119+
120+
print('config.toml.example is structurally valid.')
85121
"
86122
87123
# =============================================================================

.github/workflows/tests.yml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,8 @@ jobs:
3434
run: |
3535
if [ -d "tests" ] && [ "$(find tests -name "test_*.py" -o -name "*_test.py" | wc -l)" -gt 0 ]; then
3636
echo "Running tests"
37-
pytest tests/ -v --cov=src --cov-report=term-missing -p no:ethereum
37+
# Run pytest and allow exit code 5 (no tests found), but fail on any other error
38+
pytest tests/ -v --cov=src --cov-report=term-missing -p no:ethereum || ([ $? -eq 5 ] && echo "Pytest exited with 5 (No tests found), which is expected. Passing." || exit $?)
3839
else
3940
echo "No tests found. Test directory is empty or doesn't contain test files."
4041
echo "Tests will be skipped until test files are added."

Dockerfile

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# Dockerfile for Service Quality Oracle
1+
# Dockerfile to create a clean, lightweight Docker Image for the Service Quality Oracle
22

33
# Use Python 3.9 slim as the base image for a lightweight container
44
FROM python:3.9-slim
@@ -10,7 +10,12 @@ LABEL description="Service Quality Oracle" \
1010
# Set working directory
1111
WORKDIR /app
1212

13-
# Set environment variables
13+
14+
# Set up environment variables:
15+
# 1. PYTHONDONTWRITEBYTECODE=1 - Prevent python from creating .pyc files
16+
# 2. PYTHONUNBUFFERED=1 - Send logs directly to the console without buffering
17+
# 3. PYTHONPATH=/app - Add app directory to python import path
18+
# 4. TZ=UTC - Set timezone to UTC
1419
ENV PYTHONDONTWRITEBYTECODE=1 \
1520
PYTHONUNBUFFERED=1 \
1621
PYTHONPATH=/app \
@@ -40,18 +45,17 @@ COPY contracts/ ./contracts/
4045
COPY .gitignore ./
4146
COPY pyproject.toml ./
4247

43-
# Copy the scheduler to the root directory
44-
COPY src/models/scheduler.py ./
45-
4648
# Create healthcheck file
4749
RUN touch /app/healthcheck
4850

4951
# Use Tini as entrypoint for proper signal handling
5052
ENTRYPOINT ["/usr/bin/tini", "--"]
5153

52-
# Add healthcheck to verify the service is running
53-
HEALTHCHECK --interval=5m --timeout=30s --start-period=1m --retries=3 \
54-
CMD python -c "import os, time; assert os.path.exists('/app/healthcheck') and time.time() - os.path.getmtime('/app/healthcheck') < 3600, 'Healthcheck failed'" || exit 1
54+
# Add healthcheck to verify the service is running.
55+
# The scheduler updates the healthcheck file every minute.
56+
# We check every 2 minutes and assert the file was modified in the last 5 minutes (300s).
57+
HEALTHCHECK --interval=2m --timeout=30s --start-period=1m --retries=3 \
58+
CMD python -c "import os, time; assert os.path.exists('/app/healthcheck') and time.time() - os.path.getmtime('/app/healthcheck') < 300, 'Healthcheck failed'" || exit 1
5559

56-
# Run the scheduler
57-
CMD ["python", "scheduler.py"]
60+
# Run the scheduler as a module
61+
CMD ["python", "-m", "src.models.scheduler"]

README.md

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,12 +47,19 @@ Please refer to the [ELIGIBILITY_CRITERIA.md](./ELIGIBILITY_CRITERIA.md) file to
4747

4848
## Data Flow
4949

50-
The application follows this data flow:
50+
The application follows a clear data flow, managed by a daily scheduler:
5151

52-
1. **BigQuery Data Acquisition**: The `bigquery_fetch_and_save_indexer_issuance_eligibility_data_finally_return_eligible_indexers` function in `issuance_data_access_helper.py` fetches fresh data from BigQuery, processes it to determine eligibility, and returns the eligibility data list that would then be posted on chain.
53-
- This function also ensures that data is saved to local files in dated directories for auditing/historical reference over the data retention period.
52+
1. **Scheduler (`scheduler.py`)**: This is the main entry point. It runs on a schedule (e.g., daily), manages the application lifecycle, and triggers the oracle run. It is also responsible for catching up on any missed runs.
5453

55-
2. **Blockchain Publication**: The eligible indexers list from step 1 is directly posted on-chain to a smart contract. Batching of transactions is performed if necessary.
54+
2. **Orchestrator (`service_quality_oracle.py`)**: For each run, this module orchestrates the end-to-end process by coordinating the other components.
55+
56+
3. **Data Fetching (`bigquery_data_access_provider.py`)**: The orchestrator calls this provider to execute a configurable SQL query against Google BigQuery, fetching the raw indexer performance data.
57+
58+
4. **Data Processing (`eligibility_pipeline.py`)**: The raw data is passed to this module, which processes it, filters for eligible and ineligible indexers, and generates CSV artifacts for auditing and record-keeping.
59+
60+
5. **Blockchain Submission (`blockchain_client.py`)**: The orchestrator takes the final list of eligible indexers and passes it to this client, which handles the complexities of batching, signing, and sending the transaction to the blockchain via RPC providers with built-in failover.
61+
62+
6. **Notifications (`slack_notifier.py`)**: Throughout the process, status updates (success, failure, warnings) are sent to Slack.
5663

5764
## CI/CD Pipeline
5865

config.toml.example

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,10 @@
66
# =============================================================================
77

88
[bigquery]
9-
BIGQUERY_LOCATION_ID = ""
10-
BIGQUERY_PROJECT_ID = ""
11-
BIGQUERY_DATASET_ID = ""
9+
BIGQUERY_LOCATION_ID = "US"
10+
BIGQUERY_PROJECT_ID = "graph-mainnet"
11+
BIGQUERY_DATASET_ID = "internal_metrics"
12+
BIGQUERY_TABLE_ID = "metrics_indexer_attempts"
1213

1314
[blockchain]
1415
BLOCKCHAIN_CONTRACT_ADDRESS = ""
@@ -20,6 +21,8 @@ BLOCKCHAIN_RPC_URLS = [
2021
"",
2122
""
2223
]
24+
BLOCK_EXPLORER_URL = "https://sepolia.arbiscan.io"
25+
TX_TIMEOUT_SECONDS = "30"
2326

2427
[scheduling]
2528
SCHEDULED_RUN_TIME = "10:00"
@@ -31,6 +34,13 @@ SUBGRAPH_URL_PRODUCTION = ""
3134
[processing]
3235
BATCH_SIZE = 125
3336
MAX_AGE_BEFORE_DELETION = 120
37+
BIGQUERY_ANALYSIS_PERIOD_DAYS = "28"
38+
39+
[eligibility_criteria]
40+
MIN_ONLINE_DAYS = "5"
41+
MIN_SUBGRAPHS = "10"
42+
MAX_LATENCY_MS = "5000"
43+
MAX_BLOCKS_BEHIND = "50000"
3444

3545
# =============================================================================
3646
# SENSITIVE CONFIGURATION

docker-compose.yml

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,14 @@
11
services:
2+
# Service Quality Oracle container
23
service-quality-oracle:
4+
5+
# Build the image from the Dockerfile in the current directory
36
build: .
7+
8+
# Set the container name
49
container_name: service-quality-oracle
10+
11+
# Set the image name
512
image: service-quality-oracle:latest
613

714
volumes:
@@ -16,9 +23,7 @@ services:
1623
- ./credentials.json:/app/credentials.json:ro
1724

1825
environment:
19-
- PYTHONPATH=/app
2026
- RUN_ON_STARTUP=true
21-
- TZ=UTC
2227

2328
# Set up environment variables
2429
# Environment variables go into process memory for this specific container only
@@ -45,15 +50,18 @@ services:
4550
reservations:
4651
memory: 512M
4752

53+
# Restart policy
4854
restart: unless-stopped
4955

56+
# Healthcheck to ensure the container is running
5057
healthcheck:
5158
test: ["CMD", "python", "-c", "import os, time; assert os.path.exists('/app/healthcheck') and time.time() - os.path.getmtime('/app/healthcheck') < 3600, 'Healthcheck failed'"]
5259
interval: 5m
5360
timeout: 30s
5461
retries: 3
5562
start_period: 1m
5663

64+
# Prevent log files from growing indefinitely and consuming disk space
5765
logging:
5866
driver: "json-file"
5967
options:

pyproject.toml

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,13 @@ target-version = "py39"
1212
fix = true
1313
fix-only = false
1414

15+
[tool.ruff.format]
16+
# Format SQL code in strings/docstrings
17+
docstring-code-format = false
18+
quote-style = "double"
19+
indent-style = "space"
20+
line-ending = "lf"
21+
1522
[tool.ruff.lint]
1623
# Enable rules including isort (I) for import sorting and additional fixes
1724
select = ["E", "W", "F", "I"]
@@ -34,7 +41,7 @@ dummy-variable-rgx = "^(_+|(_+[a-zA-Z0-9_]*[a-zA-Z0-9]+?))$"
3441
[tool.ruff.lint.per-file-ignores]
3542
# Ignore E402 (import not at top) in scripts and specific modules
3643
"scripts/test_*.py" = ["E402"]
37-
"src/models/issuance_eligibility_oracle_core.py" = ["E402"]
44+
"src/models/service_quality_oracle.py" = ["E402"]
3845

3946
# Use unsafe fixes to address typing and other modernization issues
4047
[tool.ruff.lint.isort]
@@ -44,12 +51,6 @@ known-first-party = ["src"]
4451
# Unlike Flake8, default to a complexity level of 10.
4552
max-complexity = 10
4653

47-
[tool.ruff.format]
48-
# Format SQL code in strings/docstrings
49-
docstring-code-format = true
50-
quote-style = "double"
51-
indent-style = "space"
52-
5354
[tool.mypy]
5455
ignore_missing_imports = true
5556
no_strict_optional = true

scripts/custom_formatter.py

Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
#!/usr/bin/env python3
2+
# -*- coding: utf-8 -*-
3+
import argparse
4+
import ast
5+
import sys
6+
from pathlib import Path
7+
8+
9+
class PythonFormatter:
10+
11+
def __init__(self, source_code: str):
12+
self.source_lines = source_code.splitlines()
13+
self.tree = ast.parse(source_code)
14+
self.node_parents = {
15+
child: parent for parent in ast.walk(self.tree) for child in ast.iter_child_nodes(parent)
16+
}
17+
self.disabled_ranges = self._find_disabled_ranges()
18+
19+
20+
def _find_disabled_ranges(self):
21+
ranges = []
22+
in_disabled_block = False
23+
start_line = 0
24+
for i, line in enumerate(self.source_lines):
25+
if "# fmt: off" in line:
26+
in_disabled_block = True
27+
start_line = i + 1
28+
elif "# fmt: on" in line:
29+
if in_disabled_block:
30+
ranges.append((start_line, i + 1))
31+
in_disabled_block = False
32+
return ranges
33+
34+
35+
def _is_in_disabled_range(self, lineno):
36+
for start, end in self.disabled_ranges:
37+
if start <= lineno <= end:
38+
return True
39+
return False
40+
41+
42+
def get_node_start_line(self, node):
43+
if node.decorator_list:
44+
return node.decorator_list[0].lineno
45+
return node.lineno
46+
47+
48+
def is_method(self, node) -> bool:
49+
return isinstance(self.node_parents.get(node), ast.ClassDef)
50+
51+
52+
def format(self) -> str:
53+
nodes = {}
54+
for node in ast.walk(self.tree):
55+
if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef)):
56+
start_line = self.get_node_start_line(node)
57+
nodes[start_line] = node
58+
59+
lines = list(self.source_lines)
60+
sorted_nodes = sorted(nodes.items(), key=lambda x: x[0], reverse=True)
61+
62+
for lineno, node in sorted_nodes:
63+
start_index = lineno - 1
64+
num_blank_lines = 0
65+
66+
# Skip formatting if node is inside a "fmt: off" block
67+
if self._is_in_disabled_range(lineno):
68+
continue
69+
70+
if isinstance(node, ast.ClassDef):
71+
num_blank_lines = 2
72+
elif isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
73+
if self.is_method(node):
74+
if node.name == "__init__":
75+
num_blank_lines = 1
76+
else:
77+
num_blank_lines = 2
78+
else:
79+
num_blank_lines = 2
80+
81+
i = start_index - 1
82+
while i > 0 and not lines[i].strip():
83+
i -= 1
84+
85+
if i < 0: # start of file
86+
i = -1 # will insert at 0
87+
88+
# For top-level nodes, we don't want to add spaces if it's the first thing in the file
89+
# after imports. Let's check if there's anything but imports above.
90+
is_truly_top_level = i == -1
91+
if not is_truly_top_level:
92+
# Count existing blank lines
93+
existing_blank_lines = 0
94+
for k in range(start_index - 1, i, -1):
95+
if not lines[k].strip():
96+
existing_blank_lines += 1
97+
98+
# Only add lines if there are not enough
99+
if existing_blank_lines < num_blank_lines:
100+
# remove existing blank lines
101+
del lines[i + 1 : start_index]
102+
# insert new blank lines
103+
for _ in range(num_blank_lines):
104+
lines.insert(i + 1, "")
105+
106+
result = "\n".join(line.rstrip() for line in lines)
107+
if result:
108+
result = result.strip() + "\n"
109+
110+
return result
111+
112+
113+
def main():
114+
parser = argparse.ArgumentParser(description="Python custom formatter.")
115+
parser.add_argument("files", nargs="+", type=Path)
116+
args = parser.parse_args()
117+
118+
for path in args.files:
119+
try:
120+
source = path.read_text()
121+
# Skip empty files
122+
if not source.strip():
123+
continue
124+
formatter = PythonFormatter(source)
125+
formatted_source = formatter.format()
126+
path.write_text(formatted_source)
127+
print(f"Formatted {path}")
128+
except Exception as e:
129+
print(f"Could not format {path}: {e}", file=sys.stderr)
130+
131+
132+
if __name__ == "__main__":
133+
main()

0 commit comments

Comments
 (0)