Skip to content

refactor: bot issues #15

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Feb 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cow-trader/.python-version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
3.11
3.10
18 changes: 17 additions & 1 deletion cow-trader/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,23 @@

Automated CoW Swap trading agent built with Silverback SDK

## Run
## Getting Started

This project utilises uv, see [docs](https://docs.astral.sh/uv/getting-started/) for installation.

Create venv and install dependencies:

```bash
uv sync --all-extras --dev
```

Install all Ape framework plugins:

```bash
ape plugins install .
```

## Run Silverback Bot

```bash
silverback run --network gnosis:mainnet:alchemy --account cow-agent
Expand Down
236 changes: 164 additions & 72 deletions cow-trader/bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import os
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List
from typing import Annotated, Dict, List

import click
import numpy as np
Expand All @@ -16,6 +16,7 @@
from pydantic import BaseModel
from pydantic_ai import Agent, RunContext
from silverback import SilverbackBot, StateSnapshot
from taskiq import Context, TaskiqDepends, TaskiqState

# Initialize bot
bot = SilverbackBot()
Expand Down Expand Up @@ -230,11 +231,11 @@ def get_token_name(address: str) -> str:
raise


@trading_agent.tool_plain(retries=3)
def get_eligible_buy_tokens(ctx: AgentDependencies) -> List[str]:
@trading_agent.tool(retries=3)
def get_eligible_buy_tokens(ctx: RunContext[AgentDependencies]) -> List[str]:
"""Return a list of tokens eligible for purchase (excluding the sell token)."""
try:
sell_token = ctx.sell_token
sell_token = ctx.deps.sell_token
return [token for token in MONITORED_TOKENS if token != sell_token]
except Exception as e:
print(f"[get_eligible_buy_tokens] failed with error: {e}")
Expand Down Expand Up @@ -304,11 +305,10 @@ def _get_token_balances() -> Dict[str, int]:
return {token_address: balance for token_address, balance in zip(MONITORED_TOKENS, results)}


def _create_trade_context(lookback_blocks: int = 15000) -> TradeContext:
def _create_trade_context(
trades_df: pd.DataFrame, decisions_df: pd.DataFrame, lookback_blocks: int = 15000
) -> TradeContext:
"""Create TradeContext with all required data"""
trades_df = _load_trades_db()
decisions_df = _load_decisions_db()

prior_decisions = decisions_df.tail(3).copy()
prior_decisions["metrics_snapshot"] = prior_decisions["metrics_snapshot"].apply(json.loads)

Expand Down Expand Up @@ -383,32 +383,36 @@ def _save_decision(decision: AgentDecision) -> pd.DataFrame:
return decisions_df


def _update_latest_decision_outcome(final_price: float) -> pd.DataFrame:
def _update_latest_decision_outcome(
decisions_df: pd.DataFrame, final_price: float | None = None
) -> pd.DataFrame:
"""Update most recent decision with outcome data"""
decisions_df = _load_decisions_db()

if decisions_df.empty:
return decisions_df

latest_idx = decisions_df.index[-1]
latest_decision = decisions_df.iloc[-1]

if latest_decision.should_trade:
metrics = json.loads(latest_decision.metrics_snapshot)
initial_price = next(
m["last_price"]
for m in metrics
if m["token_a"] == latest_decision.sell_token
and m["token_b"] == latest_decision.buy_token
)
if final_price is None:
return decisions_df
else:
metrics = json.loads(latest_decision.metrics_snapshot)
initial_price = next(
m["last_price"]
for m in metrics
if m["token_a"] == latest_decision.sell_token
and m["token_b"] == latest_decision.buy_token
)

profitable = (
final_price > initial_price
if latest_decision.sell_token == metrics[0]["token_a"]
else final_price < initial_price
)
profitable = (
final_price > initial_price
if latest_decision.sell_token == metrics[0]["token_a"]
else final_price < initial_price
)

decisions_df.loc[latest_idx, "profitable"] = int(profitable)

decisions_df.loc[latest_idx, "profitable"] = profitable
_save_decisions_db(decisions_df)

return decisions_df
Expand Down Expand Up @@ -452,20 +456,20 @@ def _save_trades_db(trades_dict: Dict) -> None:
df.to_csv(TRADE_FILEPATH, index=False)


def _load_block_db() -> Dict:
def _load_block_db() -> int:
"""Load the last processed block from CSV file or create new if doesn't exist"""
df = (
pd.read_csv(BLOCK_FILEPATH)
if os.path.exists(BLOCK_FILEPATH)
else pd.DataFrame({"last_processed_block": [START_BLOCK]})
)
return {"last_processed_block": df["last_processed_block"].iloc[0]}
return df["last_processed_block"].iloc[0]


def _save_block_db(data: Dict):
def _save_block_db(block_number: int) -> None:
"""Save the last processed block to CSV file"""
os.makedirs(os.path.dirname(BLOCK_FILEPATH), exist_ok=True)
df = pd.DataFrame([data])
df = pd.DataFrame({"last_processed_block": [block_number]})
df.to_csv(BLOCK_FILEPATH, index=False)


Expand Down Expand Up @@ -783,95 +787,183 @@ def bot_startup(startup_state: StateSnapshot):
if PROMPT_AUTOSIGN and click.confirm("Enable autosign?"):
bot.signer.set_autosign(enabled=True)

# Process historical trades
block_db = _load_block_db()
last_processed_block = block_db["last_processed_block"]
_save_block_db({"last_processed_block": chain.blocks.head.number})

last_processed_block = block_db
_save_block_db(chain.blocks.head.number)
_process_historical_trades(
GPV2_SETTLEMENT_CONTRACT,
start_block=last_processed_block,
stop_block=chain.blocks.head.number,
)

bot.state.agent = trading_agent
# Initialize bot state

decisions_df = _load_decisions_db()
if decisions_df.empty:
bot.state.next_decision_block = chain.blocks.head.number
else:
bot.state.next_decision_block = decisions_df.iloc[-1].block_number + TRADING_BLOCK_COOLDOWN

bot.state.can_trade = False
bot.state.sell_token = None

return {"message": "Starting...", "block_number": startup_state.last_block_seen}


@bot.on_(chain.blocks)
def exec_block(block: BlockAPI):
"""Execute block handler with structured decision flow"""
_save_block_db({"last_processed_block": block.number})
@bot.on_worker_startup()
def worker_startup(state: TaskiqState):
"""Initialize worker state"""
state.agent = trading_agent
state.trades_df = _load_trades_db()
state.decisions_df = _load_decisions_db()

if (bot.state.next_decision_block - block.number) <= 5:
_catch_up_trades(
current_block=block.number, next_decision_block=bot.state.next_decision_block
)

decisions_df = _load_decisions_db()
@bot.on_(chain.blocks)
def update_state(block: BlockAPI, context: Annotated[Context, TaskiqDepends()]):
"""Update trade history and decision outcomes"""
click.echo(f"\n[{block.number}] Starting state update...")
_save_block_db(block.number)
bot.state.can_trade = False
click.echo(f"[{block.number}] State: trade={bot.state.can_trade}, sell={bot.state.sell_token}")

if block.number < bot.state.next_decision_block:
click.echo(f"[{block.number}] Skip - next decision at {bot.state.next_decision_block}")
return {"message": "Skipped - before cooldown", "block": block.number}

click.echo(f"[{block.number}] Past cooldown, catching up trades...")
_catch_up_trades(current_block=block.number, next_decision_block=bot.state.next_decision_block)

bot.state.sell_token = _select_sell_token()
click.echo(f"[{block.number}] Sell token: {bot.state.sell_token}")

if not bot.state.sell_token:
click.echo(f"[{block.number}] No eligible sell tokens found")
return {"message": "No eligible sell tokens", "block": block.number}

if context.state.decisions_df.empty:
click.echo(f"[{block.number}] No previous decisions, enabling trading")
bot.state.can_trade = True
return {"message": "No previous decisions", "can_trade": True}

latest_decision = context.state.decisions_df.iloc[-1]
msg = (
f"[{block.number}] Latest: "
f"trade={latest_decision.should_trade}, "
f"block={latest_decision.block_number}"
)
click.echo(msg)

if decisions_df.empty:
should_trade = True
latest_decision = None
else:
latest_decision = decisions_df.iloc[-1]
should_trade = block.number - latest_decision.block_number >= TRADING_BLOCK_COOLDOWN
if not latest_decision.should_trade:
click.echo(f"[{block.number}] Last decision wasn't a trade, enabling trading")
bot.state.can_trade = True
return {
"message": "Last decision was not a trade",
"can_trade": True,
"last_decision_block": latest_decision.block_number,
}

if not should_trade:
return
click.echo(f"[{block.number}] Creating trade context for outcome update...")
trade_ctx = _create_trade_context(
trades_df=context.state.trades_df, decisions_df=context.state.decisions_df
)

trade_ctx = _create_trade_context()
matching_metrics = [
m.last_price
for m in trade_ctx.metrics
if m.token_a == latest_decision.sell_token and m.token_b == latest_decision.buy_token
]

if latest_decision is not None and latest_decision.should_trade:
_update_latest_decision_outcome(
final_price=next(
m.last_price
for m in trade_ctx.metrics
if m.token_a == latest_decision.sell_token
and m.token_b == latest_decision.buy_token
)
if not matching_metrics:
click.echo(
f"[{block.number}] No metrics {latest_decision.sell_token}-{latest_decision.buy_token}"
)
context.state.decisions_df = _update_latest_decision_outcome(
decisions_df=context.state.decisions_df,
final_price=None,
)
bot.state.can_trade = True
return {
"message": "Marked as unknown outcome",
"block": block.number,
"can_trade": True,
}

click.echo(f"[{block.number}] Updating previous decision outcome...")
context.state.decisions_df = _update_latest_decision_outcome(
decisions_df=context.state.decisions_df, final_price=matching_metrics[0]
)

bot.state.can_trade = True
click.echo(f"[{block.number}] State: trade={bot.state.can_trade}, sell={bot.state.sell_token}")
return {
"message": "Updated previous decision outcome",
"can_trade": True,
"last_decision_block": latest_decision.block_number,
}


sell_token = _select_sell_token()
deps = AgentDependencies(trade_ctx=trade_ctx, sell_token=sell_token)
@bot.on_(chain.blocks)
def make_trading_decision(block: BlockAPI, context: Annotated[Context, TaskiqDepends()]):
"""Make and execute trading decisions"""
click.echo(f"\n[{block.number}] Starting trading decision...")
click.echo(f"[{block.number}] State: trade={bot.state.can_trade}, sell={bot.state.sell_token}")

if not bot.state.can_trade:
click.echo(f"[{block.number}] Trading not enabled, skipping")
return {"message": "Trading not enabled", "block": block.number}

click.echo(f"[{block.number}] Creating trade context...")
trade_ctx = _create_trade_context(
trades_df=context.state.trades_df, decisions_df=context.state.decisions_df
)

click.echo(f"[{block.number}] Running agent with sell_token={bot.state.sell_token}...")
deps = AgentDependencies(trade_ctx=trade_ctx, sell_token=bot.state.sell_token)

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

try:
result = bot.state.agent.run_sync(
"Analyze current market conditions and make a trading decision", deps=deps
)
except Exception as e:
click.echo(f"Anthropic API error at block {block.number}: {str(e)}")
return
result = context.state.agent.run_sync(
"Analyze current market conditions and make a trading decision", deps=deps
)

click.echo(
f"[{block.number}] Agent: trade={result.data.should_trade}, buy={result.data.buy_token}"
)
_save_reasoning(block.number, result.data.reasoning)

decision = _build_decision(
block_number=block.number,
response=result.data,
metrics=trade_ctx.metrics,
sell_token=sell_token,
sell_token=bot.state.sell_token,
)

decision.valid = _validate_decision(decision)
click.echo(f"[{block.number}] Decision valid={decision.valid}")
_save_decision(decision)
bot.state.next_decision_block = block.number + TRADING_BLOCK_COOLDOWN

if decision.valid:
if decision.valid and decision.should_trade:
click.echo(f"[{block.number}] Order: {decision.sell_token} -> {decision.buy_token}")
order_uid, error = create_submit_and_sign_order(
sell_token=decision.sell_token,
buy_token=decision.buy_token,
sell_amount=trade_ctx.token_balances[decision.sell_token],
)
if error:
click.echo(f"Order failed: {error}")
click.echo(f"[{block.number}] Order failed: {error}")
else:
click.echo(f"Order submitted successfully. UID: {order_uid}")
click.echo(f"[{block.number}] Order: {order_uid}")

bot.state.next_decision_block = block.number + TRADING_BLOCK_COOLDOWN
click.echo(f"[{block.number}] Next decision: {bot.state.next_decision_block}")

return {
"message": "Trading decision made",
"block": block.number,
"should_trade": decision.should_trade,
"sell_token": decision.sell_token,
"buy_token": decision.buy_token,
"next_decision_block": bot.state.next_decision_block,
}
4 changes: 2 additions & 2 deletions cow-trader/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@ name = "cow-agent"
version = "0.1.0"
description = "Automated CoW Swap trading agent built with Silverback SDK"
readme = "README.md"
requires-python = ">=3.11"
requires-python = ">=3.10,<3.11"
dependencies = ["eth-ape>=0.8.25", "pydantic-ai>=0.0.23", "silverback>=0.7.0"]

[tool.uv]
dev-dependencies = ["pre-commit>=4.0.1", "ruff>=0.9.1"]

[tool.ruff]
line-length = 100
target-version = "py311"
target-version = "py310"

[tool.ruff.lint]
select = [
Expand Down
Loading
Loading