Replies: 1 comment
-
To add an import asyncio
from typing import List, Optional
import aiohttp
import requests
from langchain_core.callbacks import (
AsyncCallbackManagerForRetrieverRun,
CallbackManagerForRetrieverRun,
)
from langchain_core.documents import Document
from langchain_core.retrievers import BaseRetriever
class ChatGPTPluginRetriever(BaseRetriever):
"""`ChatGPT plugin` retriever."""
url: str
"""URL of the ChatGPT plugin."""
bearer_token: str
"""Bearer token for the ChatGPT plugin."""
top_k: int = 3
"""Number of documents to return."""
filter: Optional[dict] = None
"""Filter to apply to the results."""
aiosession: Optional[aiohttp.ClientSession] = None
"""Aiohttp session to use for requests."""
class Config:
arbitrary_types_allowed = True
def _get_relevant_documents(
self, query: str, *, run_manager: CallbackManagerForRetrieverRun
) -> List[Document]:
url, json, headers = self._create_request(query)
response = requests.post(url, json=json, headers=headers)
results = response.json()["results"][0]["results"]
docs = []
for d in results:
content = d.pop("text")
metadata = d.pop("metadata", d)
if metadata.get("source_id"):
metadata["source"] = metadata.pop("source_id")
docs.append(Document(page_content=content, metadata=metadata))
return docs
async def _aget_relevant_documents(
self, query: str, *, run_manager: AsyncCallbackManagerForRetrieverRun
) -> List[Document]:
url, json, headers = self._create_request(query)
if not self.aiosession:
async with aiohttp.ClientSession() as session:
async with session.post(url, headers=headers, json=json) as response:
res = await response.json()
else:
async with self.aiosession.post(
url, headers=headers, json=json
) as response:
res = await response.json()
results = res["results"][0]["results"]
docs = []
for d in results:
content = d.pop("text")
metadata = d.pop("metadata", d)
if metadata.get("source_id"):
metadata["source"] = metadata.pop("source_id")
docs.append(Document(page_content=content, metadata=metadata))
return docs
def _create_request(self, query: str) -> tuple[str, dict, dict]:
url = f"{self.url}/query"
json = {
"queries": [
{
"query": query,
"filter": self.filter,
"top_k": self.top_k,
}
]
}
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {self.bearer_token}",
}
return url, json, headers This method logs the retrieved documents and updates the metrics accordingly. It also formats the documents' metadata and logs the response to the callback manager. Additionally, the async def on_retriever_end(
self,
documents: Sequence[Document],
*,
run_id: UUID,
parent_run_id: Optional[UUID] = None,
tags: Optional[List[str]] = None,
**kwargs: Any,
) -> None:
retrieval_run = self._complete_retrieval_run(
documents=documents,
run_id=run_id,
**kwargs,
)
tasks = [self._end_trace(retrieval_run), self._on_retriever_end(retrieval_run)]
await asyncio.gather(*tasks) This method processes the retrieved documents and completes the retrieval run, ensuring that all necessary callbacks are executed [1][2]. |
Beta Was this translation helpful? Give feedback.
Uh oh!
There was an error while loading. Please reload this page.
Uh oh!
There was an error while loading. Please reload this page.
-
Checked other resources
Commit to Help
Example Code
Description
Hi everyone,
I'm currently working with LangChain and trying to figure out how to add a callback to retrieve documents once the retriever finishes. In LangChain JS, I can do something like above code snippet.
I would like to implement something similar in LangChain Python, but I'm not sure how to set up the on_retriever_end equivalent callback for retrieving documents.
Does anyone have an example or a suggested way to handle this in Python?
Thanks for the help!
System Info
langchain==0.2.14
langchain-anthropic==0.1.23
langchain-cohere==0.2.3
langchain-community==0.2.12
langchain-core==0.2.35
langchain-experimental==0.0.64
langchain-google-genai==1.0.10
langchain-ollama==0.1.1
langchain-openai==0.1.19
langchain-text-splitters==0.2.2
langchainhub==0.1.21
langfuse==2.44.1
langgraph==0.1.8
langsmith==0.1.106
Beta Was this translation helpful? Give feedback.
All reactions