|
| 1 | +import base64 |
| 2 | +import json |
| 3 | +import logging |
| 4 | +from typing import Dict, Any, List, Optional |
| 5 | +from solana_agent import AutoTool, ToolRegistry |
| 6 | +import httpx |
| 7 | +from cryptography.hazmat.primitives import hashes, serialization |
| 8 | +from cryptography.hazmat.primitives.asymmetric import ec |
| 9 | +from sakit.utils.wallet import SolanaWalletClient |
| 10 | +from sakit.utils.swap import TradeManager |
| 11 | + |
| 12 | + |
| 13 | +def canonicalize(obj): |
| 14 | + return json.dumps(obj, sort_keys=True, separators=(",", ":")) |
| 15 | + |
| 16 | + |
| 17 | +def get_authorization_signature(url, body, privy_app_id, privy_auth_key): |
| 18 | + payload = { |
| 19 | + "version": 1, |
| 20 | + "method": "POST", |
| 21 | + "url": url, |
| 22 | + "body": body, |
| 23 | + "headers": {"privy-app-id": privy_app_id}, |
| 24 | + } |
| 25 | + serialized_payload = canonicalize(payload) |
| 26 | + private_key_string = privy_auth_key.replace("wallet-auth:", "") |
| 27 | + private_key_pem = ( |
| 28 | + f"-----BEGIN PRIVATE KEY-----\n{private_key_string}\n-----END PRIVATE KEY-----" |
| 29 | + ) |
| 30 | + private_key = serialization.load_pem_private_key( |
| 31 | + private_key_pem.encode("utf-8"), password=None |
| 32 | + ) |
| 33 | + signature = private_key.sign( |
| 34 | + serialized_payload.encode("utf-8"), ec.ECDSA(hashes.SHA256()) |
| 35 | + ) |
| 36 | + return base64.b64encode(signature).decode("utf-8") |
| 37 | + |
| 38 | + |
| 39 | +async def get_privy_embedded_wallet( |
| 40 | + user_id: str, app_id: str, app_secret: str |
| 41 | +) -> Optional[Dict[str, str]]: |
| 42 | + url = f"https://auth.privy.io/api/v1/users/{user_id}" |
| 43 | + headers = {"privy-app-id": app_id} |
| 44 | + auth = (app_id, app_secret) |
| 45 | + async with httpx.AsyncClient() as client: |
| 46 | + resp = await client.get(url, headers=headers, auth=auth, timeout=10) |
| 47 | + if resp.status_code != 200: |
| 48 | + logging.error(f"Privy API error: {resp.text}") |
| 49 | + resp.raise_for_status() |
| 50 | + data = resp.json() |
| 51 | + for acct in data.get("linked_accounts", []): |
| 52 | + if acct.get("connector_type") == "embedded" and acct.get("delegated"): |
| 53 | + return {"wallet_id": acct["id"], "public_key": acct["public_key"]} |
| 54 | + return None |
| 55 | + |
| 56 | + |
| 57 | +async def privy_sign_and_send( |
| 58 | + wallet_id: str, encoded_tx: str, app_id: str, app_secret: str, privy_auth_key: str |
| 59 | +) -> Dict[str, Any]: |
| 60 | + url = f"https://api.privy.io/v1/wallets/{wallet_id}/rpc" |
| 61 | + auth_string = f"{app_id}:{app_secret}" |
| 62 | + encoded_auth = base64.b64encode(auth_string.encode()).decode() |
| 63 | + body = { |
| 64 | + "method": "signAndSendTransaction", |
| 65 | + "caip2": "solana:5eykt4UsFv8P8NJdTREpY1vzqKqZKvdp", |
| 66 | + "params": {"transaction": encoded_tx, "encoding": "base64"}, |
| 67 | + } |
| 68 | + signature = get_authorization_signature( |
| 69 | + url=url, body=body, privy_app_id=app_id, privy_auth_key=privy_auth_key |
| 70 | + ) |
| 71 | + headers = { |
| 72 | + "Authorization": f"Basic {encoded_auth}", |
| 73 | + "privy-app-id": app_id, |
| 74 | + "privy-authorization-signature": signature, |
| 75 | + "Content-Type": "application/json", |
| 76 | + } |
| 77 | + async with httpx.AsyncClient() as client: |
| 78 | + resp = await client.post(url, headers=headers, json=body, timeout=20) |
| 79 | + if resp.status_code != 200: |
| 80 | + logging.error(f"Privy API error: {resp.text}") |
| 81 | + resp.raise_for_status() |
| 82 | + return resp.json() |
| 83 | + |
| 84 | + |
| 85 | +class PrivySwapTool(AutoTool): |
| 86 | + def __init__(self, registry: Optional[ToolRegistry] = None): |
| 87 | + super().__init__( |
| 88 | + name="privy_swap", |
| 89 | + description="Swap tokens using Jupiter via Privy delegated wallet.", |
| 90 | + registry=registry, |
| 91 | + ) |
| 92 | + self.app_id = None |
| 93 | + self.app_secret = None |
| 94 | + self.signing_key = None |
| 95 | + self.rpc_url = None |
| 96 | + self.jupiter_url = None |
| 97 | + self.fee_payer = None |
| 98 | + |
| 99 | + def get_schema(self) -> Dict[str, Any]: |
| 100 | + return { |
| 101 | + "type": "object", |
| 102 | + "properties": { |
| 103 | + "user_id": {"type": "string", "description": "Privy user id (did)"}, |
| 104 | + "output_mint": { |
| 105 | + "type": "string", |
| 106 | + "description": "The mint address of the token to receive.", |
| 107 | + }, |
| 108 | + "input_amount": { |
| 109 | + "type": "number", |
| 110 | + "description": "The amount of the input token to swap.", |
| 111 | + }, |
| 112 | + "input_mint": { |
| 113 | + "type": "string", |
| 114 | + "description": "The mint address of the token to swap from.", |
| 115 | + }, |
| 116 | + }, |
| 117 | + "required": ["user_id", "output_mint", "input_amount", "input_mint"], |
| 118 | + "additionalProperties": False, |
| 119 | + } |
| 120 | + |
| 121 | + def configure(self, config: Dict[str, Any]) -> None: |
| 122 | + tool_cfg = config.get("tools", {}).get("privy_swap", {}) |
| 123 | + self.app_id = tool_cfg.get("app_id") |
| 124 | + self.app_secret = tool_cfg.get("app_secret") |
| 125 | + self.signing_key = tool_cfg.get("signing_key") |
| 126 | + self.rpc_url = tool_cfg.get("rpc_url") |
| 127 | + self.jupiter_url = tool_cfg.get("jupiter_url") |
| 128 | + self.fee_payer = tool_cfg.get("fee_payer") |
| 129 | + |
| 130 | + async def execute( |
| 131 | + self, |
| 132 | + user_id: str, |
| 133 | + output_mint: str, |
| 134 | + input_amount: float, |
| 135 | + input_mint: str, |
| 136 | + slippage_bps: int = 300, |
| 137 | + ) -> Dict[str, Any]: |
| 138 | + if not all( |
| 139 | + [ |
| 140 | + self.app_id, |
| 141 | + self.app_secret, |
| 142 | + self.signing_key, |
| 143 | + self.rpc_url, |
| 144 | + self.fee_payer, |
| 145 | + ] |
| 146 | + ): |
| 147 | + return {"status": "error", "message": "Privy config missing."} |
| 148 | + wallet_info = await get_privy_embedded_wallet( |
| 149 | + user_id, self.app_id, self.app_secret |
| 150 | + ) |
| 151 | + if not wallet_info: |
| 152 | + return { |
| 153 | + "status": "error", |
| 154 | + "message": "No delegated embedded wallet found for user.", |
| 155 | + } |
| 156 | + wallet_id = wallet_info["wallet_id"] |
| 157 | + try: |
| 158 | + wallet = SolanaWalletClient( |
| 159 | + self.rpc_url, None, wallet_info["public_key"], self.fee_payer |
| 160 | + ) |
| 161 | + transaction = await TradeManager.trade( |
| 162 | + wallet, |
| 163 | + output_mint, |
| 164 | + input_amount, |
| 165 | + input_mint, |
| 166 | + slippage_bps, |
| 167 | + self.jupiter_url, |
| 168 | + True, |
| 169 | + ) |
| 170 | + encoded_transaction = base64.b64encode(bytes(transaction)).decode("utf-8") |
| 171 | + result = await privy_sign_and_send( |
| 172 | + wallet_id, |
| 173 | + encoded_transaction, |
| 174 | + self.app_id, |
| 175 | + self.app_secret, |
| 176 | + self.signing_key, |
| 177 | + ) |
| 178 | + return {"status": "success", "result": result} |
| 179 | + except Exception as e: |
| 180 | + logging.exception(f"Privy swap failed: {str(e)}") |
| 181 | + return {"status": "error", "message": str(e)} |
| 182 | + |
| 183 | + |
| 184 | +class PrivySwapPlugin: |
| 185 | + def __init__(self): |
| 186 | + self.name = "privy_swap" |
| 187 | + self.config = None |
| 188 | + self.tool_registry = None |
| 189 | + self._tool = None |
| 190 | + |
| 191 | + @property |
| 192 | + def description(self): |
| 193 | + return "Plugin for swapping tokens using Jupiter via Privy delegated wallet." |
| 194 | + |
| 195 | + def initialize(self, tool_registry: ToolRegistry) -> None: |
| 196 | + self.tool_registry = tool_registry |
| 197 | + self._tool = PrivySwapTool(registry=tool_registry) |
| 198 | + |
| 199 | + def configure(self, config: Dict[str, Any]) -> None: |
| 200 | + self.config = config |
| 201 | + if self._tool: |
| 202 | + self._tool.configure(self.config) |
| 203 | + |
| 204 | + def get_tools(self) -> List[AutoTool]: |
| 205 | + return [self._tool] if self._tool else [] |
| 206 | + |
| 207 | + |
| 208 | +def get_plugin(): |
| 209 | + return PrivySwapPlugin() |
0 commit comments