diff --git a/cow-trader/bot.py b/cow-trader/bot.py index 661caae..e0e5a0e 100644 --- a/cow-trader/bot.py +++ b/cow-trader/bot.py @@ -1,10 +1,9 @@ import json import os from pathlib import Path -from typing import Annotated, Dict +from typing import Annotated, Dict, List import click -import numpy as np import pandas as pd import requests from ape import Contract, accounts, chain @@ -27,8 +26,13 @@ SAFE_ADDRESS = "0x5aFE3855358E112B5647B952709E6165e1c1eEEe" # PLACEHOLDER TOKEN_ALLOWLIST_ADDRESS = "0xe7f1725E7734CE288F8367e1Bb143E90bb3F0512" GPV2_SETTLEMENT_ADDRESS = "0x9008D19f58AAbD9eD0D60971565AA8510560ab41" -GNO_ADDRESS = "0x9C58BAcC331c9aa871AFD802DB6379a98e80CEdb" -COW_ADDRESS = "0x177127622c4A00F3d409B75571e12cB3c8973d3c" +GNO = "0x9C58BAcC331c9aa871AFD802DB6379a98e80CEdb" +COW = "0x177127622c4A00F3d409B75571e12cB3c8973d3c" +WETH = "0x6A023CCd1ff6F2045C3309768eAd9E68F978f6e1" +SAFE = "0x4d18815D14fe5c3304e87B3FA18318baa5c23820" +WXDAI = "0xe91D153E0b41518A2Ce8Dd3D7944Fa863463a97d" + +MONITORED_TOKENS = [GNO, COW, WETH, SAFE, WXDAI] # ABI @@ -52,21 +56,20 @@ def _load_abi(abi_name: str) -> Dict: # Variables START_BLOCK = int(os.environ.get("START_BLOCK", chain.blocks.head.number)) +HISTORICAL_BLOCK_STEP = int(os.environ.get("HISTORICAL_BLOCK_STEP", 720)) +EXTENSION_INTERVAL = int(os.environ.get("EXTENSION_INTERVAL", 6)) # Local storage helper functions -def _load_trades_db() -> Dict: - """ - Load trades database from CSV file or create new if doesn't exist. - Returns dict with trade data indexed by block number. - """ +def _load_trades_db() -> pd.DataFrame: + """Load trades database from CSV file or create new if doesn't exist""" dtype = { + "block_number": int, "owner": str, "sellToken": str, "buyToken": str, - "sellAmount": object, - "buyAmount": object, - "block_number": np.int64, + "sellAmount": str, + "buyAmount": str, } df = ( @@ -74,7 +77,7 @@ def _load_trades_db() -> Dict: if os.path.exists(TRADE_FILEPATH) else pd.DataFrame(columns=dtype.keys()).astype(dtype) ) - return df.to_dict("records") + return df def _save_trades_db(trades_dict: Dict) -> None: @@ -132,8 +135,24 @@ def _save_orders_db(df: pd.DataFrame) -> None: # Historical log helper functions +def get_canonical_pair(token_a: str, token_b: str) -> tuple[str, str]: + """Return tokens in canonical order (alphabetically by address)""" + return (token_a, token_b) if token_a.lower() < token_b.lower() else (token_b, token_a) + + +def calculate_price(sell_amount: str, buy_amount: str) -> float: + """Calculate price from amounts""" + return int(sell_amount) / int(buy_amount) + + def _process_trade_log(log) -> Dict: - """Process trade log and return formatted dictionary entry""" + """Process trade log with price calculation""" + token_a, token_b = get_canonical_pair(log.sellToken, log.buyToken) + price = calculate_price(log.sellAmount, log.buyAmount) + + if token_a != log.sellToken: + price = 1 / price + return { "block_number": log.block_number, "owner": log.owner, @@ -141,16 +160,18 @@ def _process_trade_log(log) -> Dict: "buyToken": log.buyToken, "sellAmount": str(log.sellAmount), "buyAmount": str(log.buyAmount), + "token_a": token_a, + "token_b": token_b, + "price": price, } -def _get_historical_gno_trades( +def _get_historical_trades( settlement_contract, - gno_address: str, start_block: int, stop_block: int = chain.blocks.head.number, ): - """Get historical GNO trades from start_block to stop_block""" + """Get historical trades for monitored token pairs""" log_filter = LogFilter( addresses=[settlement_contract.address], events=[settlement_contract.Trade.abi], @@ -159,23 +180,48 @@ def _get_historical_gno_trades( ) for log in accounts.provider.get_contract_logs(log_filter): - if log.sellToken == gno_address or log.buyToken == gno_address: + if log.sellToken in MONITORED_TOKENS and log.buyToken in MONITORED_TOKENS: yield log -def _process_historical_gno_trades( - settlement_contract, gno_address: str, start_block: int, stop_block: int -) -> Dict: - """Process historical GNO trades and store in database""" - trades_db = _load_trades_db() +def _process_historical_trades( + settlement_contract, start_block: int, stop_block: int +) -> List[Dict]: + """Process historical trades and store in database""" + trades = [] + + for log in _get_historical_trades(settlement_contract, start_block, stop_block): + trades.append(_process_trade_log(log)) - for log in _get_historical_gno_trades( - settlement_contract, gno_address, start_block, stop_block - ): - trades_db.append(_process_trade_log(log)) + if trades: + existing_trades = _load_trades_db() + all_trades = pd.concat([existing_trades, pd.DataFrame(trades)], ignore_index=True) - _save_trades_db(trades_db) - return trades_db + _save_trades_db(all_trades) + + return trades + + +def extend_historical_trades() -> None: + """Extend trades.csv data further back in history""" + trades_df = _load_trades_db() + + if len(trades_df) == 0: + oldest_block = chain.blocks.head.number + else: + oldest_block = trades_df["block_number"].min() + + new_trades = _process_historical_trades( + GPV2_SETTLEMENT_CONTRACT, + start_block=oldest_block - HISTORICAL_BLOCK_STEP, + stop_block=oldest_block - 1, + ) + + new_trades_df = pd.DataFrame(new_trades) + all_trades = pd.concat([new_trades_df, trades_df]) + all_trades = all_trades.sort_values("block_number", ascending=True) + + _save_trades_db(all_trades) # CoW Swap trading helper functions @@ -313,30 +359,37 @@ def create_and_submit_order( # Silverback bot @bot.on_startup() -def app_startup(startup_state: StateSnapshot): +def bot_startup(startup_state: StateSnapshot): + """Initialize bot state and historical data""" block_db = _load_block_db() last_processed_block = block_db["last_processed_block"] - _process_historical_gno_trades( + _process_historical_trades( GPV2_SETTLEMENT_CONTRACT, - GNO_ADDRESS, start_block=last_processed_block, stop_block=chain.blocks.head.number, ) _save_block_db({"last_processed_block": chain.blocks.head.number}) - + bot.state.last_extension_block = chain.blocks.head.number return {"message": "Starting...", "block_number": startup_state.last_block_seen} @bot.on_(chain.blocks) def exec_block(block: BlockAPI, context: Annotated[Context, TaskiqDepends()]): - """Execute block handler""" - order_uid, error = create_and_submit_order( - sell_token=GNO_ADDRESS, buy_token=COW_ADDRESS, sell_amount="20000000000000000000" - ) - - if error: - click.echo(f"Order failed: {error}") - else: - click.echo(f"Order submitted successfully. UID: {order_uid}") + _save_block_db({"last_processed_block": block.number}) + + if block.number - bot.state.last_extension_block >= EXTENSION_INTERVAL: + extend_historical_trades() + bot.state.last_extension_block = block.number + + +# """Execute block handler""" +# order_uid, error = create_and_submit_order( +# sell_token=GNO, buy_token=COW, sell_amount="20000000000000000000" +# ) +# +# if error: +# click.echo(f"Order failed: {error}") +# else: +# click.echo(f"Order submitted successfully. UID: {order_uid}")