Skip to content

refactor(SQO): Modularise codebase, create a single source of truth for config, add test framework, review all components. #3

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 48 commits into from
Jun 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
7bfaff7
Rename
MoonBoi9001 Jun 9, 2025
84b2025
fix after rename
MoonBoi9001 Jun 9, 2025
9a3b627
update
MoonBoi9001 Jun 9, 2025
448e4d3
better spacing between functions
MoonBoi9001 Jun 9, 2025
c54f7ac
refactor config_loader
MoonBoi9001 Jun 9, 2025
ba08fc1
Create retry_decorator.py
MoonBoi9001 Jun 9, 2025
dd68075
Update slack notifier to use retry decorator
MoonBoi9001 Jun 9, 2025
91e2add
better spacing
MoonBoi9001 Jun 9, 2025
174b8d2
Move _validate_required_fields to config_manager
MoonBoi9001 Jun 9, 2025
0f276ce
move _load_config_and_return_validated to config manager module as lo…
MoonBoi9001 Jun 9, 2025
b89b271
move _get_path_to_project_root to config_manager as get_project_root
MoonBoi9001 Jun 9, 2025
86306f7
move remaining configuration and setup functions to config_manager
MoonBoi9001 Jun 9, 2025
6deab53
Move function to data processor module
MoonBoi9001 Jun 9, 2025
61459d3
Move another function to data processor module
MoonBoi9001 Jun 9, 2025
1f59819
add functions to data processor module
MoonBoi9001 Jun 9, 2025
c16d00a
Move blockchain interaction functions to dedicated blockchain client …
MoonBoi9001 Jun 9, 2025
011c71b
Update bigquery_data_access_provider.py
MoonBoi9001 Jun 9, 2025
e4936ab
Update scheduler.py
MoonBoi9001 Jun 9, 2025
343924e
Update config_manager.py
MoonBoi9001 Jun 9, 2025
05d14c0
Update service_quality_oracle.py
MoonBoi9001 Jun 9, 2025
94e5295
Update blockchain_client.py
MoonBoi9001 Jun 9, 2025
a48461a
Update helper module after moving functions to their own modules
MoonBoi9001 Jun 9, 2025
388a538
feat: Overhaul Ruff Settings
MoonBoi9001 Jun 9, 2025
78c0b20
audit cp
MoonBoi9001 Jun 10, 2025
aa310b8
audit cp2
MoonBoi9001 Jun 10, 2025
e057fdc
audit cp3
MoonBoi9001 Jun 10, 2025
bcc2507
audit cp4
MoonBoi9001 Jun 10, 2025
2716d28
Never try to run the scheduler for more than 7 days at a time on rest…
MoonBoi9001 Jun 10, 2025
5320a4f
audit cp5
MoonBoi9001 Jun 10, 2025
1b60863
ALL CAPS
MoonBoi9001 Jun 10, 2025
2b2134d
Update config_loader.py
MoonBoi9001 Jun 10, 2025
f4c1f4d
Update service_quality_oracle.py
MoonBoi9001 Jun 10, 2025
7e12baa
Create configuration.py
MoonBoi9001 Jun 10, 2025
e6d8730
Consolidate config_manager and config_loader into singular configurat…
MoonBoi9001 Jun 10, 2025
f3a93e2
rename functions
MoonBoi9001 Jun 10, 2025
eb64bc1
audit cp6
MoonBoi9001 Jun 10, 2025
9327598
rename data_processor to eligibility_pipeline
MoonBoi9001 Jun 10, 2025
8e5c844
Minor refactor for efficiency
MoonBoi9001 Jun 10, 2025
522c97a
Update config.toml.example
MoonBoi9001 Jun 10, 2025
57910ba
audit cp7
MoonBoi9001 Jun 10, 2025
48ebc2c
audit cp8
MoonBoi9001 Jun 10, 2025
4cbb60f
audit cp9
MoonBoi9001 Jun 10, 2025
ac34363
Update configuration.py
MoonBoi9001 Jun 10, 2025
fbd02f8
simplify logic
MoonBoi9001 Jun 10, 2025
23a1bdd
Ruff
MoonBoi9001 Jun 10, 2025
b862dd5
Update README.md
MoonBoi9001 Jun 10, 2025
4968619
Fix CI/CD errors, update workflow, create tests placeholders
MoonBoi9001 Jun 10, 2025
617a008
move key validation into _setup_transaction_account
MoonBoi9001 Jun 11, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 45 additions & 9 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -66,22 +66,58 @@ jobs:
cd src
python -c "
import sys; sys.path.insert(0, '..')
from src.utils.config_loader import load_config
from src.utils.configuration import load_config
from src.utils.key_validator import validate_and_format_private_key
print('Core modules import successfully')
"

- name: Validate configuration
run: |
python -c "
import tomli
with open('config.toml.example', 'rb') as f:
config = tomli.load(f)
required = ['bigquery', 'blockchain', 'scheduling', 'secrets']
for section in required:
if section not in config:
raise ValueError(f'Missing section: {section}')
print('Configuration valid')
import sys
import os

# Add project root to path
sys.path.insert(0, '.')

os.environ['BLOCKCHAIN_PRIVATE_KEY'] = '0x' + 'f' * 64
os.environ['SLACK_WEBHOOK_URL'] = 'https://hooks.slack.com/services/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX'
os.environ['STUDIO_API_KEY'] = 'api-key'
os.environ['STUDIO_DEPLOY_KEY'] = 'deploy-key'
os.environ['ARBITRUM_API_KEY'] = 'api-key'
os.environ['ETHERSCAN_API_KEY'] = 'api-key'
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = '{}'

from src.utils.configuration import ConfigLoader, _validate_config

print('Validating config.toml.example...')

# Use the example file and run the full validation logic from our application
loader = ConfigLoader(config_path='config.toml.example')
config = loader.get_flat_config()

print('Patching config in-memory with dummy data for validation...')
config_to_validate = config.copy()
config_to_validate.update({
'BIGQUERY_LOCATION_ID': 'dummy-location',
'BIGQUERY_PROJECT_ID': 'dummy-project',
'BIGQUERY_DATASET_ID': 'dummy-dataset',
'BIGQUERY_TABLE_ID': 'dummy-table',
'BLOCKCHAIN_CONTRACT_ADDRESS': '0x' + '0' * 40,
'BLOCKCHAIN_FUNCTION_NAME': 'dummyFunction',
'BLOCKCHAIN_CHAIN_ID': 1,
'BLOCKCHAIN_RPC_URLS': ['http://dummy-rpc.com'],
'SUBGRAPH_URL_PRE_PRODUCTION': 'http://dummy-subgraph.com',
'SUBGRAPH_URL_PRODUCTION': 'http://dummy-subgraph.com',
'SCHEDULED_RUN_TIME': '00:00',
'BATCH_SIZE': 100,
'MAX_AGE_BEFORE_DELETION': 100,
'BIGQUERY_ANALYSIS_PERIOD_DAYS': 100,
})

_validate_config(config_to_validate)

print('config.toml.example is structurally valid.')
"

# =============================================================================
Expand Down
3 changes: 2 additions & 1 deletion .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ jobs:
run: |
if [ -d "tests" ] && [ "$(find tests -name "test_*.py" -o -name "*_test.py" | wc -l)" -gt 0 ]; then
echo "Running tests"
pytest tests/ -v --cov=src --cov-report=term-missing -p no:ethereum
# Run pytest and allow exit code 5 (no tests found), but fail on any other error
pytest tests/ -v --cov=src --cov-report=term-missing -p no:ethereum || ([ $? -eq 5 ] && echo "Pytest exited with 5 (No tests found), which is expected. Passing." || exit $?)
else
echo "No tests found. Test directory is empty or doesn't contain test files."
echo "Tests will be skipped until test files are added."
Expand Down
24 changes: 14 additions & 10 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Dockerfile for Service Quality Oracle
# Dockerfile to create a clean, lightweight Docker Image for the Service Quality Oracle

# Use Python 3.9 slim as the base image for a lightweight container
FROM python:3.9-slim
Expand All @@ -10,7 +10,12 @@ LABEL description="Service Quality Oracle" \
# Set working directory
WORKDIR /app

# Set environment variables

# Set up environment variables:
# 1. PYTHONDONTWRITEBYTECODE=1 - Prevent python from creating .pyc files
# 2. PYTHONUNBUFFERED=1 - Send logs directly to the console without buffering
# 3. PYTHONPATH=/app - Add app directory to python import path
# 4. TZ=UTC - Set timezone to UTC
ENV PYTHONDONTWRITEBYTECODE=1 \
PYTHONUNBUFFERED=1 \
PYTHONPATH=/app \
Expand Down Expand Up @@ -40,18 +45,17 @@ COPY contracts/ ./contracts/
COPY .gitignore ./
COPY pyproject.toml ./

# Copy the scheduler to the root directory
COPY src/models/scheduler.py ./

# Create healthcheck file
RUN touch /app/healthcheck

# Use Tini as entrypoint for proper signal handling
ENTRYPOINT ["/usr/bin/tini", "--"]

# Add healthcheck to verify the service is running
HEALTHCHECK --interval=5m --timeout=30s --start-period=1m --retries=3 \
CMD python -c "import os, time; assert os.path.exists('/app/healthcheck') and time.time() - os.path.getmtime('/app/healthcheck') < 3600, 'Healthcheck failed'" || exit 1
# Add healthcheck to verify the service is running.
# The scheduler updates the healthcheck file every minute.
# We check every 2 minutes and assert the file was modified in the last 5 minutes (300s).
HEALTHCHECK --interval=2m --timeout=30s --start-period=1m --retries=3 \
CMD python -c "import os, time; assert os.path.exists('/app/healthcheck') and time.time() - os.path.getmtime('/app/healthcheck') < 300, 'Healthcheck failed'" || exit 1

# Run the scheduler
CMD ["python", "scheduler.py"]
# Run the scheduler as a module
CMD ["python", "-m", "src.models.scheduler"]
15 changes: 11 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,19 @@ Please refer to the [ELIGIBILITY_CRITERIA.md](./ELIGIBILITY_CRITERIA.md) file to

## Data Flow

The application follows this data flow:
The application follows a clear data flow, managed by a daily scheduler:

1. **BigQuery Data Acquisition**: The `bigquery_fetch_and_save_indexer_issuance_eligibility_data_finally_return_eligible_indexers` function in `issuance_data_access_helper.py` fetches fresh data from BigQuery, processes it to determine eligibility, and returns the eligibility data list that would then be posted on chain.
- This function also ensures that data is saved to local files in dated directories for auditing/historical reference over the data retention period.
1. **Scheduler (`scheduler.py`)**: This is the main entry point. It runs on a schedule (e.g., daily), manages the application lifecycle, and triggers the oracle run. It is also responsible for catching up on any missed runs.

2. **Blockchain Publication**: The eligible indexers list from step 1 is directly posted on-chain to a smart contract. Batching of transactions is performed if necessary.
2. **Orchestrator (`service_quality_oracle.py`)**: For each run, this module orchestrates the end-to-end process by coordinating the other components.

3. **Data Fetching (`bigquery_data_access_provider.py`)**: The orchestrator calls this provider to execute a configurable SQL query against Google BigQuery, fetching the raw indexer performance data.

4. **Data Processing (`eligibility_pipeline.py`)**: The raw data is passed to this module, which processes it, filters for eligible and ineligible indexers, and generates CSV artifacts for auditing and record-keeping.

5. **Blockchain Submission (`blockchain_client.py`)**: The orchestrator takes the final list of eligible indexers and passes it to this client, which handles the complexities of batching, signing, and sending the transaction to the blockchain via RPC providers with built-in failover.

6. **Notifications (`slack_notifier.py`)**: Throughout the process, status updates (success, failure, warnings) are sent to Slack.

## CI/CD Pipeline

Expand Down
16 changes: 13 additions & 3 deletions config.toml.example
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@
# =============================================================================

[bigquery]
BIGQUERY_LOCATION_ID = ""
BIGQUERY_PROJECT_ID = ""
BIGQUERY_DATASET_ID = ""
BIGQUERY_LOCATION_ID = "US"
BIGQUERY_PROJECT_ID = "graph-mainnet"
BIGQUERY_DATASET_ID = "internal_metrics"
BIGQUERY_TABLE_ID = "metrics_indexer_attempts"

[blockchain]
BLOCKCHAIN_CONTRACT_ADDRESS = ""
Expand All @@ -20,6 +21,8 @@ BLOCKCHAIN_RPC_URLS = [
"",
""
]
BLOCK_EXPLORER_URL = "https://sepolia.arbiscan.io"
TX_TIMEOUT_SECONDS = "30"

[scheduling]
SCHEDULED_RUN_TIME = "10:00"
Expand All @@ -31,6 +34,13 @@ SUBGRAPH_URL_PRODUCTION = ""
[processing]
BATCH_SIZE = 125
MAX_AGE_BEFORE_DELETION = 120
BIGQUERY_ANALYSIS_PERIOD_DAYS = "28"

[eligibility_criteria]
MIN_ONLINE_DAYS = "5"
MIN_SUBGRAPHS = "10"
MAX_LATENCY_MS = "5000"
MAX_BLOCKS_BEHIND = "50000"

# =============================================================================
# SENSITIVE CONFIGURATION
Expand Down
12 changes: 10 additions & 2 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
services:
# Service Quality Oracle container
service-quality-oracle:

# Build the image from the Dockerfile in the current directory
build: .

# Set the container name
container_name: service-quality-oracle

# Set the image name
image: service-quality-oracle:latest

volumes:
Expand All @@ -16,9 +23,7 @@ services:
- ./credentials.json:/app/credentials.json:ro

environment:
- PYTHONPATH=/app
- RUN_ON_STARTUP=true
- TZ=UTC

# Set up environment variables
# Environment variables go into process memory for this specific container only
Expand All @@ -45,15 +50,18 @@ services:
reservations:
memory: 512M

# Restart policy
restart: unless-stopped

# Healthcheck to ensure the container is running
healthcheck:
test: ["CMD", "python", "-c", "import os, time; assert os.path.exists('/app/healthcheck') and time.time() - os.path.getmtime('/app/healthcheck') < 3600, 'Healthcheck failed'"]
interval: 5m
timeout: 30s
retries: 3
start_period: 1m

# Prevent log files from growing indefinitely and consuming disk space
logging:
driver: "json-file"
options:
Expand Down
15 changes: 8 additions & 7 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,13 @@ target-version = "py39"
fix = true
fix-only = false

[tool.ruff.format]
# Format SQL code in strings/docstrings
docstring-code-format = false
quote-style = "double"
indent-style = "space"
line-ending = "lf"

[tool.ruff.lint]
# Enable rules including isort (I) for import sorting and additional fixes
select = ["E", "W", "F", "I"]
Expand All @@ -34,7 +41,7 @@ dummy-variable-rgx = "^(_+|(_+[a-zA-Z0-9_]*[a-zA-Z0-9]+?))$"
[tool.ruff.lint.per-file-ignores]
# Ignore E402 (import not at top) in scripts and specific modules
"scripts/test_*.py" = ["E402"]
"src/models/issuance_eligibility_oracle_core.py" = ["E402"]
"src/models/service_quality_oracle.py" = ["E402"]

# Use unsafe fixes to address typing and other modernization issues
[tool.ruff.lint.isort]
Expand All @@ -44,12 +51,6 @@ known-first-party = ["src"]
# Unlike Flake8, default to a complexity level of 10.
max-complexity = 10

[tool.ruff.format]
# Format SQL code in strings/docstrings
docstring-code-format = true
quote-style = "double"
indent-style = "space"

[tool.mypy]
ignore_missing_imports = true
no_strict_optional = true
Expand Down
133 changes: 133 additions & 0 deletions scripts/custom_formatter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import argparse
import ast
import sys
from pathlib import Path


class PythonFormatter:
    """Normalise blank-line spacing before class, function and method definitions.

    The source is parsed with ``ast``; before each definition the formatter
    ensures a minimum number of blank lines (2 before classes, top-level
    functions and methods; 1 before ``__init__``). Regions wrapped in
    ``# fmt: off`` / ``# fmt: on`` markers are left untouched.
    """

    def __init__(self, source_code: str):
        self.source_lines = source_code.splitlines()
        self.tree = ast.parse(source_code)
        # Map every node to its parent so is_method() can tell whether a
        # function is defined directly inside a class body.
        self.node_parents = {
            child: parent for parent in ast.walk(self.tree) for child in ast.iter_child_nodes(parent)
        }
        self.disabled_ranges = self._find_disabled_ranges()


    def _find_disabled_ranges(self):
        """Return inclusive, 1-indexed (start, end) line ranges guarded by fmt markers."""
        ranges = []
        in_disabled_block = False
        start_line = 0
        for i, line in enumerate(self.source_lines):
            if "# fmt: off" in line:
                in_disabled_block = True
                start_line = i + 1
            elif "# fmt: on" in line:
                if in_disabled_block:
                    ranges.append((start_line, i + 1))
                    in_disabled_block = False
        # Fix: an unterminated "# fmt: off" disables formatting through the end
        # of the file (matching black's behaviour). Previously such a block was
        # silently dropped and the code after the marker was reformatted anyway.
        if in_disabled_block:
            ranges.append((start_line, len(self.source_lines)))
        return ranges


    def _is_in_disabled_range(self, lineno):
        """Return True if the 1-indexed line falls inside any fmt-off range."""
        for start, end in self.disabled_ranges:
            if start <= lineno <= end:
                return True
        return False


    def get_node_start_line(self, node):
        """Return the node's effective start line (first decorator, if any)."""
        if node.decorator_list:
            return node.decorator_list[0].lineno
        return node.lineno


    def is_method(self, node) -> bool:
        """Return True if the node is defined directly inside a class body."""
        return isinstance(self.node_parents.get(node), ast.ClassDef)


    def format(self) -> str:
        """Return the source with normalised blank lines before each definition."""
        nodes = {}
        for node in ast.walk(self.tree):
            if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef)):
                start_line = self.get_node_start_line(node)
                nodes[start_line] = node

        lines = list(self.source_lines)
        # Process bottom-up so insertions never shift the line numbers of
        # definitions that have not been handled yet.
        sorted_nodes = sorted(nodes.items(), key=lambda x: x[0], reverse=True)

        for lineno, node in sorted_nodes:
            start_index = lineno - 1
            num_blank_lines = 0

            # Skip formatting if node is inside a "fmt: off" block
            if self._is_in_disabled_range(lineno):
                continue

            if isinstance(node, ast.ClassDef):
                num_blank_lines = 2
            elif isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
                if self.is_method(node):
                    # __init__ sits closest to the class header, so it only
                    # gets a single separating blank line.
                    if node.name == "__init__":
                        num_blank_lines = 1
                    else:
                        num_blank_lines = 2
                else:
                    num_blank_lines = 2

            # Walk backwards to the last non-blank line above the definition.
            i = start_index - 1
            while i > 0 and not lines[i].strip():
                i -= 1

            if i < 0:  # start of file
                i = -1  # will insert at 0

            # For top-level nodes, we don't want to add spaces if it's the first thing in the file
            # after imports. Let's check if there's anything but imports above.
            is_truly_top_level = i == -1
            if not is_truly_top_level:
                # Count existing blank lines
                existing_blank_lines = 0
                for k in range(start_index - 1, i, -1):
                    if not lines[k].strip():
                        existing_blank_lines += 1

                # Only add lines if there are not enough
                if existing_blank_lines < num_blank_lines:
                    # remove existing blank lines
                    del lines[i + 1 : start_index]
                    # insert new blank lines
                    for _ in range(num_blank_lines):
                        lines.insert(i + 1, "")

        # Trim trailing whitespace and guarantee exactly one trailing newline.
        result = "\n".join(line.rstrip() for line in lines)
        if result:
            result = result.strip() + "\n"

        return result


def main():
    """CLI entry point: format each file given on the command line, in place.

    Exits with status 1 if any file could not be formatted, so this script
    can act as a failing pre-commit / CI step instead of silently passing.
    """
    parser = argparse.ArgumentParser(description="Python custom formatter.")
    parser.add_argument("files", nargs="+", type=Path)
    args = parser.parse_args()

    had_error = False
    for path in args.files:
        try:
            # Read with an explicit encoding so behaviour does not depend on
            # the platform's locale default.
            source = path.read_text(encoding="utf-8")
            # Skip empty files
            if not source.strip():
                continue
            formatter = PythonFormatter(source)
            formatted_source = formatter.format()
            # Only rewrite the file when formatting actually changed it,
            # keeping mtimes stable for build tools and watchers.
            if formatted_source != source:
                path.write_text(formatted_source, encoding="utf-8")
                print(f"Formatted {path}")
        except Exception as e:
            # Report the failure but keep processing the remaining files.
            print(f"Could not format {path}: {e}", file=sys.stderr)
            had_error = True

    # Fix: previously the script always exited 0, so syntax errors or I/O
    # failures never failed the pipeline that invoked it.
    if had_error:
        sys.exit(1)


# Allow direct invocation: `python scripts/custom_formatter.py <files...>`
if __name__ == "__main__":
    main()
Loading