langgraph streaming vs llm streaming #30284
I found a similar open issue related to streaming responses using FastAPI with LangChain. The suggested solution is to set up a streaming response using Server-Sent Events (SSE) with FastAPI.

Here's a simplified example of how you might implement this:

```python
import asyncio

from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse

app = FastAPI()


async def stream_messages():
    while True:
        yield "data: Some message\n\n"
        await asyncio.sleep(1)


@app.get("/stream")
async def stream(request: Request):
    async def event_generator():
        async for message in stream_messages():
            if await request.is_disconnected():
                break
            yield message

    return StreamingResponse(event_generator(), media_type="text/event-stream")
```

To integrate this with your LangGraph setup, you can modify the event generator to yield your graph's streamed output instead of the placeholder messages; a sketch follows below. If you're experiencing issues with streaming in FastAPI's Swagger UI but not in the terminal, ensure that your FastAPI setup is correctly configured to handle streaming responses and keep in mind that Swagger UI buffers the response rather than rendering SSE incrementally.

To continue talking to Dosu, mention @dosu.
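Here's a minimal sketch of that integration, not from the linked issue: it assumes `graph` is your compiled LangGraph, that the node's chat model supports streaming, and that you're on a langgraph version where `stream_mode="messages"` surfaces per-token LLM chunks (which is the difference between graph-level streaming and LLM streaming):

```python
# Hedged sketch: `graph` is assumed to be a compiled LangGraph whose node
# calls a chat model with streaming support (construction not shown here).
import json

from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse

app = FastAPI()


@app.get("/stream")
async def stream(request: Request, question: str):
    async def event_generator():
        # stream_mode="messages" yields (message_chunk, metadata) tuples as
        # the LLM inside a node emits tokens -- LLM-level streaming, unlike
        # stream_mode="updates", which only streams per-node state updates.
        async for chunk, _metadata in graph.astream(
            {"question": question}, stream_mode="messages"
        ):
            if await request.is_disconnected():
                break
            if chunk.content:
                yield f"data: {json.dumps({'token': chunk.content})}\n\n"

    return StreamingResponse(event_generator(), media_type="text/event-stream")
```

Test it with `curl -N http://localhost:8000/stream?question=...` rather than Swagger UI; curl's `-N` flag disables output buffering, so you can see tokens arrive as they are produced.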
@dosu

I built a LangGraph where I define the following state and nodes:

```python
import logging
from typing import List, TypedDict

from langchain_core.documents import Document
from langgraph.graph import START, StateGraph

logger = logging.getLogger(__name__)


class ProductState(TypedDict):
    question: str
    k: int                   # Dynamic number of documents to retrieve.
    context: List[Document]  # Holds up to k retrieved product documents.
    final_ids_output: str    # The final output after generating the recommendation.
    loop_step: int


# -------------------------------------------------------------------
# 4) Retrieval: Retrieve k Products in One Call
# -------------------------------------------------------------------
def retrieve_products(state: ProductState, faiss_store) -> ProductState:
    """
    Uses the FAISS store retriever to fetch up to k product documents.
    """
    k = state.get("k", 200)  # Use the dynamic k value (default to 200).
    try:
        retriever = faiss_store.as_retriever(search_kwargs={'k': k})
        docs = retriever.invoke(state["question"])
        logger.info(f"Retrieved {len(docs)} product documents with k={k}.")
        return {"context": docs}
    except Exception as e:
        logger.error(f"Error retrieving {k} product docs: {e}", exc_info=True)
        return {"context": []}
# -------------------------------------------------------------------
# 5) Generate Recommendation and List Retrieved Product IDs
# -------------------------------------------------------------------
def generate_recommendation(state: ProductState, llm) -> ProductState:
    docs = state.get("context", [])
    if not docs:
        logger.warning("No product documents retrieved for recommendation generation.")
        state["final_ids_output"] = "No relevant product IDs found."
        return state
    # The rest of this function was cut off in the original post. A minimal
    # completion based on its stated intent (generate a recommendation and
    # list the retrieved product IDs); assumes llm is a chat model:
    context_text = "\n\n".join(doc.page_content for doc in docs)
    response = llm.invoke(
        f"Products:\n{context_text}\n\nQuestion: {state['question']}\n"
        "Recommend products and list their IDs."
    )
    state["final_ids_output"] = response.content
    return state
# -------------------------------------------------------------------
# 6) Build LangGraph Workflow
# -------------------------------------------------------------------
def build_product_graph(faiss_store, llm):
    """
    Build a LangGraph pipeline with two nodes:
      1. Retrieve k product documents.
      2. Generate a recommendation and list the retrieved product IDs.
    """
    graph_builder = StateGraph(ProductState)
    graph_builder.add_node("retrieve", lambda s: retrieve_products(s, faiss_store))
    graph_builder.add_node("generate_recommendation", lambda s: generate_recommendation(s, llm))
    graph_builder.add_edge(START, "retrieve")
    graph_builder.add_edge("retrieve", "generate_recommendation")
    compiled_graph = graph_builder.compile()
    logger.info("LangGraph product pipeline compiled successfully.")
    return compiled_graph
```
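A minimal usage sketch for the pipeline above, with hypothetical inputs (`faiss_store` and `llm` construction not shown):

```python
# Usage sketch: faiss_store and llm are assumed to be built elsewhere.
graph = build_product_graph(faiss_store, llm)

# Plain invocation: blocks until the whole graph finishes, then returns
# the final state.
result = graph.invoke({"question": "wireless headphones under $100", "k": 50})
print(result["final_ids_output"])

# Graph-level streaming: stream_mode="updates" emits each node's state
# update as it completes (retrieve, then generate_recommendation). This is
# not the same as per-token LLM streaming (stream_mode="messages").
for update in graph.stream(
    {"question": "wireless headphones under $100", "k": 50},
    stream_mode="updates",
):
    print(update)
```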