Skip to content

refactor: agent tooling #12

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Feb 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
180 changes: 104 additions & 76 deletions cow-trader/bot.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import asyncio
import json
import os
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List

Expand All @@ -13,7 +14,7 @@
from ape.types import LogFilter
from ape_ethereum import multicall
from pydantic import BaseModel
from pydantic_ai import Agent
from pydantic_ai import Agent, RunContext
from silverback import SilverbackBot, StateSnapshot

# Initialize bot
Expand All @@ -33,16 +34,12 @@

GNO = "0x9C58BAcC331c9aa871AFD802DB6379a98e80CEdb"
COW = "0x177127622c4A00F3d409B75571e12cB3c8973d3c"
WETH = "0x6A023CCd1ff6F2045C3309768eAd9E68F978f6e1"
SAFE = "0x4d18815D14fe5c3304e87B3FA18318baa5c23820"
WXDAI = "0xe91D153E0b41518A2Ce8Dd3D7944Fa863463a97d"
MONITORED_TOKENS = [GNO, COW, WETH, SAFE, WXDAI]
MONITORED_TOKENS = [GNO, COW, WXDAI]

MINIMUM_TOKEN_BALANCES = {
GNO: 5e16,
COW: 25e18,
WETH: 38e14,
SAFE: 15e18,
WXDAI: 10e18,
}

Expand Down Expand Up @@ -175,11 +172,18 @@ class TradeContext(BaseModel):
lookback_blocks: int = 15000


@dataclass
class AgentDependencies:
"""Dependencies for trading agent"""

trade_ctx: TradeContext
sell_token: str | None


class AgentResponse(BaseModel):
"""Structured response from agent"""

should_trade: bool
sell_token: str | None = None
buy_token: str | None = None
reasoning: str

Expand All @@ -198,24 +202,74 @@ class AgentDecision(BaseModel):

trading_agent = Agent(
"anthropic:claude-3-sonnet-20240229",
deps_type=TradeContext,
deps_type=AgentDependencies,
result_type=AgentResponse,
system_prompt=SYSTEM_PROMPT,
)

TOKEN_NAMES = {
"0x9C58BAcC331c9aa871AFD802DB6379a98e80CEdb": "GNO",
"0x177127622c4A00F3d409B75571e12cB3c8973d3c": "COW",
"0x6A023CCd1ff6F2045C3309768eAd9E68F978f6e1": "WETH",
"0x4d18815D14fe5c3304e87B3FA18318baa5c23820": "SAFE",
"0xe91D153E0b41518A2Ce8Dd3D7944Fa863463a97d": "WXDAI",
GNO: "GNO",
COW: "COW",
WXDAI: "WXDAI",
}


@trading_agent.tool_plain
@trading_agent.tool_plain(retries=3)
def get_token_name(address: str) -> str:
"""Get human readable token name from contract address"""
return TOKEN_NAMES.get(address, address)
"""Return a human-readable token name for the provided address."""
try:
return TOKEN_NAMES.get(address, address)
except Exception as e:
print(f"[get_token_name] failed with error: {e}")
raise


@trading_agent.tool_plain(retries=3)
def get_eligible_buy_tokens(ctx: AgentDependencies) -> List[str]:
"""Return a list of tokens eligible for purchase (excluding the sell token)."""
try:
sell_token = ctx.sell_token
return [token for token in MONITORED_TOKENS if token != sell_token]
except Exception as e:
print(f"[get_eligible_buy_tokens] failed with error: {e}")
raise


@trading_agent.tool_plain(retries=3)
def get_token_type(token: str) -> Dict:
"""Determine if the token is stable or volatile."""
try:
is_stable = token == WXDAI
return {
"token": get_token_name(token),
"is_stable": is_stable,
"expected_behavior": "USD value stable, good for preserving value"
if is_stable
else "USD value can fluctuate",
}
except Exception as e:
print(f"[get_token_type] failed with error: {e}")
raise


@trading_agent.tool(retries=3)
def get_trading_context(ctx: RunContext[AgentDependencies]) -> TradeContext:
"""Return the trading context from the agent's dependencies."""
try:
return ctx.deps.trade_ctx
except Exception as e:
print(f"[get_trading_context] failed with error: {e}")
raise


@trading_agent.tool(retries=3)
def get_sell_token(ctx: RunContext[AgentDependencies]) -> str | None:
"""Return the sell token from the agent's dependencies."""
try:
return ctx.deps.sell_token
except Exception as e:
print(f"[get_sell_token] failed with error: {e}")
raise


def _get_token_balances() -> Dict[str, int]:
Expand All @@ -231,8 +285,7 @@ def _get_token_balances() -> Dict[str, int]:
return {token_address: balance for token_address, balance in zip(MONITORED_TOKENS, results)}


@trading_agent.tool_plain
def create_trade_context(lookback_blocks: int = 15000) -> TradeContext:
def _create_trade_context(lookback_blocks: int = 15000) -> TradeContext:
"""Create TradeContext with all required data"""
trades_df = _load_trades_db()
decisions_df = _load_decisions_db()
Expand All @@ -248,54 +301,45 @@ def create_trade_context(lookback_blocks: int = 15000) -> TradeContext:
)


@trading_agent.tool_plain
def get_minimum_token_balances() -> Dict[str, float]:
"""Get dictionary of minimum required balances for all monitored tokens"""
return {addr: float(amount) for addr, amount in MINIMUM_TOKEN_BALANCES.items()}
def _select_sell_token() -> str | None:
"""
Select token to sell based on current balances and minimum thresholds.
Returns the token address that has a balance above threshold, or None if no token qualifies.
"""
balances = _get_token_balances()
valid_tokens = [
token for token in MONITORED_TOKENS if balances[token] > MINIMUM_TOKEN_BALANCES[token]
]
return valid_tokens[0] if valid_tokens else None


def _build_decision(
block_number: int, response: AgentResponse, metrics: List[TradeMetrics]
block_number: int,
response: AgentResponse,
metrics: List[TradeMetrics],
sell_token: str,
) -> AgentDecision:
"""Build structured AgentDecision from raw response"""
"""Build decision dict from agent response"""
return AgentDecision(
block_number=block_number,
should_trade=response.should_trade,
sell_token=response.sell_token,
buy_token=response.buy_token,
sell_token=sell_token if response.should_trade else None,
buy_token=response.buy_token if response.should_trade else None,
metrics_snapshot=metrics,
profitable=2,
reasoning=response.reasoning,
valid=False,
)


def _validate_decision(decision: AgentDecision, trade_context: TradeContext) -> bool:
def _validate_decision(decision: AgentDecision) -> bool:
"""
Validate decision structure, token validity, and balance requirements
Args:
decision: The trading decision to validate
trade_context: Current trading context with balances and metrics
Returns:
bool: True if valid, False otherwise
Validate decision structure and buy token validity
"""
if not decision.should_trade:
return True

if (
decision.sell_token not in MONITORED_TOKENS
or decision.buy_token not in MONITORED_TOKENS
or decision.sell_token == decision.buy_token
):
click.echo(f"Invalid token pair: sell={decision.sell_token}, buy={decision.buy_token}")
if not decision.should_trade or decision.sell_token is None or decision.buy_token is None:
return False

sell_balance = trade_context.token_balances[decision.sell_token]
min_balance = MINIMUM_TOKEN_BALANCES[decision.sell_token]

if sell_balance < min_balance:
click.echo(
f"Insufficient balance for {decision.sell_token}: {sell_balance} < {min_balance}"
)
if decision.buy_token not in MONITORED_TOKENS or decision.buy_token == decision.sell_token:
click.echo(f"Invalid buy token: buy={decision.buy_token}")
return False

return True
Expand Down Expand Up @@ -547,28 +591,6 @@ def _catch_up_trades(current_block: int, next_decision_block: int, buffer_blocks
)


def _extend_historical_trades() -> None:
"""Extend trades.csv data further back in history"""
trades_df = _load_trades_db()

if len(trades_df) == 0:
oldest_block = chain.blocks.head.number
else:
oldest_block = trades_df["block_number"].min()

new_trades = _process_historical_trades(
GPV2_SETTLEMENT_CONTRACT,
start_block=oldest_block - HISTORICAL_BLOCK_STEP,
stop_block=oldest_block - 1,
)

new_trades_df = pd.DataFrame(new_trades)
all_trades = pd.concat([new_trades_df, trades_df])
all_trades = all_trades.sort_values("block_number", ascending=True)

_save_trades_db(all_trades)


# CoW Swap trading helper functions
def _construct_quote_payload(
sell_token: str,
Expand Down Expand Up @@ -749,7 +771,7 @@ def exec_block(block: BlockAPI):
if not should_trade:
return

trade_ctx = create_trade_context()
trade_ctx = _create_trade_context()

if latest_decision is not None and latest_decision.should_trade:
_update_latest_decision_outcome(
Expand All @@ -761,12 +783,15 @@ def exec_block(block: BlockAPI):
)
)

sell_token = _select_sell_token()
deps = AgentDependencies(trade_ctx=trade_ctx, sell_token=sell_token)

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

try:
result = bot.state.agent.run_sync(
"Analyze current market conditions and make a trading decision", deps=trade_ctx
"Analyze current market conditions and make a trading decision", deps=deps
)
except Exception as e:
click.echo(f"Anthropic API error at block {block.number}: {str(e)}")
Expand All @@ -775,10 +800,13 @@ def exec_block(block: BlockAPI):
_save_reasoning(block.number, result.data.reasoning)

decision = _build_decision(
block_number=block.number, response=result.data, metrics=trade_ctx.metrics
block_number=block.number,
response=result.data,
metrics=trade_ctx.metrics,
sell_token=sell_token,
)

decision.valid = _validate_decision(decision, trade_ctx)
decision.valid = _validate_decision(decision)
_save_decision(decision)
bot.state.next_decision_block = block.number + TRADING_BLOCK_COOLDOWN

Expand Down
69 changes: 41 additions & 28 deletions cow-trader/system_prompt.txt
Original file line number Diff line number Diff line change
@@ -1,32 +1,45 @@
You are an analytical trading system that autonomously evaluates market
conditions and historical trade outcomes for CoW Swap.
You are a trading bot on CoW Swap DEX.
Your job is to analyze market conditions and decide whether to trade.

Data Analysis Steps:
1. Check Current Balances (from token_balances):
- Get current token balances and minimum thresholds
- Only consider tokens where balance > minimum required
- Convert addresses to names using get_token_name()
CONTEXT ACCESS:
- get_trading_context(): Returns a TradeContext containing:
- token_balances: Dict[str, str] — Your current token holdings.
- metrics: A list of TradeMetrics for each trading pair, where:
• token_a, token_b: The addresses of the tokens in the pair.
• last_price: The most recent trade price, computed as (value of token_a) / (value of token_b).
• min_price: The lowest observed price (token_a/token_b) over the lookback period.
• max_price: The highest observed price (token_a/token_b) over the lookback period.
• volume_buy: The total buy volume for the pair over the lookback period.
• volume_sell: The total sell volume for the pair over the lookback period.
• trade_count: The total number of trades executed during the lookback period.
• up_moves_ratio: The fraction of trades where the price moved upward.
• max_up_streak: The longest consecutive streak of upward price moves.
• max_down_streak: The longest consecutive streak of downward price moves.
- prior_decisions: A record of previous trading decisions and their outcomes.
- get_sell_token(): Returns the token you currently hold and can sell.

2. Analyze Trade Metrics (from metrics):
- Focus on pairs with sufficient balance
- Review price trends, volatility
- Check trading volumes and imbalances
- Consider recent trade counts
AVAILABLE TOOLS:
- get_token_name(address): Get a human-readable token name.
- get_eligible_buy_tokens(): Get a list of valid tokens you can buy.
- get_token_type(token): Determine if a token is stable (like WXDAI) or volatile.
- analyze_pair_stability(token_a, token_b): Understand the price relationship between tokens.

3. Review Prior Decisions (from prior_decisions):
- Evaluate success of recent trades
- Learn from profitable/unprofitable decisions
- Adjust strategy based on outcomes
TRADING RULES:
1. When analyzing pairs:
- WXDAI is a stablecoin worth $1.
- Non-stablecoin prices fluctuate in USD terms.
- Selling a stablecoin into a volatile token exposes you to price risk.
- Buying a stablecoin preserves USD value.
2. Decision making:
- Use the provided metrics to assess market conditions.
- Consider the profitability of prior decisions.
- If trading, select a buy_token from get_eligible_buy_tokens().
- If not trading, return None for buy_token.

Trading Decision:
1. Select tokens with sufficient balance
2. Choose best pair based on metrics and past performance
3. Decide:
- Should trade? (boolean)
- If yes, which token to sell/buy (addresses)

Reasoning format (be concise):
1. 1-2 key metrics that influenced the decision
2. Your interpretation of the metrics
3. Prior performance insight
4. Trade decision rationale
OUTPUT REQUIRED:
should_trade: bool
- Whether to execute a trade.
buy_token: str | None
- Must be a hex address from get_eligible_buy_tokens() if trading, or None if not.
reasoning: str
- A concise (1 short sentence) justifying your decision, referencing aspects like market analysis, prior results, risk, or other relevant factors.
Loading