ASGI callable returned without completing response. #25615
To resolve the "ASGI callable returned without completing response" error in your Quart REST API running on Azure App Service with LangChain, ensure that your asynchronous functions are properly awaited and that the response is fully completed before returning. This error often occurs when an asynchronous function is not awaited, or when a streaming response is cut off partway through, so the ASGI server sees a response that started but never finished.
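As a minimal sketch of the failure mode (a hypothetical route, not taken from the code below): once a streaming response has started, an exception inside the generator means the response can never be completed, and the ASGI server reports exactly this kind of error.

```python
from quart import Quart, Response

app = Quart(__name__)

@app.route('/stream-bad')
async def stream_bad():
    async def gen():
        yield b"first chunk"        # headers and first bytes are already sent
        raise RuntimeError("boom")  # the declared response can no longer finish
    # The server may log "ASGI callable returned without completing response"
    # because the body ended before the response was complete.
    return Response(gen(), content_type='text/event-stream')
```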
Here is an example of how you might structure your asynchronous function to ensure it completes properly:

```python
from quart import Quart, request, jsonify, Response
from langchain_community.chat_models.azure_openai import AzureChatOpenAI
from langchain.chains import create_history_aware_retriever, create_retrieval_chain
from langchain.chains.combine_documents import create_stuff_documents_chain
from langchain_community.chat_message_histories import ChatMessageHistory
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder, PromptTemplate
from langchain_core.runnables.history import RunnableWithMessageHistory
import json
import logging
import time
import traceback

app = Quart(__name__)

# Initialize your Azure OpenAI client
openai_client = AzureChatOpenAI(
    azure_deployment="your_deployment_name",
    openai_api_version="2023-12-01-preview",
    openai_api_key="your_openai_api_key",
    azure_endpoint="your_azure_endpoint"
)

@app.route('/policy', methods=['POST'])
async def policy():
    data = await request.get_json()
    prompt = data.get("prompt", "What are the company values?")
    chat_history = data.get("chat_history", [])
    username = data.get("username", "")
    inputs = [item.get('input') for item in chat_history][-5:]
    outputs = [item.get('output') for item in chat_history][-5:]

    # NOTE: .split(' ')[1] raises IndexError if a header is missing or not in
    # "Bearer <token>" form; validate these before indexing.
    access_token_keyvault = request.headers.get('Authorization-KeyVault', '').split(' ')[1]
    access_token_openai = request.headers.get('Authorization-OpenAI', '').split(' ')[1]
    access_token_search = request.headers.get('Authorization-Search', '').split(' ')[1]

    # `config`, `log_exception` and `log_custom_metric` are project-specific
    # helpers assumed to be defined elsewhere in the application.
    instrumentation_key = config.SHD_INSTRUMENTATION_KEY(access_token_keyvault)

    async def generate():
        start_time = time.time()
        try:
            async for chunk in policy_gpt_chain_lcel(prompt, username, inputs, outputs, access_token_keyvault, access_token_openai, access_token_search):
                yield chunk
        except Exception as e:
            logging.error(f"An error occurred on app.py: {e} {traceback.format_exc()}")
            log_exception(instrumentation_key, e)
            yield f"An error occurred: {e}".encode()
        finally:
            end_time = time.time()
            execution_time = (end_time - start_time) * 1000  # Convert to milliseconds
            log_custom_metric(instrumentation_key, 'policy_lcel_quart_execution_time', execution_time)
            logging.info("Generator completed")

    return Response(generate(), content_type='text/event-stream')
```
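One configuration worth checking, given that the middleware log below reports the request dying after roughly 60 seconds: Quart enforces a response timeout (the `RESPONSE_TIMEOUT` config key, 60 seconds by default) and abandons streaming responses that outlive it, which can surface as this same ASGI error. A sketch, assuming that timeout is the culprit:

```python
# Raise or disable Quart's default 60-second timeouts app-wide...
app.config["RESPONSE_TIMEOUT"] = None  # seconds, or None to disable
app.config["BODY_TIMEOUT"] = None

# ...or, inside the route, disable it for this one streaming response:
response = Response(generate(), content_type='text/event-stream')
response.timeout = None
return response
```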
The chain itself:

```python
async def policy_gpt_chain_lcel(prompt, username, inputs, outputs, access_token_keyvault, access_token_openai, access_token_search):
    async def llm_thread_gpt_lcel(prompt, username, inputs, outputs):
        try:
            SHD_AZURE_OPENAI_ENDPOINT = config.SHD_AZURE_OPENAI_ENDPOINT(access_token_keyvault)
            SHD_OPENAI_DEPLOYMENT_NAME = config.SHD_OPENAI_DEPLOYMENT_NAME(access_token_keyvault)
            SHD_OPENAI_GPT_MODEL_NAME = config.SHD_OPENAI_GPT_MODEL_NAME(access_token_keyvault)
            llm = AzureChatOpenAI(
                azure_endpoint=SHD_AZURE_OPENAI_ENDPOINT,
                openai_api_version="2023-03-15-preview",
                deployment_name=SHD_OPENAI_DEPLOYMENT_NAME,
                azure_ad_token=access_token_openai,
                openai_api_type="Azure",
                model_name=SHD_OPENAI_GPT_MODEL_NAME,
                streaming=True,
                temperature=0,
                timeout=None,
                max_retries=3
            )
            # `setup_embeddings`, `setup_vector_store`, `ArgenxUserRetrieverPolicyLCEL`
            # and `ANSWER_PROMPT_POLICY` are project-specific.
            embeddings = setup_embeddings(access_token_keyvault, access_token_openai)
            vector_store = setup_vector_store(embeddings, access_token_keyvault, access_token_search)
            retriever = ArgenxUserRetrieverPolicyLCEL(vectorstore=vector_store, username=username)

            # Rebuild the chat history from the last five exchanges.
            demo_ephemeral_chat_history = ChatMessageHistory()
            for i in range(len(inputs)):
                demo_ephemeral_chat_history.add_user_message(inputs[i])
                demo_ephemeral_chat_history.add_ai_message(outputs[i])

            contextualize_q_system_prompt = (
                "Given a chat history and the latest user question "
                "which might reference context in the chat history, "
                "formulate a standalone question which can be understood "
                "without the chat history. Do NOT answer the question, "
                "just reformulate it if needed and otherwise return it as is."
            )
            contextualize_q_prompt = ChatPromptTemplate.from_messages(
                [
                    ("system", contextualize_q_system_prompt),
                    MessagesPlaceholder("chat_history"),
                    ("human", "{input} Please answer in HTML format"),
                ]
            )
            history_aware_retriever = create_history_aware_retriever(llm, retriever, contextualize_q_prompt)

            qa_prompt = ChatPromptTemplate.from_messages(
                [
                    ("system", ANSWER_PROMPT_POLICY),
                    MessagesPlaceholder("chat_history"),
                    ("human", "{input} Please answer in HTML format"),
                ]
            )
            document_prompt = PromptTemplate(
                input_variables=["page_content", "source"],
                template="{page_content} (Source: {source})"
            )
            question_answer_chain = create_stuff_documents_chain(llm, qa_prompt, document_prompt=document_prompt, document_variable_name="context") | StrOutputParser()
            rag_chain_from_docs = create_retrieval_chain(history_aware_retriever, question_answer_chain)
            conversational_rag_chain = RunnableWithMessageHistory(
                rag_chain_from_docs,
                lambda session_id: demo_ephemeral_chat_history,
                input_messages_key="input",
                history_messages_key="chat_history",
                output_messages_key="answer",
            )
            async for chunk in accumulate_and_yield(conversational_rag_chain.astream({"input": prompt}, config={"configurable": {"session_id": "unbound"}})):
                yield chunk
        except Exception as e:
            logging.error(f"An error occurred in llm_thread_gpt_lcel: {e}")
            yield f"An error occurred: {e}".encode()

    try:
        async for chunk in llm_thread_gpt_lcel(prompt, username, inputs, outputs):
            yield chunk
    except Exception as e:
        logging.error(f"An error occurred in policy_gpt_chain_lcel: {e}")
        yield f"An error occurred: {e}".encode()
```
```python
async def accumulate_and_yield(chunks):
    try:
        accumulated_answer = ""
        accumulated_context = ""
        in_context = False
        source_detected = False
        sources_content = ""
        partial_sources = ""
        async for chunk in chunks:
            chunk_str = str(chunk)

            # Start of a context payload in the streamed chunk.
            if '{\'context\': [Document(metadata' in chunk_str:
                in_context = True
                start_index = chunk_str.index('{\'context\': [Document(metadata')
                accumulated_context = chunk_str[start_index:]
                if ']}' in chunk_str[start_index:]:
                    end_index = chunk_str.index(']}', start_index) + 2
                    accumulated_context = chunk_str[start_index:end_index]
                    in_context = False
                    json_context = convert_to_json(accumulated_context)  # project-specific helper
                    yield json_context.encode()
                    accumulated_context = ""
                continue

            # Continuation of a context payload split across chunks.
            if in_context:
                accumulated_context += chunk_str
                if ']}' in chunk_str:
                    end_index = chunk_str.index(']}') + 2
                    accumulated_context = accumulated_context[:end_index]
                    in_context = False
                    # NOTE: this branch serializes the raw string with json.dumps,
                    # while the branch above uses convert_to_json; the asymmetry
                    # may be unintentional.
                    json_context = json.dumps(accumulated_context)
                    yield json_context.encode()
                    accumulated_context = ""
                continue

            # Answer fragments, optionally followed by a "SOURCES:" section.
            if "{'answer':" in chunk_str:
                answer_part = parse_json_string(chunk_str)  # project-specific helper
                if answer_part is None:
                    logging.error(f"Failed to parse JSON from chunk: {chunk_str}")
                    continue
                if 'answer' not in answer_part:
                    logging.error(f"'answer' key not found in parsed JSON: {answer_part}")
                    continue
                answer_part = answer_part['answer']
                partial_sources += answer_part
                if 'SOURCES:' in partial_sources:
                    source_detected = True
                    parts = partial_sources.split('SOURCES:', 1)
                    accumulated_answer += parts[0]
                    sources_content = parts[1]
                    partial_sources = ""
                else:
                    if source_detected:
                        sources_content += answer_part
                    else:
                        accumulated_answer += answer_part
                # Flush the answer in small batches to keep the stream responsive.
                if len(accumulated_answer) >= 8 and not source_detected:
                    try:
                        json_answer = json.dumps({'answer': accumulated_answer})
                        yield json_answer.encode()
                        accumulated_answer = ""
                    except (TypeError, ValueError) as e:
                        logging.error(f"Failed to process chunk: {chunk_str}")
                        logging.exception(e)
                        continue

        # Flush whatever remains after the stream ends.
        if accumulated_answer and not source_detected:
            try:
                json_answer = json.dumps({'answer': accumulated_answer})
                yield json_answer.encode()
            except (TypeError, ValueError) as e:
                logging.error(f"Failed to process final accumulated answer: {accumulated_answer}")
                logging.exception(e)
        if source_detected:
            try:
                json_sources = json.dumps({'SOURCES': sources_content})
                yield json_sources.encode()
            except (TypeError, ValueError) as e:
                logging.error(f"Failed to process sources content: {sources_content}")
                logging.exception(e)
    except Exception as e:
        logging.error(f"An error occurred in accumulate_and_yield: {e} {traceback.format_exc()}")
        yield f"An error occurred: {e} {traceback.format_exc()} ".encode()
    finally:
        yield b""
```

Ensure that all asynchronous calls are properly awaited and that the response is fully completed before returning. This should help resolve the "ASGI callable returned without completing response" error. If the issue persists, it might be related to specific configurations or network issues with Azure OpenAI, which would require further investigation [1][2][3][4][5].
Description
This is a Quart REST API running on Azure App Service with LangChain, and we are getting this error:
```
2024-08-19T09:35:25.738554889Z [2024-08-19 09:35:25 +0000] [90] [ERROR] ASGI callable returned without completing response.
2024-08-19T09:35:25.745152178Z fail: Middleware[0]
2024-08-19T09:35:25.745193679Z Failed to forward request to http://169.254.129.7:8000. Encountered a System.Net.Http.HttpIOException exception after 60115.444ms with message: The response ended prematurely. (ResponseEnded). Check application logs to verify the application is properly handling HTTP traffic.
2024-08-19T09:35:25.801323034Z fail: Microsoft.AspNetCore.Server.Kestrel[13]
2024-08-19T09:35:25.801365435Z Connection id "0HN605PIS1CH1", Request id "0HN605PIS1CH1:00000013": An unhandled exception was thrown by the application.
2024-08-19T09:35:25.801373535Z System.InvalidOperationException: StatusCode cannot be set because the response has already started.
2024-08-19T09:35:25.801379235Z at Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http.HttpProtocol.ThrowResponseAlreadyStartedException(String value)
2024-08-19T09:35:25.801386335Z at Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http.HttpProtocol.Microsoft.AspNetCore.Http.Features.IHttpResponseFeature.set_StatusCode(Int32 value)
2024-08-19T09:35:25.801393535Z at Microsoft.Azure.AppService.Middleware.Forwarding.RequestForwarder.OnRequest(HttpContext context) in /__w/1/s/src/EasyAuth/Middleware.Forwarding/RequestForwarder.cs:line 90
2024-08-19T09:35:25.801399535Z at Microsoft.Azure.AppService.Middleware.NetCore.AppServiceMiddleware.InvokeAsync(HttpContext context) in /__w/1/s/src/EasyAuth/Microsoft.Azure.AppService.Middleware.NetCore/AppServiceMiddleware.cs:line 153
2024-08-19T09:35:25.801404936Z at Microsoft.Azure.AppService.MiddlewareShim.AutoHealing.AutoHealingMiddleware.Invoke(HttpContext context) in /__w/1/s/src/EasyAuth/Middleware.Host/AutoHealing/AutoHealingMiddleware.cs:line 54
2024-08-19T09:35:25.801411136Z at Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http.HttpProtocol.ProcessRequests[TContext](IHttpApplication`1 application)
```
We are wondering whether this error ("ASGI callable returned without completing response.") comes from our AzureOpenAI connection or from the LangChain chains, and if so, how we can control it or find out the exact error.