Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 51 additions & 29 deletions app/api/pricing/coingecko.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import httpx
import asyncio

from app.api.common.models import ChainId, CoinType
from app.config import settings
from .constants import COINGECKO_CHUNK_SIZE, COINGECKO_MAX_CONCURRENT_REQUESTS
from .utils import chunk_sequence

from .cache import CoinMapCache, PlatformMapCache, TokenPriceCache
from .models import (
Expand Down Expand Up @@ -80,38 +83,57 @@ async def get_prices(
if not coingecko_ids:
return results

# Fetch prices for all coingecko ids
params = {
"ids": ",".join(coingecko_ids),
"vs_currencies": batch.vs_currency.value,
"include_platform": True,
}
# Split coingecko_ids into chunks
id_chunks = chunk_sequence(list(coingecko_ids), COINGECKO_CHUNK_SIZE)

# Process chunks in parallel with controlled concurrency
semaphore = asyncio.Semaphore(COINGECKO_MAX_CONCURRENT_REQUESTS)

async def fetch_chunk(chunk: list[str]) -> dict:
async with semaphore:
params = {
"ids": ",".join(chunk),
"vs_currencies": batch.vs_currency.value,
"include_platform": True,
}
async with self._create_client() as client:
response = await client.get(
f"{self.base_url}/simple/price", params=params
)
response.raise_for_status()
return response.json()

coingecko_responses = []
async with self._create_client() as client:
response = await client.get(f"{self.base_url}/simple/price", params=params)
response.raise_for_status()
data = response.json()
chunk_results = await asyncio.gather(
*[fetch_chunk(chunk) for chunk in id_chunks], return_exceptions=True
)

for request in batch_to_fetch.requests:
if (
id := await self._get_coingecko_id_from_request(
request, platform_map, coin_map
)
) not in data:
continue

try:
item = TokenPriceResponse(
**request.model_dump(),
vs_currency=batch.vs_currency,
price=float(data[id][batch.vs_currency.value]),
cache_status=CacheStatus.MISS,
)
except (KeyError, ValueError):
continue
# Combine results from all chunks
combined_data = {}
for result in chunk_results:
if isinstance(result, Exception):
continue
combined_data.update(result)

coingecko_responses = []
for request in batch_to_fetch.requests:
if (
id := await self._get_coingecko_id_from_request(
request, platform_map, coin_map
)
) not in combined_data:
continue

try:
item = TokenPriceResponse(
**request.model_dump(),
vs_currency=batch.vs_currency,
price=float(combined_data[id][batch.vs_currency.value]),
cache_status=CacheStatus.MISS,
)
except (KeyError, ValueError):
continue

coingecko_responses.append(item)
coingecko_responses.append(item)

await TokenPriceCache.set(coingecko_responses)
results.extend(coingecko_responses)
Expand Down
3 changes: 3 additions & 0 deletions app/api/pricing/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# CoinGecko API constants
COINGECKO_CHUNK_SIZE = 100
COINGECKO_MAX_CONCURRENT_REQUESTS = 4
63 changes: 63 additions & 0 deletions app/api/pricing/tests/test_coingecko.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import pytest
from unittest.mock import AsyncMock, patch

from app.api.common.models import ChainId, CoinType
from app.api.pricing.coingecko import CoinGeckoClient
from app.api.pricing.models import BatchTokenPriceRequests, TokenPriceRequest


@pytest.fixture
def client():
return CoinGeckoClient()


@pytest.fixture
def mock_httpx_client():
with patch("httpx.AsyncClient") as mock:
mock_client = AsyncMock()
mock.return_value.__aenter__.return_value = mock_client
yield mock_client


@pytest.mark.asyncio
async def test_get_prices_chunking(client, mock_httpx_client):
# Create a batch with 7 requests (should create 3 chunks: 3, 3, 1)
requests = [
TokenPriceRequest(
chain_id=ChainId.ETHEREUM,
address=f"0x{i}",
coin_type=CoinType.ETH,
)
for i in range(7)
]
batch = BatchTokenPriceRequests(requests=requests, vs_currency="usd")

with (
patch("app.api.pricing.coingecko.TokenPriceCache.get") as mock_cache,
patch("app.api.pricing.coingecko.TokenPriceCache.set", new_callable=AsyncMock),
patch.object(client, "get_platform_map") as mock_platform_map,
patch.object(client, "get_coin_map") as mock_coin_map,
patch("app.api.pricing.coingecko.COINGECKO_CHUNK_SIZE", 3),
):
mock_cache.return_value = ([], batch)

mock_platform_map.return_value = {"ethereum": {"chain_id": "0x1"}}
mock_coin_map.return_value = {"0x1": {f"0x{i}": f"token{i}" for i in range(7)}}

# Mock the HTTP response
mock_response = AsyncMock()
mock_response.json = lambda: {f"token{i}": {"usd": 1.0} for i in range(7)}
mock_response.raise_for_status = lambda: None
mock_httpx_client.get.return_value = mock_response

# Call get_prices
results = await client.get_prices(batch)

# Verify the number of HTTP requests made (should be 3 chunks)
assert mock_httpx_client.get.call_count == 3

# Verify the results
assert len(results) == 7
for result in results:
assert result.price == 1.0
assert result.cache_status == "MISS"
39 changes: 39 additions & 0 deletions app/api/pricing/tests/test_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import pytest

from app.api.pricing.utils import chunk_sequence


def test_chunk_sequence_empty():
result = chunk_sequence([], 3)
assert result == []


def test_chunk_sequence_exact_chunks():
sequence = [1, 2, 3, 4, 5, 6]
result = chunk_sequence(sequence, 3)
assert result == [[1, 2, 3], [4, 5, 6]]


def test_chunk_sequence_with_remainder():
sequence = [1, 2, 3, 4, 5]
result = chunk_sequence(sequence, 3)
assert result == [[1, 2, 3], [4, 5]]


def test_chunk_sequence_single_chunk():
sequence = [1, 2]
result = chunk_sequence(sequence, 3)
assert result == [[1, 2]]


def test_chunk_sequence_different_types():
sequence = ["a", 1, True, "b", 2, False]
result = chunk_sequence(sequence, 2)
assert result == [["a", 1], [True, "b"], [2, False]]


def test_chunk_sequence_invalid_chunk_size():
with pytest.raises(ValueError):
chunk_sequence([1, 2, 3], 0)
with pytest.raises(ValueError):
chunk_sequence([1, 2, 3], -1)
26 changes: 26 additions & 0 deletions app/api/pricing/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from typing import TypeVar, Sequence

T = TypeVar("T")


def chunk_sequence(sequence: Sequence[T], chunk_size: int) -> list[list[T]]:
"""
Split a sequence into chunks of specified size.

Args:
sequence: The sequence to split into chunks
chunk_size: The size of each chunk

Returns:
A list of chunks, where each chunk is a list of items from the original sequence

Raises:
ValueError: If chunk_size is less than or equal to 0
"""
if chunk_size <= 0:
raise ValueError("chunk_size must be greater than 0")

chunks = [list(chunk) for chunk in zip(*[iter(sequence)] * chunk_size)]
if len(sequence) % chunk_size != 0:
chunks.append(list(sequence)[-(len(sequence) % chunk_size) :])
return chunks