fix: Add Missing Logs #1609

Merged · 5 commits · Jan 2, 2025
5 changes: 4 additions & 1 deletion code/backend/batch/batch_push_results.py
@@ -28,19 +28,22 @@ def _get_file_name_from_message(message_body) -> str:
)
def batch_push_results(msg: func.QueueMessage) -> None:
message_body = json.loads(msg.get_body().decode("utf-8"))
- logger.debug("Process Document Event queue function triggered: %s", message_body)
+ logger.info("Process Document Event queue function triggered: %s", message_body)

event_type = message_body.get("eventType", "")
# We handle "" in this scenario for backwards compatibility
# This function is primarily triggered by an Event Grid queue message from the blob storage
# However, it can also be triggered using a legacy schema from BatchStartProcessing
if event_type in ("", "Microsoft.Storage.BlobCreated"):
logger.info("Handling 'Blob Created' event with message body: %s", message_body)
_process_document_created_event(message_body)

elif event_type == "Microsoft.Storage.BlobDeleted":
logger.info("Handling 'Blob Deleted' event with message body: %s", message_body)
_process_document_deleted_event(message_body)

else:
logger.exception("Received an unrecognized event type: %s", event_type)
raise NotImplementedError(f"Unknown event type received: {event_type}")
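Note (illustration, not part of this diff): the switch above from logger.debug to logger.info matters because Python's logging module drops records below the effective level, so the old debug line was typically invisible under a default INFO configuration. A minimal sketch of that behavior, with an assumed app-wide configuration and an illustrative message body:

import logging

logging.basicConfig(level=logging.INFO)  # assumed application-wide setting, not from this repo
logger = logging.getLogger("batch_push_results")

message_body = {"eventType": "Microsoft.Storage.BlobCreated"}  # illustrative payload only
logger.debug("Process Document Event queue function triggered: %s", message_body)  # filtered out at INFO
logger.info("Process Document Event queue function triggered: %s", message_body)   # emitted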


@@ -1,10 +1,13 @@
import logging
from azure.core.credentials import AzureKeyCredential
from azure.ai.formrecognizer import DocumentAnalysisClient
from azure.identity import DefaultAzureCredential
import html
import traceback
from .env_helper import EnvHelper

logger = logging.getLogger(__name__)


class AzureFormRecognizerClient:
def __init__(self) -> None:
@@ -75,6 +78,8 @@ def begin_analyze_document_from_url(
model_id = "prebuilt-layout" if use_layout else "prebuilt-read"

try:
logger.info("Method begin_analyze_document_from_url started")
logger.info(f"Model ID selected: {model_id}")
poller = self.document_analysis_client.begin_analyze_document_from_url(
model_id, document_url=source_url
)
@@ -144,4 +149,7 @@ def begin_analyze_document_from_url(

return page_map
except Exception as e:
logger.exception(f"Exception in begin_analyze_document_from_url: {e}")
raise ValueError(f"Error: {traceback.format_exc()}. Error: {e}")
finally:
logger.info("Method begin_analyze_document_from_url ended")
@@ -190,21 +190,27 @@ def _set_new_config_properties(config: dict, default_config: dict):
@staticmethod
@functools.cache
def get_active_config_or_default():
logger.info("Method get_active_config_or_default started")
env_helper = EnvHelper()
config = ConfigHelper.get_default_config()

if env_helper.LOAD_CONFIG_FROM_BLOB_STORAGE:
logger.info("Loading configuration from Blob Storage")
blob_client = AzureBlobStorageClient(container_name=CONFIG_CONTAINER_NAME)

if blob_client.file_exists(CONFIG_FILE_NAME):
logger.info("Configuration file found in Blob Storage")
default_config = config
config_file = blob_client.download_file(CONFIG_FILE_NAME)
config = json.loads(config_file)

ConfigHelper._set_new_config_properties(config, default_config)
else:
logger.info("Returning default config")
logger.info(
"Configuration file not found in Blob Storage, using default configuration"
)

logger.info("Method get_active_config_or_default ended")
return Config(config)

@staticmethod
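Note (illustration, not part of this diff): because get_active_config_or_default is wrapped in @functools.cache, the new "started"/"ended" messages and the Blob Storage lookup are logged only on the first call in a process; later calls return the cached Config silently. A stripped-down sketch, assuming nothing beyond the standard library:

import functools
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@functools.cache
def get_active_config_or_default():
    logger.info("Method get_active_config_or_default started")
    config = {"source": "default"}  # placeholder for the real config loading
    logger.info("Method get_active_config_or_default ended")
    return config

get_active_config_or_default()  # executes the body and emits both log lines
get_active_config_or_default()  # served from the cache, no further logging
get_active_config_or_default.cache_clear()  # would force a reload on the next call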
@@ -15,11 +15,16 @@ class IntegratedVectorizationEmbedder(EmbedderBase):
def __init__(self, env_helper: EnvHelper):
self.env_helper = env_helper
self.llm_helper: LLMHelper = LLMHelper()
logger.info("Initialized IntegratedVectorizationEmbedder.")

def embed_file(self, source_url: str, file_name: str = None):
logger.info(
f"Starting embed_file for source_url: {source_url}, file_name: {file_name}."
)
self.process_using_integrated_vectorization(source_url=source_url)

def process_using_integrated_vectorization(self, source_url: str):
logger.info(f"Starting integrated vectorization for source_url: {source_url}.")
config = ConfigHelper.get_active_config_or_default()
try:
search_datasource = AzureSearchDatasource(self.env_helper)
@@ -35,14 +40,20 @@ def process_using_integrated_vectorization(self, source_url: str):
self.env_helper.AZURE_SEARCH_INDEXER_NAME,
skillset_name=search_skillset_result.name,
)
logger.info("Integrated vectorization process completed successfully.")
return indexer_result
except Exception as e:
logger.error(f"Error processing {source_url}: {e}")
raise e

def reprocess_all(self):
logger.info("Starting reprocess_all operation.")
search_indexer = AzureSearchIndexer(self.env_helper)
if search_indexer.indexer_exists(self.env_helper.AZURE_SEARCH_INDEXER_NAME):
logger.info(
f"Running indexer: {self.env_helper.AZURE_SEARCH_INDEXER_NAME}."
)
search_indexer.run_indexer(self.env_helper.AZURE_SEARCH_INDEXER_NAME)
else:
logger.info("Indexer does not exist. Starting full processing.")
self.process_using_integrated_vectorization(source_url="all")
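Note (illustration, not part of this diff): the new log lines mix eager f-string interpolation with the lazy %-style used in batch_push_results.py above. With %-style, formatting is deferred until the record actually passes the level filter, which is the cheaper option for messages that may be suppressed. A small comparison with a hypothetical value:

import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
source_url = "https://example.blob.core.windows.net/documents/sample.pdf"  # illustrative only

# f-string: the message is built even though DEBUG records are filtered out here.
logger.debug(f"Starting integrated vectorization for source_url: {source_url}.")

# %-style: formatting only happens if the record is actually emitted.
logger.debug("Starting integrated vectorization for source_url: %s.", source_url)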
@@ -20,6 +20,7 @@

class PostgresEmbedder(EmbedderBase):
def __init__(self, blob_client: AzureBlobStorageClient, env_helper: EnvHelper):
logger.info("Initializing PostgresEmbedder.")
self.env_helper = env_helper
self.llm_helper = LLMHelper()
self.azure_postgres_helper = AzurePostgresHelper()
@@ -33,6 +34,7 @@ def __init__(self, blob_client: AzureBlobStorageClient, env_helper: EnvHelper):
self.embedding_configs[ext] = processor

def embed_file(self, source_url: str, file_name: str):
logger.info(f"Embedding file: {file_name} from source: {source_url}")
file_extension = file_name.split(".")[-1].lower()
embedding_config = self.embedding_configs.get(file_extension)
self.__embed(
@@ -48,32 +50,42 @@ def embed_file(self, source_url: str, file_name: str):
def __embed(
self, source_url: str, file_extension: str, embedding_config: EmbeddingConfig
):
logger.info(f"Starting embedding process for source: {source_url}")
documents_to_upload: List[SourceDocument] = []
if (
embedding_config.use_advanced_image_processing
and file_extension
in self.config.get_advanced_image_processing_image_types()
):
logger.error(
"Advanced image processing is not supported in PostgresEmbedder."
)
raise NotImplementedError(
"Advanced image processing is not supported in PostgresEmbedder."
)
else:
logger.info(f"Loading documents from source: {source_url}")
documents: List[SourceDocument] = self.document_loading.load(
source_url, embedding_config.loading
)
documents = self.document_chunking.chunk(
documents, embedding_config.chunking
)
logger.info("Chunked into document chunks.")

for document in documents:
documents_to_upload.append(self.__convert_to_search_document(document))

if documents_to_upload:
logger.info(
f"Uploading {len(documents_to_upload)} documents to vector store."
)
self.azure_postgres_helper.create_vector_store(documents_to_upload)
else:
logger.warning("No documents to upload.")

def __convert_to_search_document(self, document: SourceDocument):
logger.info(f"Generating embeddings for document ID: {document.id}")
embedded_content = self.llm_helper.generate_embeddings(document.content)
metadata = {
"id": document.id,
Expand All @@ -84,6 +96,7 @@ def __convert_to_search_document(self, document: SourceDocument):
"offset": document.offset,
"page_number": document.page_number,
}
logger.info(f"Metadata generated for document ID: {document.id}")
return {
"id": document.id,
"content": document.content,
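Note (illustration, not part of this diff): embed_file above selects an EmbeddingConfig by file extension from the map built in __init__, so a file type without a registered processor yields None. A simplified sketch of that dispatch with stand-in values:

from typing import Optional

embedding_configs: dict[str, str] = {}
for processor in ["pdf", "docx", "url"]:      # stand-ins for config.document_processors
    ext = processor.lower()                   # document_type.lower() in the code above
    embedding_configs[ext] = processor

def pick_config(file_name: str) -> Optional[str]:
    file_extension = file_name.split(".")[-1].lower()
    return embedding_configs.get(file_extension)  # None when the type is not registered

print(pick_config("report.PDF"))   # "pdf"
print(pick_config("photo.xyz"))    # None, so the caller has to handle missing configs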
13 changes: 13 additions & 0 deletions code/backend/batch/utilities/helpers/embedders/push_embedder.py
@@ -24,6 +24,7 @@

class PushEmbedder(EmbedderBase):
def __init__(self, blob_client: AzureBlobStorageClient, env_helper: EnvHelper):
logger.info("Initializing PushEmbedder")
self.env_helper = env_helper
self.llm_helper = LLMHelper()
self.azure_search_helper = AzureSearchHelper()
@@ -33,11 +34,14 @@ def __init__(self, blob_client: AzureBlobStorageClient, env_helper: EnvHelper):
self.blob_client = blob_client
self.config = ConfigHelper.get_active_config_or_default()
self.embedding_configs = {}
logger.info("Loading document processors")
for processor in self.config.document_processors:
ext = processor.document_type.lower()
self.embedding_configs[ext] = processor
logger.info("Document processors loaded")

def embed_file(self, source_url: str, file_name: str):
logger.info(f"Embedding file: {file_name} from URL: {source_url}")
file_extension = file_name.split(".")[-1].lower()
embedding_config = self.embedding_configs.get(file_extension)
self.__embed(
@@ -46,19 +50,22 @@ def embed_file(self, source_url: str, file_name: str):
embedding_config=embedding_config,
)
if file_extension != "url":
logger.info(f"Upserting blob metadata for file: {file_name}")
self.blob_client.upsert_blob_metadata(
file_name, {"embeddings_added": "true"}
)

def __embed(
self, source_url: str, file_extension: str, embedding_config: EmbeddingConfig
):
logger.info(f"Processing embedding for file extension: {file_extension}")
documents_to_upload: List[SourceDocument] = []
if (
embedding_config.use_advanced_image_processing
and file_extension
in self.config.get_advanced_image_processing_image_types()
):
logger.info(f"Using advanced image processing for: {source_url}")
caption = self.__generate_image_caption(source_url)
caption_vector = self.llm_helper.generate_embeddings(caption)

@@ -69,6 +76,7 @@
)
)
else:
logger.info(f"Loading documents from source: {source_url}")
documents: List[SourceDocument] = self.document_loading.load(
source_url, embedding_config.loading
)
@@ -81,6 +89,7 @@

# Upload documents (which are chunks) to search index in batches
if documents_to_upload:
logger.info("Uploading documents in batches")
batch_size = self.env_helper.AZURE_SEARCH_DOC_UPLOAD_BATCH_SIZE
search_client = self.azure_search_helper.get_search_client()
for i in range(0, len(documents_to_upload), batch_size):
@@ -93,6 +102,7 @@
logger.warning("No documents to upload.")

def __generate_image_caption(self, source_url):
logger.info(f"Generating image caption for URL: {source_url}")
model = self.env_helper.AZURE_OPENAI_VISION_MODEL
caption_system_message = """You are an assistant that generates rich descriptions of images.
You need to be accurate in the information you extract and detailed in the descriptions you generate.
@@ -116,9 +126,11 @@ def __generate_image_caption(self, source_url):

response = self.llm_helper.get_chat_completion(messages, model)
caption = response.choices[0].message.content
logger.info("Caption generation completed")
return caption

def __convert_to_search_document(self, document: SourceDocument):
logger.info(f"Converting document ID {document.id} to search document format")
embedded_content = self.llm_helper.generate_embeddings(document.content)
metadata = {
self.env_helper.AZURE_SEARCH_FIELDS_ID: document.id,
@@ -151,6 +163,7 @@ def __create_image_document(
content: str,
content_vector: List[float],
):
logger.info(f"Creating image document for source URL: {source_url}")
parsed_url = urlparse(source_url)

file_url = parsed_url.scheme + "://" + parsed_url.netloc + parsed_url.path
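Note (illustration, not part of this diff): the upload loop in __embed above walks documents_to_upload in slices of AZURE_SEARCH_DOC_UPLOAD_BATCH_SIZE. A minimal sketch of that batching pattern with hypothetical names and a stubbed upload step (a real implementation would hand each batch to the search client):

import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def upload_in_batches(documents_to_upload: list[dict], batch_size: int) -> None:
    if not documents_to_upload:
        logger.warning("No documents to upload.")
        return
    logger.info("Uploading documents in batches")
    for i in range(0, len(documents_to_upload), batch_size):
        batch = documents_to_upload[i : i + batch_size]
        # Stub: the search index upload for this batch would happen here.
        logger.info("Uploading batch starting at %d with %d documents", i, len(batch))

upload_in_batches([{"id": str(n)} for n in range(5)], batch_size=2)  # batches of 2, 2, 1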
1 change: 1 addition & 0 deletions code/backend/batch/utilities/helpers/env_helper.py
@@ -362,6 +362,7 @@ def __load_config(self, **kwargs) -> None:
self.SEMENTIC_KERNEL_SYSTEM_PROMPT = os.getenv(
"SEMENTIC_KERNEL_SYSTEM_PROMPT", ""
)
logger.info("Initializing EnvHelper completed")

def is_chat_model(self):
if "gpt-4" in self.AZURE_OPENAI_MODEL_NAME.lower():
6 changes: 6 additions & 0 deletions code/backend/batch/utilities/helpers/llm_helper.py
@@ -1,3 +1,4 @@
import logging
from openai import AzureOpenAI
from typing import List, Union, cast
from langchain_openai import AzureChatOpenAI, AzureOpenAIEmbeddings
@@ -10,9 +11,12 @@
from azure.identity import DefaultAzureCredential
from .env_helper import EnvHelper

logger = logging.getLogger(__name__)


class LLMHelper:
def __init__(self):
logger.info("Initializing LLMHelper")
self.env_helper: EnvHelper = EnvHelper()
self.auth_type_keys = self.env_helper.is_auth_type_keys()
self.token_provider = self.env_helper.AZURE_TOKEN_PROVIDER
@@ -38,6 +42,8 @@ def __init__(self):
)
self.embedding_model = self.env_helper.AZURE_OPENAI_EMBEDDING_MODEL

logger.info("Initializing LLMHelper completed")

def get_llm(self):
if self.auth_type_keys:
return AzureChatOpenAI(
@@ -56,6 +56,7 @@ async def orchestrate(
self, user_message: str, chat_history: List[dict], **kwargs: dict
) -> list[dict]:

logger.info("Method orchestrate of lang_chain_agent started")
# Call Content Safety tool
if self.config.prompts.enable_content_safety:
if response := self.call_content_safety_input(user_message):
@@ -122,4 +123,5 @@ async def orchestrate(
answer=answer.answer,
source_documents=answer.source_documents,
)
logger.info("Method orchestrate of lang_chain_agent ended")
return messages
@@ -54,9 +54,12 @@ def __init__(self) -> None:
async def orchestrate(
self, user_message: str, chat_history: List[dict], **kwargs: dict
) -> list[dict]:
logger.info("Method orchestrate of open_ai_functions started")
# Call Content Safety tool
if self.config.prompts.enable_content_safety:
logger.info("Content Safety enabled. Checking input message...")
if response := self.call_content_safety_input(user_message):
logger.info("Content Safety check returned a response. Exiting method.")
return response

# Call function to determine route
@@ -143,6 +146,7 @@ async def orchestrate(
answer = Answer(question=user_message, answer=text)

if answer.answer is None:
logger.info("Answer is None")
answer.answer = "The requested information is not available in the retrieved data. Please try another query or topic."

# Call Content Safety tool
@@ -156,4 +160,5 @@
answer=answer.answer,
source_documents=answer.source_documents,
)
logger.info("Method orchestrate of open_ai_functions ended")
return messages
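Note (illustration, not part of this diff): several files in this PR add matching "Method ... started"/"... ended" lines by hand. One possible alternative for async methods like orchestrate, sketched here only as a design option, is a small decorator that emits the pair automatically:

import functools
import logging

logger = logging.getLogger(__name__)

def log_entry_exit(func):
    """Log a started/ended pair around an async callable."""
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        logger.info("Method %s started", func.__qualname__)
        try:
            return await func(*args, **kwargs)
        finally:
            logger.info("Method %s ended", func.__qualname__)
    return wrapper

# Hypothetical usage on an orchestrator method:
# @log_entry_exit
# async def orchestrate(self, user_message, chat_history, **kwargs) -> list[dict]:
#     ...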