Skip to content

Commit d0d7c05

Browse files
authored
Try to fix async requests getting stuck (#466)
1 parent 89509d8 commit d0d7c05

File tree

2 files changed

+10
-1
lines changed

2 files changed

+10
-1
lines changed

model-engine/model_engine_server/core/celery/app.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -476,6 +476,11 @@ def _get_broker_endpoint_and_transport_options(
476476
# Going to try this with defaults first.
477477
out_broker_transport_options["region"] = os.environ.get("AWS_REGION", "us-west-2")
478478

479+
# Override wait_time_seconds (default: 10), based on https://github.com/celery/celery/discussions/7283
480+
# Goal: prevent async requests from getting stuck in pending when workers die. The hypothesis is that SQS long polling causes this.
481+
out_broker_transport_options["wait_time_seconds"] = 0
482+
out_broker_transport_options["polling_interval"] = 5
483+
479484
# NOTE: The endpoints should ideally use predefined queues. However, the sender probably needs the flexibility
480485
# of not requiring predefined queues.
481486
# assert (

model-engine/model_engine_server/inference/forwarding/echo_server.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
"""
44
import argparse
55
import subprocess
6+
import time
67

78
from fastapi import FastAPI, Request
89
from fastapi.responses import JSONResponse
@@ -19,7 +20,10 @@ def healthcheck():
1920

2021
@app.post("/predict")
2122
async def predict(request: Request):
22-
return await request.json()
23+
dictionary = await request.json()
24+
if "delay" in dictionary:
25+
time.sleep(dictionary["delay"])
26+
return dictionary
2327

2428

2529
@app.post("/predict500")

0 commit comments

Comments
 (0)