Skip to content

feat: add historical price monitoring #9

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Feb 9, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 95 additions & 42 deletions cow-trader/bot.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import json
import os
from pathlib import Path
from typing import Annotated, Dict
from typing import Annotated, Dict, List

import click
import numpy as np
import pandas as pd
import requests
from ape import Contract, accounts, chain
Expand All @@ -27,8 +26,13 @@
SAFE_ADDRESS = "0x5aFE3855358E112B5647B952709E6165e1c1eEEe" # PLACEHOLDER
TOKEN_ALLOWLIST_ADDRESS = "0xe7f1725E7734CE288F8367e1Bb143E90bb3F0512"
GPV2_SETTLEMENT_ADDRESS = "0x9008D19f58AAbD9eD0D60971565AA8510560ab41"
GNO_ADDRESS = "0x9C58BAcC331c9aa871AFD802DB6379a98e80CEdb"
COW_ADDRESS = "0x177127622c4A00F3d409B75571e12cB3c8973d3c"
GNO = "0x9C58BAcC331c9aa871AFD802DB6379a98e80CEdb"
COW = "0x177127622c4A00F3d409B75571e12cB3c8973d3c"
WETH = "0x6A023CCd1ff6F2045C3309768eAd9E68F978f6e1"
SAFE = "0x4d18815D14fe5c3304e87B3FA18318baa5c23820"
WXDAI = "0xe91D153E0b41518A2Ce8Dd3D7944Fa863463a97d"

MONITORED_TOKENS = [GNO, COW, WETH, SAFE, WXDAI]


# ABI
Expand All @@ -52,29 +56,28 @@ def _load_abi(abi_name: str) -> Dict:

# Variables
START_BLOCK = int(os.environ.get("START_BLOCK", chain.blocks.head.number))
HISTORICAL_BLOCK_STEP = int(os.environ.get("HISTORICAL_BLOCK_STEP", 720))
EXTENSION_INTERVAL = int(os.environ.get("EXTENSION_INTERVAL", 6))


# Local storage helper functions
def _load_trades_db() -> Dict:
"""
Load trades database from CSV file or create new if doesn't exist.
Returns dict with trade data indexed by block number.
"""
def _load_trades_db() -> pd.DataFrame:
"""Load trades database from CSV file or create new if doesn't exist"""
dtype = {
"block_number": int,
"owner": str,
"sellToken": str,
"buyToken": str,
"sellAmount": object,
"buyAmount": object,
"block_number": np.int64,
"sellAmount": str,
"buyAmount": str,
}

df = (
pd.read_csv(TRADE_FILEPATH, dtype=dtype)
if os.path.exists(TRADE_FILEPATH)
else pd.DataFrame(columns=dtype.keys()).astype(dtype)
)
return df.to_dict("records")
return df


def _save_trades_db(trades_dict: Dict) -> None:
Expand Down Expand Up @@ -132,25 +135,43 @@ def _save_orders_db(df: pd.DataFrame) -> None:


# Historical log helper functions
def get_canonical_pair(token_a: str, token_b: str) -> tuple[str, str]:
"""Return tokens in canonical order (alphabetically by address)"""
return (token_a, token_b) if token_a.lower() < token_b.lower() else (token_b, token_a)


def calculate_price(sell_amount: str, buy_amount: str) -> float:
"""Calculate price from amounts"""
return int(sell_amount) / int(buy_amount)


def _process_trade_log(log) -> Dict:
"""Process trade log and return formatted dictionary entry"""
"""Process trade log with price calculation"""
token_a, token_b = get_canonical_pair(log.sellToken, log.buyToken)
price = calculate_price(log.sellAmount, log.buyAmount)

if token_a != log.sellToken:
price = 1 / price

return {
"block_number": log.block_number,
"owner": log.owner,
"sellToken": log.sellToken,
"buyToken": log.buyToken,
"sellAmount": str(log.sellAmount),
"buyAmount": str(log.buyAmount),
"token_a": token_a,
"token_b": token_b,
"price": price,
}


def _get_historical_gno_trades(
def _get_historical_trades(
settlement_contract,
gno_address: str,
start_block: int,
stop_block: int = chain.blocks.head.number,
):
"""Get historical GNO trades from start_block to stop_block"""
"""Get historical trades for monitored token pairs"""
log_filter = LogFilter(
addresses=[settlement_contract.address],
events=[settlement_contract.Trade.abi],
Expand All @@ -159,23 +180,48 @@ def _get_historical_gno_trades(
)

for log in accounts.provider.get_contract_logs(log_filter):
if log.sellToken == gno_address or log.buyToken == gno_address:
if log.sellToken in MONITORED_TOKENS and log.buyToken in MONITORED_TOKENS:
yield log


def _process_historical_gno_trades(
settlement_contract, gno_address: str, start_block: int, stop_block: int
) -> Dict:
"""Process historical GNO trades and store in database"""
trades_db = _load_trades_db()
def _process_historical_trades(
settlement_contract, start_block: int, stop_block: int
) -> List[Dict]:
"""Process historical trades and store in database"""
trades = []

for log in _get_historical_trades(settlement_contract, start_block, stop_block):
trades.append(_process_trade_log(log))

for log in _get_historical_gno_trades(
settlement_contract, gno_address, start_block, stop_block
):
trades_db.append(_process_trade_log(log))
if trades:
existing_trades = _load_trades_db()
all_trades = pd.concat([existing_trades, pd.DataFrame(trades)], ignore_index=True)

_save_trades_db(trades_db)
return trades_db
_save_trades_db(all_trades)

return trades


def extend_historical_trades() -> None:
"""Extend trades.csv data further back in history"""
trades_df = _load_trades_db()

if len(trades_df) == 0:
oldest_block = chain.blocks.head.number
else:
oldest_block = trades_df["block_number"].min()

new_trades = _process_historical_trades(
GPV2_SETTLEMENT_CONTRACT,
start_block=oldest_block - HISTORICAL_BLOCK_STEP,
stop_block=oldest_block - 1,
)

new_trades_df = pd.DataFrame(new_trades)
all_trades = pd.concat([new_trades_df, trades_df])
all_trades = all_trades.sort_values("block_number", ascending=True)

_save_trades_db(all_trades)


# CoW Swap trading helper functions
Expand Down Expand Up @@ -313,30 +359,37 @@ def create_and_submit_order(

# Silverback bot
@bot.on_startup()
def app_startup(startup_state: StateSnapshot):
def bot_startup(startup_state: StateSnapshot):
"""Initialize bot state and historical data"""
block_db = _load_block_db()
last_processed_block = block_db["last_processed_block"]

_process_historical_gno_trades(
_process_historical_trades(
GPV2_SETTLEMENT_CONTRACT,
GNO_ADDRESS,
start_block=last_processed_block,
stop_block=chain.blocks.head.number,
)

_save_block_db({"last_processed_block": chain.blocks.head.number})

bot.state.last_extension_block = chain.blocks.head.number
return {"message": "Starting...", "block_number": startup_state.last_block_seen}


@bot.on_(chain.blocks)
def exec_block(block: BlockAPI, context: Annotated[Context, TaskiqDepends()]):
"""Execute block handler"""
order_uid, error = create_and_submit_order(
sell_token=GNO_ADDRESS, buy_token=COW_ADDRESS, sell_amount="20000000000000000000"
)

if error:
click.echo(f"Order failed: {error}")
else:
click.echo(f"Order submitted successfully. UID: {order_uid}")
_save_block_db({"last_processed_block": block.number})

if block.number - bot.state.last_extension_block >= EXTENSION_INTERVAL:
extend_historical_trades()
bot.state.last_extension_block = block.number


# """Execute block handler"""
# order_uid, error = create_and_submit_order(
# sell_token=GNO, buy_token=COW, sell_amount="20000000000000000000"
# )
#
# if error:
# click.echo(f"Order failed: {error}")
# else:
# click.echo(f"Order submitted successfully. UID: {order_uid}")