Skip to content

Commit fe599b7

Browse files
committed
feat(pricing): add concurrency to coingecko api calls
1 parent 3fbce91 commit fe599b7

File tree

5 files changed

+182
-29
lines changed

5 files changed

+182
-29
lines changed

app/api/pricing/coingecko.py

Lines changed: 51 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
import httpx
2+
import asyncio
23

34
from app.api.common.models import ChainId, CoinType
45
from app.config import settings
6+
from .constants import COINGECKO_CHUNK_SIZE, COINGECKO_MAX_CONCURRENT_REQUESTS
7+
from .utils import chunk_sequence
58

69
from .cache import CoinMapCache, PlatformMapCache, TokenPriceCache
710
from .models import (
@@ -80,38 +83,57 @@ async def get_prices(
8083
if not coingecko_ids:
8184
return results
8285

83-
# Fetch prices for all coingecko ids
84-
params = {
85-
"ids": ",".join(coingecko_ids),
86-
"vs_currencies": batch.vs_currency.value,
87-
"include_platform": True,
88-
}
86+
# Split coingecko_ids into chunks
87+
id_chunks = chunk_sequence(list(coingecko_ids), COINGECKO_CHUNK_SIZE)
88+
89+
# Process chunks in parallel with controlled concurrency
90+
semaphore = asyncio.Semaphore(COINGECKO_MAX_CONCURRENT_REQUESTS)
91+
92+
async def fetch_chunk(chunk: list[str]) -> dict:
93+
async with semaphore:
94+
params = {
95+
"ids": ",".join(chunk),
96+
"vs_currencies": batch.vs_currency.value,
97+
"include_platform": True,
98+
}
99+
async with self._create_client() as client:
100+
response = await client.get(
101+
f"{self.base_url}/simple/price", params=params
102+
)
103+
response.raise_for_status()
104+
return response.json()
89105

90-
coingecko_responses = []
91-
async with self._create_client() as client:
92-
response = await client.get(f"{self.base_url}/simple/price", params=params)
93-
response.raise_for_status()
94-
data = response.json()
106+
chunk_results = await asyncio.gather(
107+
*[fetch_chunk(chunk) for chunk in id_chunks], return_exceptions=True
108+
)
95109

96-
for request in batch_to_fetch.requests:
97-
if (
98-
id := await self._get_coingecko_id_from_request(
99-
request, platform_map, coin_map
100-
)
101-
) not in data:
102-
continue
103-
104-
try:
105-
item = TokenPriceResponse(
106-
**request.model_dump(),
107-
vs_currency=batch.vs_currency,
108-
price=float(data[id][batch.vs_currency.value]),
109-
cache_status=CacheStatus.MISS,
110-
)
111-
except (KeyError, ValueError):
112-
continue
110+
# Combine results from all chunks
111+
combined_data = {}
112+
for result in chunk_results:
113+
if isinstance(result, Exception):
114+
continue
115+
combined_data.update(result)
116+
117+
coingecko_responses = []
118+
for request in batch_to_fetch.requests:
119+
if (
120+
id := await self._get_coingecko_id_from_request(
121+
request, platform_map, coin_map
122+
)
123+
) not in combined_data:
124+
continue
125+
126+
try:
127+
item = TokenPriceResponse(
128+
**request.model_dump(),
129+
vs_currency=batch.vs_currency,
130+
price=float(combined_data[id][batch.vs_currency.value]),
131+
cache_status=CacheStatus.MISS,
132+
)
133+
except (KeyError, ValueError):
134+
continue
113135

114-
coingecko_responses.append(item)
136+
coingecko_responses.append(item)
115137

116138
await TokenPriceCache.set(coingecko_responses)
117139
results.extend(coingecko_responses)

app/api/pricing/constants.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
# CoinGecko API constants
2+
COINGECKO_CHUNK_SIZE = 100
3+
COINGECKO_MAX_CONCURRENT_REQUESTS = 4
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
import pytest
2+
from unittest.mock import AsyncMock, patch
3+
4+
from app.api.common.models import ChainId, CoinType
5+
from app.api.pricing.coingecko import CoinGeckoClient
6+
from app.api.pricing.models import BatchTokenPriceRequests, TokenPriceRequest
7+
8+
9+
@pytest.fixture
10+
def client():
11+
return CoinGeckoClient()
12+
13+
14+
@pytest.fixture
15+
def mock_httpx_client():
16+
with patch("httpx.AsyncClient") as mock:
17+
mock_client = AsyncMock()
18+
mock.return_value.__aenter__.return_value = mock_client
19+
yield mock_client
20+
21+
22+
@pytest.mark.asyncio
23+
async def test_get_prices_chunking(client, mock_httpx_client):
24+
# Create a batch with 7 requests (should create 3 chunks: 3, 3, 1)
25+
requests = [
26+
TokenPriceRequest(
27+
chain_id=ChainId.ETHEREUM,
28+
address=f"0x{i}",
29+
coin_type=CoinType.ETH,
30+
)
31+
for i in range(7)
32+
]
33+
batch = BatchTokenPriceRequests(requests=requests, vs_currency="usd")
34+
35+
with (
36+
patch("app.api.pricing.coingecko.TokenPriceCache.get") as mock_cache,
37+
patch("app.api.pricing.coingecko.TokenPriceCache.set", new_callable=AsyncMock),
38+
patch.object(client, "get_platform_map") as mock_platform_map,
39+
patch.object(client, "get_coin_map") as mock_coin_map,
40+
patch("app.api.pricing.coingecko.COINGECKO_CHUNK_SIZE", 3),
41+
):
42+
mock_cache.return_value = ([], batch)
43+
44+
mock_platform_map.return_value = {"ethereum": {"chain_id": "0x1"}}
45+
mock_coin_map.return_value = {"0x1": {f"0x{i}": f"token{i}" for i in range(7)}}
46+
47+
# Mock the HTTP response
48+
mock_response = AsyncMock()
49+
mock_response.json = lambda: {f"token{i}": {"usd": 1.0} for i in range(7)}
50+
mock_response.raise_for_status = lambda: None
51+
mock_httpx_client.get.return_value = mock_response
52+
53+
# Call get_prices
54+
results = await client.get_prices(batch)
55+
56+
# Verify the number of HTTP requests made (should be 3 chunks)
57+
assert mock_httpx_client.get.call_count == 3
58+
59+
# Verify the results
60+
assert len(results) == 7
61+
for result in results:
62+
assert result.price == 1.0
63+
assert result.cache_status == "MISS"
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
import pytest
2+
3+
from app.api.pricing.utils import chunk_sequence
4+
5+
6+
def test_chunk_sequence_empty():
7+
result = chunk_sequence([], 3)
8+
assert result == []
9+
10+
11+
def test_chunk_sequence_exact_chunks():
12+
sequence = [1, 2, 3, 4, 5, 6]
13+
result = chunk_sequence(sequence, 3)
14+
assert result == [[1, 2, 3], [4, 5, 6]]
15+
16+
17+
def test_chunk_sequence_with_remainder():
18+
sequence = [1, 2, 3, 4, 5]
19+
result = chunk_sequence(sequence, 3)
20+
assert result == [[1, 2, 3], [4, 5]]
21+
22+
23+
def test_chunk_sequence_single_chunk():
24+
sequence = [1, 2]
25+
result = chunk_sequence(sequence, 3)
26+
assert result == [[1, 2]]
27+
28+
29+
def test_chunk_sequence_different_types():
30+
sequence = ["a", 1, True, "b", 2, False]
31+
result = chunk_sequence(sequence, 2)
32+
assert result == [["a", 1], [True, "b"], [2, False]]
33+
34+
35+
def test_chunk_sequence_invalid_chunk_size():
36+
with pytest.raises(ValueError):
37+
chunk_sequence([1, 2, 3], 0)
38+
with pytest.raises(ValueError):
39+
chunk_sequence([1, 2, 3], -1)

app/api/pricing/utils.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
from typing import TypeVar, Sequence
2+
3+
T = TypeVar("T")
4+
5+
6+
def chunk_sequence(sequence: Sequence[T], chunk_size: int) -> list[list[T]]:
7+
"""
8+
Split a sequence into chunks of specified size.
9+
10+
Args:
11+
sequence: The sequence to split into chunks
12+
chunk_size: The size of each chunk
13+
14+
Returns:
15+
A list of chunks, where each chunk is a list of items from the original sequence
16+
17+
Raises:
18+
ValueError: If chunk_size is less than or equal to 0
19+
"""
20+
if chunk_size <= 0:
21+
raise ValueError("chunk_size must be greater than 0")
22+
23+
chunks = [list(chunk) for chunk in zip(*[iter(sequence)] * chunk_size)]
24+
if len(sequence) % chunk_size != 0:
25+
chunks.append(list(sequence)[-(len(sequence) % chunk_size) :])
26+
return chunks

0 commit comments

Comments
 (0)