LangChain throws an exception when using with AzureCosmosNoSQLDB #25469
-
Checked other resources
Commit to Help
Example Code#Imports
_ONLY_CLEAR = False
from html import entities
import warnings
#warnings.filterwarnings('error')
import spacy
import os, sys, datetime, pathlib
from os import path
import fitz
import srsly
import langchain_openai
import pypdf
import tracemalloc
import openai
from openai import OpenAI, embeddings
from azure.cosmos import CosmosClient, PartitionKey, DatabaseProxy, container
import langchain, langchain_core, langchain_community
from langchain import chains, schema
from langchain_core.documents import Document
from langchain.schema import document as Document
from langchain.chains import summarize, retrieval_qa
from langchain.chains.retrieval_qa.base import RetrievalQA
from langchain.chains.summarize import load_summarize_chain, map_reduce_prompt, refine_prompts
from langchain.chains import mapreduce
from langchain.chains.combine_documents.map_reduce import MapReduceDocumentsChain
from langchain.chains.combine_documents.reduce import ReduceDocumentsChain
from langchain.chains.combine_documents.stuff import StuffDocumentsChain
from langchain.text_splitter import NLTKTextSplitter, RecursiveCharacterTextSplitter
from langchain_text_splitters import SpacyTextSplitter
from langchain.chains.llm import LLMChain
from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain import hub
from langchain_community.document_loaders import PyPDFLoader, TextLoader
from langchain_community.embeddings import OpenAIEmbeddings
from langchain_community.vectorstores import AzureCosmosDBVectorSearch, azure_cosmos_db, AzureCosmosDBNoSqlVectorSearch
from langchain_community.llms import OpenAI
from langchain_openai import ChatOpenAI
'''Prompts'''
#Map
_MAP_PROMPT_TEMPLATE = '''Write a concise summary of the following chunk of text that includes the main points and any important details.
{text}
helpful answer:'''
# Reduce
_COMBINED_PROMPT_TEMPLATE = """Condense the following text into several key points of interest as required.
Return your response as separate lines of text, each covering a single key point.
{text}
helpful answer:"""
_CRITICAL_TERM_SYSTEM_PROMPT = {
'role' : 'system',
'content' : 'The AI assistant is very intelligent and helpful and will return precise answers as exact text without extra generative descriptions and rambling.',
}
_CRITICAL_TERM_USER_MESSAGE = '''Return the catchy critical words / phrases from the following TEXT separated by commas.
\nTEXT: {}\n'''
_CRITICAL_TERM_ASSISTANT_PROMPT = {
'role' : 'assistant',
'content' : '?',
}
api_key = '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'
nlp = spacy.load("en_core_web_lg")
# Set the API key for the OpenAI client
openai.api_key = api_key
if api_key is None:
raise ValueError("API key is not set. Please set the OPENAI_API_KEY environment variable.")
_CLIENT = openai.OpenAI(api_key=api_key)
#Add your open AI key to the system variable and access it.
_MODEL = 'gpt-4o'
_EMBEDDING_MODEL = 'text-embedding-ada-002'
_SPACY_MODEL = 'en_core_web_lg'
_LLM = ChatOpenAI(temperature=0, model_name= _MODEL, openai_api_key=api_key)
_COSMOS_URL = '''https://my-langchain.documents.azure.com:221/'''
_COSMOS_DB_NAME = '''my-langchain'''
_COSMOS_CONTAINER_NAME = '''langchain-data'''
_PARTITION_KEY = PartitionKey(path="/partitionKey")
_COSMOS_PRIMARY_KEY = '''cui1dGVxDmQIHyS5mS6zpCnZO5BXybZYPlGOipZHcGVFC6dThCWn7VL0BpmnMEHNY5uWloRcBecpACDbtEz1jw=='''
#DB policies.
_CONTAINER_POLICY = {
"vectorEmbeddings": [
{
"path" : "/*",
"dataType" : "float32",
"distanceFunction" : "cosine",
"dimensions": 1536,
}
]
}
_INDEX_POLICY = {
"indexingMode" : "consistent",
"automatic" : True,
"includedPaths" : [
{
"path" : "/*"
}
],
"vectorIndexes": [
{
"path": "/*",
"type": "flat"
}
]
}
_DB_PROPERTIES = {
'id' : _COSMOS_DB_NAME,
}
_CONTAINER_PROPERTIES = {
'id' : _COSMOS_CONTAINER_NAME,
'partition_key' : _PARTITION_KEY,
'indexingPolicy' : _INDEX_POLICY,
}
#DB declarations.
#First, initialize the azure Cosmos vector database.
_COSMOS_CLIENT = CosmosClient(_COSMOS_URL, _COSMOS_PRIMARY_KEY)
_COSMOS_DB = _COSMOS_CLIENT.create_database_if_not_exists(id = _COSMOS_DB_NAME)
_COSMOS_CONTAINER = _COSMOS_DB.create_container_if_not_exists(
id = _COSMOS_CONTAINER_NAME,
partition_key = _PARTITION_KEY,
indexing_policy = _INDEX_POLICY,
vector_embedding_policy = _CONTAINER_POLICY,
offer_throughput = 400
)
_TEXT_SPLITTER = RecursiveCharacterTextSplitter(chunk_size = 1024, chunk_overlap = 256)
_SRC_PATH = './Yuvaraj Res.pdf'
#_TMP_TEXT_FILE = Pathlib.Path('./Yuvaraj Res.txt')
#_METADATA_REFERENCE = 'Yuvaraj Res.pdf ; {}'
_QUESTIONS = [
'What technologies did the candidate work with?',
'What is the candidate\'s educational background?',
'How many years of experience does the candidate have in software development?',
'Has the candidate worked in any managerial roles?',
'What projects has the candidate been involved in?'
]
def main():
'''Main Entry'''
#Load the PDF document.
#Load the pdf document and read page by page.
pdf_document = fitz.open(_SRC_PATH)
#Iterate through pages.
for page_num in range(len(pdf_document)):
#Read the page
page =pdf_document.load_page(page_num)
page_text = page.get_text()
#First summarise the text
summary = summarise_text(page_text)
#Break the summary into separate docs.
lines = summary.split('\n')
#[print(line) for line in lines]
#Create the chunks, embed the chunks, and insert them in the database.
line_no = 0
chunk_no = 0
for line in lines:
chunks = split_and_chunk_text(line)
#print('\n', line)
for chunk in chunks:
#print(chunk)
#Generate the embeddings
embeddings = generate_embeddings(chunk)
#Insert the vectors into the cosmos database.
insert_vector(line_no, chunk_no, chunk, embeddings)
chunk_no += 1
line_no += 1
# Initialize LangChain components
openai_embeddings = OpenAIEmbeddings(model=_EMBEDDING_MODEL, api_key=api_key)
vector_store = AzureCosmosDBNoSqlVectorSearch(
cosmos_client = _COSMOS_CLIENT,
embedding = openai_embeddings,
vector_embedding_policy = _CONTAINER_POLICY,
indexing_policy = _INDEX_POLICY,
cosmos_container_properties = _CONTAINER_PROPERTIES,
cosmos_database_properties = _DB_PROPERTIES,
database_name = _COSMOS_DB_NAME,
container_name = _COSMOS_CONTAINER_NAME,
create_container = False
)
db_retriever = vector_store.as_retriever(search_type="similarity", search_kwargs={"k":2})
chain = RetrievalQA.from_chain_type(llm = _LLM, chain_type = 'map_reduce', retriever =db_retriever, return_source_documents = True)
for question in _QUESTIONS:
answer = chain.invoke({'query' : question})
print('q: ', question)
print('Ans: ', answer)
return
def summarise_text(page_text : str) -> str:
'''Loads the specified text into a Langchain document object, splits into pages, summarizes each page, and returns a combined output.'''
condensed_text = ''
doc = Document.Document(page_content = page_text)
docs = [doc]
#Split the document using Spacy text splitter and use it.
doc_splitter = SpacyTextSplitter(pipeline = _SPACY_MODEL, max_length = 10000)
pages = doc_splitter.split_documents(docs)
#Map Reduce code
map_prompt = PromptTemplate.from_template(template = _MAP_PROMPT_TEMPLATE)
combine_prompt = PromptTemplate.from_template( template = _COMBINED_PROMPT_TEMPLATE)
#Run the Chain.
# Generate summaries using MapReduce method
#After defining prompts, initialize the associated map_reduce_chain.
map_reduce_chain = load_summarize_chain(
llm = _LLM,
chain_type="map_reduce",
map_prompt=map_prompt,
combine_prompt=combine_prompt,
return_intermediate_steps=True,
)
#Then, you generate summaries using the chain. Notice that LangChain use a tokenizer (from transformer library) with 1024 token limit by default.
map_reduce_outputs = map_reduce_chain({"input_documents": pages}, return_only_outputs = True)
condensed_text = get_text_from_output(map_reduce_outputs)
return condensed_text
def get_text_from_output(map_reduce_outputs):
'''Extracts the text from the output from LangChain invoke'''
condensed_text = ''
chars = map_reduce_outputs['output_text']
for c in chars:
condensed_text += c
#print(condensed_text)
return condensed_text
def generate_embeddings(text):
'''Function to generate embeddings'''
ret_embeddings = []
response = _CLIENT.embeddings.create(input = text, model =_EMBEDDING_MODEL)
for embedding in response.data:
ret_embeddings.append(embedding.embedding)
return ret_embeddings
def vector_exists(embedding):
'''Function to check if vector already exists in Cosmos DB'''
query = "SELECT * FROM c WHERE c.embedding = @embedding"
parameters = [{"name": "@embedding", "value": embedding}]
items = list(_COSMOS_CONTAINER.query_items(query=query, parameters=parameters, enable_cross_partition_query=True))
return len(items) > 0
def insert_vector(line_no, ent_no, text, embedding):
'''# Function to insert vector into Cosmos DB'''
dt = datetime.datetime.now()
time_info = ('{}{}{}{}{}{}').format(dt.year, dt.month, dt.day, dt.hour, dt.minute, dt.second)
srcfile = _SRC_PATH.replace('.', '').replace('/', '')
data_id = ('{}{}{}{}').format(time_info, srcfile, line_no, ent_no)
#print(data_id)
if not vector_exists(embedding):
_COSMOS_CONTAINER.create_item(body={
'id' : str(data_id),
'text': text,
'embedding': embedding
})
return
def get_critical_terms(text):
'''Chat GPT call to get the critical phrases from the document.'''
message = _CRITICAL_TERM_USER_MESSAGE.format(text)
prompt = []
prompt.append(_CRITICAL_TERM_SYSTEM_PROMPT)
prompt.append({
'role' : 'user',
'content' : message,
})
prompt.append(_CRITICAL_TERM_ASSISTANT_PROMPT)
response = _CLIENT.chat.completions.create(
model = _MODEL,
messages = prompt,
temperature = 0,
max_tokens = 2000,
top_p = 1,
frequency_penalty = 0,
presence_penalty = 0
)
response_text = response.choices[0].message.content
#print(text, '\n\t', response_text)
return response_text
def split_and_chunk_text(text):
'''Splits and chunks the specified text'''
return _TEXT_SPLITTER.split_text(text)
def delete_items_from_all_partitions():
for item in _COSMOS_CONTAINER.query_items(
query = 'SELECT DISTINCT c.partitionKey FROM c', #'SELECT * FROM c',
enable_cross_partition_query=True):
#_COSMOS_CONTAINER.delete_item(item, partition_key = _PARTITION_KEY)
_COSMOS_CONTAINER.delete_all_items_by_partition_key(item)
print(f'Deleted partition: ', item)
return
'''
Required for all python programs.
'''
if __name__ == '__main__':
tracemalloc.start()
print('Starting')
if not _ONLY_CLEAR:
main()
#Release all the vectors.
if _COSMOS_CONTAINER != None:
delete_items_from_all_partitions()
tracemalloc.stop()
input('Done')
Traceback (most recent call last):
|
Beta Was this translation helpful? Give feedback.
Replies: 1 comment 1 reply
-
To resolve the "CosmosHttpResponseError: (BadRequest) One of the input values is invalid" exception when using the LangChain Python package with Azure Cosmos NoSQL Vector Database for Question Answering, ensure that the following conditions are met:
Here is the relevant code snippet that checks these conditions: if self._create_container:
if (
indexing_policy["vectorIndexes"] is None
or len(indexing_policy["vectorIndexes"]) == 0
):
raise ValueError(
"vectorIndexes cannot be null or empty in the indexing_policy."
)
if (
vector_embedding_policy is None
or len(vector_embedding_policy["vectorEmbeddings"]) == 0
):
raise ValueError(
"vectorEmbeddings cannot be null "
"or empty in the vector_embedding_policy."
)
if self._cosmos_container_properties["partition_key"] is None:
raise ValueError(
"partition_key cannot be null or empty for a container."
)
if vector_search is None:
vector_search = VectorSearch(
algorithms=[
HnswAlgorithmConfiguration(
name="default",
kind=VectorSearchAlgorithmKind.HNSW,
parameters=HnswParameters(
m=4,
ef_construction=400,
ef_search=500,
metric=VectorSearchAlgorithmMetric.COSINE,
),
),
ExhaustiveKnnAlgorithmConfiguration(
name="default_exhaustive_knn",
kind=VectorSearchAlgorithmKind.EXHAUSTIVE_KNN,
parameters=ExhaustiveKnnParameters(
metric=VectorSearchAlgorithmMetric.COSINE
),
),
],
profiles=[
VectorSearchProfile(
name="myHnswProfile",
algorithm_configuration_name="default",
),
VectorSearchProfile(
name="myExhaustiveKnnProfile",
algorithm_configuration_name="default_exhaustive_knn",
),
],
) Additionally, ensure that the from langchain_community.vectorstores import AzureCosmosDBVectorSearch
from langchain_community.embeddings.openai import OpenAIEmbeddings
from pymongo import MongoClient
mongo_client = MongoClient("<YOUR-CONNECTION-STRING>")
collection = mongo_client["<db_name>"]["<collection_name>"]
embeddings = OpenAIEmbeddings()
vectorstore = AzureCosmosDBVectorSearch(collection, embeddings) Ensure that these fields and configurations are correctly set in your setup to avoid the "BadRequest" error [1][2][3]. |
Beta Was this translation helpful? Give feedback.
To resolve the "CosmosHttpResponseError: (BadRequest) One of the input values is invalid" exception when using the LangChain Python package with Azure Cosmos NoSQL Vector Database for Question Answering, ensure that the following conditions are met:
indexing_policy
must have a non-null and non-emptyvectorIndexes
field.vector_embedding_policy
must have a non-null and non-emptyvectorEmbeddings
field.cosmos_container_properties
must have a non-nullpartition_key
.vector_search
configuration is properly set up with the necessary algorithms and profiles.Here is the r…