How to track the execution time of different LangChain components without LangSmith #26140
Unanswered
levalencia asked this question in Q&A
Replies: 1 comment 2 replies
To track the execution time of different LangChain components without using LangSmith, you can use Python's time module to record timestamps around each component.

Example Code

Below is an example of how you can modify your existing code to include timing for each component:

import time
import logging

# The imports below assume the langchain >= 0.2 package layout; adjust them to your installed versions.
from langchain_openai import AzureChatOpenAI
from langchain_community.chat_message_histories import ChatMessageHistory
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder, PromptTemplate
from langchain_core.output_parsers import JsonOutputParser
from langchain_core.runnables.history import RunnableWithMessageHistory
from langchain_core.documents import Document
from langchain.chains import create_history_aware_retriever, create_retrieval_chain
from langchain.chains.combine_documents import create_stuff_documents_chain
# config, setup_embeddings, setup_vector_store, ArgenxUserRetrieverPolicyLCEL and
# ANSWER_PROMPT_POLICY are application-specific and assumed to come from your own modules.
async def policy_gpt_chain_lcel(prompt, username, inputs, outputs, access_token_keyvault, access_token_openai, access_token_search):
    async def llm_thread_gpt_lcel(prompt, username, inputs, outputs):
        try:
            # Start timing for the entire process
            start_time = time.time()
            # Initialize AzureChatOpenAI
            SHD_AZURE_OPENAI_ENDPOINT = config.SHD_AZURE_OPENAI_ENDPOINT(access_token_keyvault)
            SHD_OPENAI_DEPLOYMENT_NAME = config.SHD_OPENAI_DEPLOYMENT_NAME(access_token_keyvault)
            SHD_OPENAI_GPT_MODEL_NAME = config.SHD_OPENAI_GPT_MODEL_NAME(access_token_keyvault)
            llm = AzureChatOpenAI(
                azure_endpoint=SHD_AZURE_OPENAI_ENDPOINT,
                openai_api_version="2023-03-15-preview",
                deployment_name=SHD_OPENAI_DEPLOYMENT_NAME,
                azure_ad_token=access_token_openai,
                openai_api_type="Azure",
                model_name=SHD_OPENAI_GPT_MODEL_NAME,
                streaming=True,  # stream tokens so time to first token can be measured
                temperature=0,
                timeout=300,
                max_retries=10,
            )
            # Mark the point where LLM initialization ends and retriever setup begins
            llm_ready_time = time.time()

            # Initialize embeddings and vector store
            embeddings = setup_embeddings(access_token_keyvault, access_token_openai)
            vector_store = setup_vector_store(embeddings, access_token_keyvault, access_token_search)
            retriever = ArgenxUserRetrieverPolicyLCEL(vectorstore=vector_store, username=username)

            # Measure retriever initialization time
            retriever_time = time.time() - llm_ready_time
            # Initialize chat history
            demo_ephemeral_chat_history = ChatMessageHistory()
            for user_msg, ai_msg in zip(inputs, outputs):
                demo_ephemeral_chat_history.add_user_message(user_msg)
                demo_ephemeral_chat_history.add_ai_message(ai_msg)
            # Initialize prompts and chains
            contextualize_q_system_prompt = (
                "Given a chat history and the latest user question "
                "which might reference context in the chat history, "
                "formulate a standalone question which can be understood "
                "without the chat history. Do NOT answer the question, "
                "just reformulate it if needed and otherwise return it as is."
            )
            contextualize_q_prompt = ChatPromptTemplate.from_messages(
                [
                    ("system", contextualize_q_system_prompt),
                    MessagesPlaceholder("chat_history"),
                    ("human", "{input} Please answer in HTML format except for the SOURCES: section and make sure to always start your citations in the answer with the number [1]"),
                ]
            )
            history_aware_retriever = create_history_aware_retriever(llm, retriever, contextualize_q_prompt)
            qa_prompt = ChatPromptTemplate.from_messages(
                [
                    ("system", ANSWER_PROMPT_POLICY),
                    MessagesPlaceholder("chat_history"),
                    ("human", "{input} Please answer in HTML format except for the SOURCES: section and make sure to always start your citations in the answer with the number [1]"),
                ]
            )
            # Define your legacy DOCUMENT_PROMPT
            document_prompt = PromptTemplate(
                input_variables=["page_content", "source"],
                template="{page_content} (Source: {source})"
            )
            question_answer_chain = (
                create_stuff_documents_chain(llm, qa_prompt, document_prompt=document_prompt, document_variable_name="context")
                | JsonOutputParser()
            )
            rag_chain_from_docs = create_retrieval_chain(history_aware_retriever, question_answer_chain)
            conversational_rag_chain = RunnableWithMessageHistory(
                rag_chain_from_docs,
                lambda session_id: demo_ephemeral_chat_history,
                input_messages_key="input",
                history_messages_key="chat_history",
                output_messages_key="answer",
            )

            # Measure chain initialization time: elapsed since the retriever setup started, minus the retriever portion
            chain_init_time = time.time() - llm_ready_time - retriever_time
            first_token_time = None  # set when the first streamed token arrives
            context_yielded = False
            accumulated_chunk = ""
            sourcing_detected = False
            context_detected = False
            async for event in conversational_rag_chain.astream_events({"input": prompt}, config={"configurable": {"session_id": "unbound"}}, version="v2"):
                kind = event["event"]
                if kind == "on_parser_stream":
                    if first_token_time is None:
                        # Record the moment the first token reaches us
                        first_token_time = time.time()
                    if context_detected:
                        chunk_value = event["data"]["chunk"].replace("\n", "\\n")
                        accumulated_chunk += chunk_value
                        if len(accumulated_chunk) < 16:
                            continue
                        if sourcing_detected:
                            if "|||" in accumulated_chunk:
                                pipes_index = accumulated_chunk.index("|||")
                                content_after_pipes = accumulated_chunk[pipes_index + 3:]
                                yield f'sources: {content_after_pipes} [END]'.encode()
                            else:
                                yield f'sources: {accumulated_chunk} [END]'.encode()
                            accumulated_chunk = ""
                        else:
                            while len(accumulated_chunk) > 16:
                                split_index = min(16, len(accumulated_chunk))
                                if "|||" in accumulated_chunk:
                                    pipes_index = accumulated_chunk.index("|||")
                                    content_before_pipes = accumulated_chunk[:pipes_index]
                                    yield f'answer: {content_before_pipes} [END]'.encode()
                                    yield 'endanswer: endanswer [END]'.encode()
                                    sourcing_detected = True
                                    break
                                else:
                                    if (accumulated_chunk[:split_index].endswith("\\") or
                                            accumulated_chunk[:split_index].endswith("\\n") or
                                            accumulated_chunk[:split_index].endswith("\\\\") or
                                            accumulated_chunk[:split_index].endswith("\\n\\") or
                                            accumulated_chunk[:split_index].endswith("\\n\\n")):
                                        if len(accumulated_chunk) > split_index:
                                            split_index += 1
                                            while (split_index < len(accumulated_chunk) and
                                                    (accumulated_chunk[:split_index].endswith("\\") or
                                                     accumulated_chunk[:split_index].endswith("\\n") or
                                                     accumulated_chunk[:split_index].endswith("\\\\"))):
                                                split_index += 1
                                    if not accumulated_chunk.endswith("|") and not accumulated_chunk.endswith("||"):
                                        chunk_to_yield = accumulated_chunk[:split_index]
                                        yield f'answer: {chunk_to_yield} [END]'.encode()
                                        accumulated_chunk = accumulated_chunk[split_index:]
                                    else:
                                        logging.info('chunk ends with | or ||, accumulate it with next')
                                        break
                elif kind == "on_chain_end" and not context_yielded:
                    output_data = event['data'].get('output')
                    if isinstance(output_data, list):
                        context_list = []
                        for document in output_data:
                            if isinstance(document, Document):
                                context_data = document.metadata
                                context_list.append(context_data)
                        if context_list:
                            context_detected = True
                            context_yielded = True
                            yield f'data: {{"context": "{context_list}"}} \n\n'.encode()
            # Flush any remaining buffered content once the stream ends
            if accumulated_chunk:
                if sourcing_detected:
                    yield f'sources: {accumulated_chunk} [END] '.encode()
                    yield 'endsources: endsources [END]'.encode()
                else:
                    yield f'answer: {accumulated_chunk} [END]'.encode()
                    yield 'endanswer: endanswer [END]'.encode()
            else:
                if sourcing_detected:
                    yield 'endsources: endsources [END]'.encode()
                else:
                    yield 'endanswer: endanswer [END]'.encode()
            # Log all timing measurements
            total_time = time.time() - start_time
            logging.info(f"LLM initialization time: {llm_ready_time - start_time} seconds")
            logging.info(f"Retriever initialization time: {retriever_time} seconds")
            logging.info(f"Chain initialization time: {chain_init_time} seconds")
            if first_token_time is not None:
                logging.info(f"Time to first token: {first_token_time - start_time} seconds")
            logging.info(f"Total execution time: {total_time} seconds")
        except Exception as e:
            logging.error(f"An error occurred in llm_thread_gpt_lcel: {e}")
            yield f"error: {e} [END]".encode()
    try:
        async for chunk in llm_thread_gpt_lcel(prompt, username, inputs, outputs):
            yield chunk
    except Exception as e:
        logging.error(f"An error occurred in policy_gpt_chain_lcel: {e}")
        yield f"error: {e}".encode()

Explanation
By adding these timing measurements, you can track the execution time of different components in your LangChain application.
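A more reusable alternative is a callback handler: LangChain fires on_*_start/on_*_end callbacks for each component, so one handler can time chains, LLM calls, and retrievers without scattering time.time() calls through your code. The sketch below is not from the original answer; it assumes the standard langchain_core BaseCallbackHandler hooks, and the class name TimingCallbackHandler is illustrative:

import time
import logging
from langchain_core.callbacks import BaseCallbackHandler

class TimingCallbackHandler(BaseCallbackHandler):
    """Records wall-clock duration for each chain, chat model, and retriever run."""

    def __init__(self):
        self._starts = {}  # run_id -> (label, start timestamp)
        self.timings = []  # (label, seconds), appended as each run finishes

    def _start(self, run_id, label):
        self._starts[run_id] = (label, time.perf_counter())

    def _end(self, run_id):
        label, started = self._starts.pop(run_id, (None, None))
        if started is not None:
            self.timings.append((label, time.perf_counter() - started))

    def on_chain_start(self, serialized, inputs, *, run_id, **kwargs):
        # "name" is not guaranteed to be in kwargs on every version; fall back to a generic label
        self._start(run_id, kwargs.get("name") or "chain")

    def on_chain_end(self, outputs, *, run_id, **kwargs):
        self._end(run_id)

    def on_chat_model_start(self, serialized, messages, *, run_id, **kwargs):
        self._start(run_id, "chat_model")

    def on_llm_end(self, response, *, run_id, **kwargs):
        # Chat models start with on_chat_model_start but still finish with on_llm_end
        self._end(run_id)

    def on_retriever_start(self, serialized, query, *, run_id, **kwargs):
        self._start(run_id, "retriever")

    def on_retriever_end(self, documents, *, run_id, **kwargs):
        self._end(run_id)

To use it, pass the handler through the same config dict you already give astream_events, then read handler.timings afterwards:

handler = TimingCallbackHandler()
async for event in conversational_rag_chain.astream_events(
    {"input": prompt},
    config={"configurable": {"session_id": "unbound"}, "callbacks": [handler]},
    version="v2",
):
    ...  # existing event handling
for label, seconds in handler.timings:
    logging.info(f"{label}: {seconds:.3f} s")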
Description
I want to measure how much time each component takes to execute.