Skip to content

Commit a4df7af

Browse files
feat(socks): per-connection per-channel subscription limits (#3131)
- Added new configuration options in config.ts for maximum allowed subscriptions per connection by channel - Updated the subscribe flow to enforce these limits: When exceeded, emit an error message to the client and increment a new Datadog metric: subscriptions_limit_reached with tags {channel, instance} - Added corresponding unit tests verifying correct limit enforcement and error messaging behavior - Added Python test script to stress test subscription limits
1 parent c0f39ed commit a4df7af

File tree

4 files changed

+358
-12
lines changed

4 files changed

+358
-12
lines changed

indexer/services/socks/__tests__/lib/subscriptions.test.ts

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@ describe('Subscriptions', () => {
3131
let mockWs: WebSocket;
3232
let sendMessageMock: jest.Mock;
3333
let sendMessageStringMock: jest.Mock;
34+
let decrementSubscriptionsSpy: jest.SpyInstance;
35+
let incrementSubscriptionsSpy: jest.SpyInstance;
36+
let removeSubscriptionsSpy: jest.SpyInstance;
3437
let rateLimiterSpy: jest.SpyInstance;
3538
let axiosRequestMock: jest.Mock;
3639

@@ -116,6 +119,9 @@ describe('Subscriptions', () => {
116119
mockWs = new WebSocket(null);
117120
sendMessageMock = (sendMessage as jest.Mock);
118121
sendMessageStringMock = (sendMessageString as jest.Mock);
122+
decrementSubscriptionsSpy = jest.spyOn(Subscriptions.prototype, 'decrementSubscriptions');
123+
incrementSubscriptionsSpy = jest.spyOn(Subscriptions.prototype, 'incrementSubscriptions');
124+
removeSubscriptionsSpy = jest.spyOn(Subscriptions.prototype, 'removeSubscriptions');
119125
rateLimiterSpy = jest.spyOn(RateLimiter.prototype, 'rateLimit');
120126
axiosRequestMock = (axiosRequest as jest.Mock);
121127
axiosRequestMock.mockClear();
@@ -124,6 +130,10 @@ describe('Subscriptions', () => {
124130

125131
afterEach(() => {
126132
jest.useRealTimers();
133+
decrementSubscriptionsSpy.mockRestore();
134+
incrementSubscriptionsSpy.mockRestore();
135+
removeSubscriptionsSpy.mockRestore();
136+
rateLimiterSpy.mockRestore();
127137
});
128138

129139
describe('subscribe', () => {
@@ -227,6 +237,80 @@ describe('Subscriptions', () => {
227237
).rejects.toEqual(new Error(`Invalid channel: ${invalidChannel}`));
228238
});
229239

240+
it('sends error message if v4_accounts channel subscription limit exceeded', async () => {
241+
const limit = config.V4_ACCOUNTS_CHANNEL_LIMIT;
242+
incrementSubscriptionsSpy.mockImplementation(() => limit + 1);
243+
await subscriptions.subscribe(
244+
mockWs,
245+
Channel.V4_ACCOUNTS,
246+
connectionId,
247+
initialMsgId,
248+
mockSubaccountId,
249+
false,
250+
);
251+
252+
expect(incrementSubscriptionsSpy).toHaveBeenCalledTimes(1);
253+
expect(incrementSubscriptionsSpy).toHaveBeenCalledWith(Channel.V4_ACCOUNTS, connectionId);
254+
255+
expect(sendMessageMock).toHaveBeenCalledTimes(1);
256+
expect(sendMessageMock).toHaveBeenCalledWith(
257+
mockWs,
258+
connectionId,
259+
expect.objectContaining({
260+
message: expect.stringContaining(
261+
`Per-connection subscription limit reached for ${Channel.V4_ACCOUNTS} (limit=${limit}).`,
262+
),
263+
}),
264+
);
265+
});
266+
267+
it('sends error message if v4_accounts channel subscription limit exceeded by inflight requests', async () => {
268+
const limit = config.V4_ACCOUNTS_CHANNEL_LIMIT;
269+
incrementSubscriptionsSpy.mockReturnValueOnce(limit);
270+
incrementSubscriptionsSpy.mockReturnValueOnce(limit + 1);
271+
incrementSubscriptionsSpy.mockReturnValueOnce(limit + 2);
272+
await Promise.all([
273+
subscriptions.subscribe(
274+
mockWs,
275+
Channel.V4_ACCOUNTS,
276+
connectionId,
277+
initialMsgId,
278+
mockSubaccountId,
279+
false,
280+
),
281+
subscriptions.subscribe(
282+
mockWs,
283+
Channel.V4_ACCOUNTS,
284+
connectionId,
285+
initialMsgId + 1,
286+
mockSubaccountId1,
287+
false,
288+
),
289+
subscriptions.subscribe(
290+
mockWs,
291+
Channel.V4_ACCOUNTS,
292+
connectionId,
293+
initialMsgId + 2,
294+
mockSubaccountId1,
295+
false,
296+
),
297+
]);
298+
299+
expect(sendMessageMock).toHaveBeenCalledTimes(2);
300+
expect(sendMessageMock).toHaveBeenCalledWith(
301+
mockWs,
302+
connectionId,
303+
expect.objectContaining({
304+
message: expect.stringContaining(
305+
`Per-connection subscription limit reached for ${Channel.V4_ACCOUNTS} (limit=${limit}).`,
306+
),
307+
}),
308+
);
309+
310+
expect(decrementSubscriptionsSpy).toHaveBeenCalledTimes(2);
311+
expect(decrementSubscriptionsSpy).toHaveBeenCalledWith(Channel.V4_ACCOUNTS, connectionId);
312+
});
313+
230314
it('sends error message if rate limit exceeded', async () => {
231315
rateLimiterSpy.mockImplementation(() => 1);
232316
await subscriptions.subscribe(
@@ -421,8 +505,11 @@ describe('Subscriptions', () => {
421505

422506
subscriptions.remove(connectionId);
423507

508+
expect(removeSubscriptionsSpy).toHaveBeenCalledWith(connectionId);
509+
424510
for (const channel of Object.values(Channel)) {
425511
expect(subscriptions.subscriptions[channel][singleIds[channel]]).toHaveLength(0);
512+
expect(subscriptions.subsByChannelByConnectionId[channel][connectionId]).toBe(undefined);
426513
}
427514
expect(subscriptions.subscriptionLists[connectionId]).toBeUndefined();
428515
});
Lines changed: 196 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,196 @@
1+
# /// script
2+
# requires-python = ">=3.10"
3+
# dependencies = [
4+
# "requests>=2.31.0",
5+
# "websockets>=12.0",
6+
# ]
7+
# ///
8+
"""
9+
Async script: fetch markets metadata, then subscribe to every market on the chosen channel.
10+
If a limit error is returned, log how many subscriptions succeeded and exit.
11+
"""
12+
13+
import asyncio
14+
import json
15+
import logging
16+
import re
17+
import sys
18+
19+
import requests
20+
import websockets
21+
22+
MARKETS_URL = "https://indexer.v4mainnet.dydx.exchange/v4/perpetualMarkets"
23+
WS_URL = "wss://indexer.v4mainnet.dydx.exchange/v4/ws"
24+
25+
logging.basicConfig(
26+
level=logging.INFO,
27+
format="%(asctime)s %(levelname)s %(message)s",
28+
datefmt="%H:%M:%S",
29+
)
30+
log = logging.getLogger("dydx-limit-test")
31+
32+
33+
def fetch_tickers(markets_url: str) -> list[str]:
34+
"""Fetch markets and return list of ticker strings.
35+
Handles pure JSON or HTML-with-<pre> wrapper.
36+
"""
37+
resp = requests.get(markets_url, timeout=20)
38+
try:
39+
data = resp.json()
40+
except ValueError:
41+
# Fallback: extract JSON from <pre>...</pre> if server wrapped it
42+
m = re.search(r"<pre>(\{.*\})</pre>", resp.text, flags=re.S | re.I)
43+
if not m:
44+
raise
45+
data = json.loads(m.group(1))
46+
markets = data.get("markets") or {}
47+
# ticker is the id used for subscriptions
48+
tickers = [str(v["ticker"]) for v in markets.values() if "ticker" in v]
49+
# Deduplicate and sort for stable order
50+
tickers = sorted(set(tickers))
51+
if not tickers:
52+
raise RuntimeError("No tickers found from metadata")
53+
return tickers
54+
55+
56+
async def subscribe_until_limit(
57+
ws_url: str,
58+
channel: str,
59+
tickers: list[str],
60+
number: int | None,
61+
subscribe_delay: float,
62+
gather_delay: float,
63+
) -> None:
64+
"""Subscribe to each ticker on the given channel until a limit error occurs.
65+
On limit error: log count and exit(0).
66+
"""
67+
stop_event = asyncio.Event()
68+
successful: set[str] = set()
69+
70+
async with websockets.connect(ws_url, max_queue=None, ping_interval=20) as ws:
71+
# Receiver: track success and detect limit errors
72+
async def recv_loop():
73+
try:
74+
while not stop_event.is_set():
75+
raw = await ws.recv()
76+
msg = json.loads(raw)
77+
mtype = msg.get("type", "")
78+
if mtype == "error":
79+
# Detect any limit-related error
80+
# Match common fields/messages without relying on exact code
81+
text = json.dumps(msg).lower()
82+
if "limit" in text or "too many" in text:
83+
log.error("Limit error received: %s", msg)
84+
stop_event.set()
85+
return
86+
else:
87+
log.warning("Non-limit error: %s", msg)
88+
else:
89+
# Count a subscription as successful when we first see either a 'subscribed'
90+
# ack or any first data message for that channel+id.
91+
if msg.get("channel") == channel:
92+
msg_id = msg.get("id")
93+
if msg_id is not None:
94+
mid = str(msg_id)
95+
if mid not in successful:
96+
# Accept either explicit ack ('subscribed') or first data
97+
successful.add(mid)
98+
except websockets.ConnectionClosedOK:
99+
pass
100+
except websockets.ConnectionClosedError as e:
101+
log.error("WebSocket closed with error: %s", e)
102+
stop_event.set()
103+
104+
# Sender: sequentially subscribe to each market id until a limit error is hit
105+
async def send_loop():
106+
try:
107+
for ticker in tickers[slice(0, number)]:
108+
if stop_event.is_set():
109+
break
110+
sub = {"type": "subscribe", "channel": channel, "id": ticker}
111+
await ws.send(json.dumps(sub))
112+
# Gentle pacing to avoid instant flood; adjust if needed
113+
await asyncio.sleep(subscribe_delay)
114+
finally:
115+
# Allow receiver to drain any final messages
116+
await asyncio.sleep(gather_delay)
117+
stop_event.set()
118+
119+
recv_task = asyncio.create_task(recv_loop())
120+
send_task = asyncio.create_task(send_loop())
121+
122+
await stop_event.wait()
123+
# Cancel outstanding tasks and close
124+
recv_task.cancel()
125+
send_task.cancel()
126+
# Report result and exit if this stop was caused by a limit error
127+
count = len(successful)
128+
log.info(
129+
"Channel '%s' subscriptions succeeded before limit: %d", channel, count
130+
)
131+
# Exit immediately per requirement
132+
sys.exit(0)
133+
134+
135+
def parse_args():
136+
import argparse
137+
138+
parser = argparse.ArgumentParser(description="dYdX v4 WS limit tester")
139+
parser.add_argument(
140+
"--channel",
141+
default="v4_orderbook",
142+
choices=["v4_orderbook", "v4_trades"],
143+
help="Channel to subscribe on (default: v4_orderbook)",
144+
)
145+
parser.add_argument(
146+
"--number",
147+
default=None,
148+
type=int,
149+
help="Number of markets to subscribe to (default: all)",
150+
)
151+
parser.add_argument(
152+
"--markets-url",
153+
default=MARKETS_URL,
154+
help=f"URL to fetch markets from (default: {MARKETS_URL})",
155+
)
156+
parser.add_argument(
157+
"--ws-url",
158+
default=WS_URL,
159+
help=f"Endpoint to connect to (default: {WS_URL})",
160+
)
161+
parser.add_argument(
162+
"--subscribe-delay",
163+
type=float,
164+
default=0.1,
165+
help="Delay between subscription requests (s) (default: 0.1s)",
166+
)
167+
parser.add_argument(
168+
"--gather-delay",
169+
type=float,
170+
default=1,
171+
help="Wait following subscription requests to collect responses (s) (default: 1s)",
172+
)
173+
return parser.parse_args()
174+
175+
176+
def main():
177+
args = parse_args()
178+
tickers = fetch_tickers(args.markets_url)
179+
log.info("Fetched %d tickers", len(tickers))
180+
log.info(
181+
f"Subscribing to {len(tickers) if args.number is None else args.number} markets",
182+
)
183+
asyncio.run(
184+
subscribe_until_limit(
185+
args.ws_url,
186+
args.channel,
187+
tickers,
188+
args.number,
189+
args.subscribe_delay,
190+
args.gather_delay,
191+
)
192+
)
193+
194+
195+
if __name__ == "__main__":
196+
main()

indexer/services/socks/src/config.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,14 @@ export const configSchema = {
5656

5757
// Metrics
5858
LARGEST_SUBSCRIBER_METRIC_INTERVAL_MS: parseInteger({ default: 60 * 1000 }), // 1 minute
59+
60+
// Per-Channel Limits
61+
V4_ACCOUNTS_CHANNEL_LIMIT: parseInteger({ default: 256 }),
62+
V4_CANDLES_CHANNEL_LIMIT: parseInteger({ default: 32 }),
63+
V4_MARKETS_CHANNEL_LIMIT: parseInteger({ default: 32 }),
64+
V4_ORDERBOOK_CHANNEL_LIMIT: parseInteger({ default: 32 }),
65+
V4_PARENT_ACCOUNTS_CHANNEL_LIMIT: parseInteger({ default: 256 }),
66+
V4_TRADES_CHANNEL_LIMIT: parseInteger({ default: 32 }),
5967
};
6068

6169
////////////////////////////////////////////////////////////////////////////////

0 commit comments

Comments
 (0)