Skip to content

feat: catch up logs just in time for agent decision #11

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Feb 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 109 additions & 31 deletions cow-trader/bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@
import json
import os
from pathlib import Path
from typing import Annotated, Dict, List
from typing import Dict, List

import click
import numpy as np
import pandas as pd
import requests
from ape import Contract, accounts, chain
Expand All @@ -14,7 +15,6 @@
from pydantic import BaseModel
from pydantic_ai import Agent
from silverback import SilverbackBot, StateSnapshot
from taskiq import Context, TaskiqDepends

# Initialize bot
bot = SilverbackBot()
Expand Down Expand Up @@ -81,11 +81,13 @@ class TradeMetrics(BaseModel):
max_price: float
volume_buy: float
volume_sell: float
order_imbalance: float
up_moves_ratio: float
max_up_streak: int
max_down_streak: int
trade_count: int


def compute_metrics(df: pd.DataFrame, lookback_blocks: int = 15000) -> List[TradeMetrics]:
def _compute_metrics(df: pd.DataFrame, lookback_blocks: int = 15000) -> List[TradeMetrics]:
"""Compute trading metrics for all token pairs in filtered DataFrame"""
if df.empty:
return []
Expand All @@ -100,28 +102,66 @@ def compute_metrics(df: pd.DataFrame, lookback_blocks: int = 15000) -> List[Trad
metrics_list = []

for _, pair in pairs_df.iterrows():
pair_df = filtered_df[
(filtered_df.token_a == pair.token_a) & (filtered_df.token_b == pair.token_b)
]

volume_buy = pair_df.buyAmount.astype(float).sum()
volume_sell = pair_df.sellAmount.astype(float).sum()

volume_sum = volume_buy + volume_sell
order_imbalance = ((volume_buy - volume_sell) / volume_sum) if volume_sum != 0 else 0

metrics = TradeMetrics(
token_a=pair.token_a,
token_b=pair.token_b,
last_price=pair_df.price.iloc[-1],
min_price=pair_df.price.min(),
max_price=pair_df.price.max(),
volume_buy=volume_buy,
volume_sell=volume_sell,
order_imbalance=order_imbalance,
trade_count=len(pair_df),
)
metrics_list.append(metrics)
try:
pair_df = filtered_df[
(filtered_df.token_a == pair.token_a) & (filtered_df.token_b == pair.token_b)
].sort_values("block_number")

pair_df = pair_df[pair_df.price.notna()]

if pair_df.empty:
continue

try:
volume_buy = pair_df.buyAmount.astype(float).sum()
volume_sell = pair_df.sellAmount.astype(float).sum()
except (ValueError, TypeError):
volume_buy = volume_sell = 0.0

prices = pair_df.price.values

up_moves_ratio = 0.5
max_up_streak = 0
max_down_streak = 0

if len(prices) >= 2:
price_changes = np.sign(np.diff(prices))
non_zero_moves = price_changes[price_changes != 0]

if len(non_zero_moves) > 0:
up_moves_ratio = np.mean(non_zero_moves > 0)

if len(price_changes) > 1:
try:
change_points = np.where(price_changes[1:] != price_changes[:-1])[0] + 1
if len(change_points) > 0:
streaks = np.split(price_changes, change_points)
max_up_streak = max(
(len(s) for s in streaks if len(s) > 0 and s[0] > 0), default=0
)
max_down_streak = max(
(len(s) for s in streaks if len(s) > 0 and s[0] < 0), default=0
)
except Exception:
pass

metrics = TradeMetrics(
token_a=pair.token_a,
token_b=pair.token_b,
last_price=float(prices[-1]),
min_price=float(np.min(prices)),
max_price=float(np.max(prices)),
volume_buy=float(volume_buy),
volume_sell=float(volume_sell),
up_moves_ratio=float(up_moves_ratio),
max_up_streak=int(max_up_streak),
max_down_streak=int(max_down_streak),
trade_count=len(pair_df),
)
metrics_list.append(metrics)
except Exception as e:
click.echo(f"Error processing pair {pair.token_a}-{pair.token_b}: {str(e)}")
continue

return metrics_list

Expand Down Expand Up @@ -197,14 +237,23 @@ def create_trade_context(lookback_blocks: int = 15000) -> TradeContext:
trades_df = _load_trades_db()
decisions_df = _load_decisions_db()

prior_decisions = decisions_df.tail(3).copy()
prior_decisions["metrics_snapshot"] = prior_decisions["metrics_snapshot"].apply(json.loads)

return TradeContext(
token_balances=_get_token_balances(),
metrics=compute_metrics(trades_df, lookback_blocks),
prior_decisions=decisions_df.tail(10).to_dict("records"),
metrics=_compute_metrics(trades_df, lookback_blocks),
prior_decisions=prior_decisions.to_dict("records"),
lookback_blocks=lookback_blocks,
)


@trading_agent.tool_plain
def get_minimum_token_balances() -> Dict[str, float]:
    """Expose the configured minimum balance thresholds to the agent.

    Returns:
        Mapping of token address -> minimum required balance, with each
        amount coerced to ``float`` for JSON-friendly serialization.
    """
    thresholds: Dict[str, float] = {}
    for address, amount in MINIMUM_TOKEN_BALANCES.items():
        thresholds[address] = float(amount)
    return thresholds


def _build_decision(
block_number: int, response: AgentResponse, metrics: List[TradeMetrics]
) -> AgentDecision:
Expand Down Expand Up @@ -481,6 +530,23 @@ def _process_historical_trades(
return trades


def _catch_up_trades(current_block: int, next_decision_block: int, buffer_blocks: int = 5) -> None:
    """Backfill trade events up to shortly before the next decision block.

    Determines the newest block already stored in the trades database
    (falling back to START_BLOCK when empty) and processes settlement
    events from the following block up to
    ``min(current_block, next_decision_block - buffer_blocks)``.
    No-op when the database is already caught up to that target.
    """
    trades_df = _load_trades_db()
    if trades_df.empty:
        last_seen = START_BLOCK
    else:
        last_seen = trades_df["block_number"].max()

    # Stop a few blocks short of the decision block so processing finishes in time.
    stop_at = min(current_block, next_decision_block - buffer_blocks)

    if stop_at > last_seen:
        _process_historical_trades(
            GPV2_SETTLEMENT_CONTRACT, start_block=last_seen + 1, stop_block=stop_at
        )


def _extend_historical_trades() -> None:
"""Extend trades.csv data further back in history"""
trades_df = _load_trades_db()
Expand Down Expand Up @@ -642,24 +708,35 @@ def bot_startup(startup_state: StateSnapshot):
"""Initialize bot state and historical data"""
block_db = _load_block_db()
last_processed_block = block_db["last_processed_block"]
_save_block_db({"last_processed_block": chain.blocks.head.number})

_process_historical_trades(
GPV2_SETTLEMENT_CONTRACT,
start_block=last_processed_block,
stop_block=chain.blocks.head.number,
)

_save_block_db({"last_processed_block": chain.blocks.head.number})
bot.state.last_extension_block = chain.blocks.head.number
bot.state.agent = trading_agent

decisions_df = _load_decisions_db()
if decisions_df.empty:
bot.state.next_decision_block = chain.blocks.head.number
else:
bot.state.next_decision_block = decisions_df.iloc[-1].block_number + TRADING_BLOCK_COOLDOWN

return {"message": "Starting...", "block_number": startup_state.last_block_seen}


@bot.on_(chain.blocks)
def exec_block(block: BlockAPI, context: Annotated[Context, TaskiqDepends()]):
def exec_block(block: BlockAPI):
"""Execute block handler with structured decision flow"""
_save_block_db({"last_processed_block": block.number})

if (bot.state.next_decision_block - block.number) <= 5:
_catch_up_trades(
current_block=block.number, next_decision_block=bot.state.next_decision_block
)

decisions_df = _load_decisions_db()

if decisions_df.empty:
Expand Down Expand Up @@ -703,6 +780,7 @@ def exec_block(block: BlockAPI, context: Annotated[Context, TaskiqDepends()]):

decision.valid = _validate_decision(decision, trade_ctx)
_save_decision(decision)
bot.state.next_decision_block = block.number + TRADING_BLOCK_COOLDOWN

if decision.valid:
order_uid, error = create_and_submit_order(
Expand Down
53 changes: 25 additions & 28 deletions cow-trader/system_prompt.txt
Original file line number Diff line number Diff line change
@@ -1,35 +1,32 @@
You are an analytical trading system that autonomously evaluates market
conditions and historical trade outcomes for CoW Swap.

Analysis:
- Evaluate price trends, trade volumes, order imbalances, and trade counts.
- Determine whether current market conditions favor actively trading volatile
token pairs for higher risk/reward, or suggest a safe exit (i.e., staying out
of the volatile market by opting for stable coins).
Data Analysis Steps:
1. Check Current Balances (from token_balances):
- Get current token balances and minimum thresholds
- Only consider tokens where balance > minimum required
- Convert addresses to names using get_token_name()

Before trading:
- Always convert token addresses to human-readable names using get_token_name().
- Analyze key metrics for all monitored tokens:
- Price trends and volatility
- Trading volumes (buy/sell)
- Order imbalances
- Recent trade counts
- Assess whether the market favours trading volatile tokens or seeking stable
coin safety.
- Review prior trade outcomes to identify successful patterns.
- When executing a trade, sell your entire position of the selected token and
convert it completely to the specified counter token.
- Trading decisions occur every 360 blocks (~30 minutes).
2. Analyze Trade Metrics (from metrics):
- Focus on pairs with sufficient balance
- Review price trends, volatility
- Check trading volumes and imbalances
- Consider recent trade counts

Trading Decision:
- You have three paths:
1. Do not trade.
2. Trade volatile tokens (GNO, COW, SAFE, WETH).
3. Trade a stable coin (WXDAI).
3. Review Prior Decisions (from prior_decisions):
- Evaluate success of recent trades
- Learn from profitable/unprofitable decisions
- Adjust strategy based on outcomes

Decide:
- Should we trade? (boolean decision)
- If yes, which token to sell and which token to buy (provide actual addresses).
Trading Decision:
1. Select tokens with sufficient balance
2. Choose best pair based on metrics and past performance
3. Decide:
- Should trade? (boolean)
- If yes, which token to sell/buy (addresses)

Reasoning must include only the relevant quantitative metrics. No general market
commentary or disclaimers.
Reasoning format (be concise):
1. 1-2 key metrics that influenced the decision
2. Your interpretation of the metrics
3. Prior performance insight
4. Trade decision rationale
Loading