Skip to content

perf: retry logic #138

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 6 commits into
base: dev
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
324 changes: 191 additions & 133 deletions src/backend/sql_agents/convert_script.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,13 @@ async def convert_script(

# Setup the group chat for the agents
chat = CommsManager(sql_agents.idx_agents).group_chat
#retry logic comms manager
comms_manager = CommsManager(
sql_agents.idx_agents,
max_retries=3, # Retry up to 5 times for rate limits
initial_delay=0.2, # Start with 1 second delay
backoff_factor=1.2, # Double delay each retry
)

# send websocket notification that file processing has started
send_status_update(
Expand All @@ -63,160 +70,211 @@ async def convert_script(
current_migration = "No migration"
is_complete: bool = False
while not is_complete:
await chat.add_chat_message(
await comms_manager.group_chat.add_chat_message(
ChatMessageContent(role=AuthorRole.USER, content=source_script)
)
carry_response = None
async for response in chat.invoke():
# TEMPORARY: awaiting bug fix for rate limits
await asyncio.sleep(5)
carry_response = response
if response.role == AuthorRole.ASSISTANT.value:
# Our process can terminate with either of these as the last response
# before syntax check
match response.name:
case AgentType.MIGRATOR.value:
result = MigratorResponse.model_validate_json(
response.content or ""
)
if result.input_error or result.rai_error:
# If there is an error in input, we end the processing here.
# We do not include this in termination to avoid forking the chat process.
description = {
"role": response.role,
"name": response.name or "*",
"content": response.content,
}
await batch_service.create_file_log(
str(file.file_id),
description,
current_migration,
LogType.ERROR,
AgentType(response.name),
AuthorRole(response.role),
try:

async for response in comms_manager.async_invoke():
# TEMPORARY: awaiting bug fix for rate limits
# await asyncio.sleep(5)
carry_response = response
if response.role == AuthorRole.ASSISTANT.value:
# Our process can terminate with either of these as the last response
# before syntax check
match response.name:
case AgentType.MIGRATOR.value:
result = MigratorResponse.model_validate_json(
response.content or ""
)
current_migration = None
break
case AgentType.SYNTAX_CHECKER.value:
result = SyntaxCheckerResponse.model_validate_json(
response.content.lower() or ""
)
# If there are no syntax errors, we can move to the semantic verifier
# We provide both scripts by injecting them into the chat history
if result.syntax_errors == []:
chat.history.add_message(
ChatMessageContent(
role=AuthorRole.USER,
name="candidate",
content=(
f"source_script: {source_script}, \n "
+ f"migrated_script: {current_migration}"
),
if result.input_error or result.rai_error:
# If there is an error in input, we end the processing here.
# We do not include this in termination to avoid forking the chat process.
description = {
"role": response.role,
"name": response.name or "*",
"content": response.content,
}
await batch_service.create_file_log(
str(file.file_id),
description,
current_migration,
LogType.ERROR,
AgentType(response.name),
AuthorRole(response.role),
)
current_migration = None
break
case AgentType.SYNTAX_CHECKER.value:
result = SyntaxCheckerResponse.model_validate_json(
response.content.lower() or ""
)
case AgentType.PICKER.value:
result = PickerResponse.model_validate_json(
response.content or ""
)
current_migration = result.picked_query
case AgentType.FIXER.value:
result = FixerResponse.model_validate_json(
response.content or ""
)
current_migration = result.fixed_query
case AgentType.SEMANTIC_VERIFIER.value:
logger.info(
"Semantic verifier agent response: %s", response.content
)
result = SemanticVerifierResponse.model_validate_json(
response.content or ""
)

# If the semantic verifier agent returns a difference, we need to report it
if len(result.differences) > 0:
description = {
"role": AuthorRole.ASSISTANT.value,
"name": AgentType.SEMANTIC_VERIFIER.value,
"content": "\n".join(result.differences),
}
logger.info(
"Semantic verification had issues. Pass with warnings."
)
# send status update to the client of type in progress with agent status
send_status_update(
status=FileProcessUpdate(
file.batch_id,
file.file_id,
ProcessStatus.COMPLETED,
AgentType.SEMANTIC_VERIFIER,
result.summary,
FileResult.WARNING,
),
# If there are no syntax errors, we can move to the semantic verifier
# We provide both scripts by injecting them into the chat history
if result.syntax_errors == []:
comms_manager.group_chat.history.add_message(
ChatMessageContent(
role=AuthorRole.USER,
name="candidate",
content=(
f"source_script: {source_script}, \n "
+ f"migrated_script: {current_migration}"
),
)
)
case AgentType.PICKER.value:
result = PickerResponse.model_validate_json(
response.content or ""
)
await batch_service.create_file_log(
str(file.file_id),
description,
current_migration,
LogType.WARNING,
AgentType.SEMANTIC_VERIFIER,
AuthorRole.ASSISTANT,
current_migration = result.picked_query
case AgentType.FIXER.value:
result = FixerResponse.model_validate_json(
response.content or ""
)

elif response == "":
# If the semantic verifier agent returns an empty response
current_migration = result.fixed_query
case AgentType.SEMANTIC_VERIFIER.value:
logger.info(
"Semantic verification had no return value. Pass with warnings."
"Semantic verifier agent response: %s", response.content
)
# send status update to the client of type in progress with agent status
send_status_update(
status=FileProcessUpdate(
file.batch_id,
file.file_id,
ProcessStatus.COMPLETED,
result = SemanticVerifierResponse.model_validate_json(
response.content or ""
)

# If the semantic verifier agent returns a difference, we need to report it
if len(result.differences) > 0:
description = {
"role": AuthorRole.ASSISTANT.value,
"name": AgentType.SEMANTIC_VERIFIER.value,
"content": "\n".join(result.differences),
}
logger.info(
"Semantic verification had issues. Pass with warnings."
)
# send status update to the client of type in progress with agent status
send_status_update(
status=FileProcessUpdate(
file.batch_id,
file.file_id,
ProcessStatus.COMPLETED,
AgentType.SEMANTIC_VERIFIER,
result.summary,
FileResult.WARNING,
),
)
await batch_service.create_file_log(
str(file.file_id),
description,
current_migration,
LogType.WARNING,
AgentType.SEMANTIC_VERIFIER,
AuthorRole.ASSISTANT,
)

elif response == "":
# If the semantic verifier agent returns an empty response
logger.info(
"Semantic verification had no return value. Pass with warnings."
)
# send status update to the client of type in progress with agent status
send_status_update(
status=FileProcessUpdate(
file.batch_id,
file.file_id,
ProcessStatus.COMPLETED,
AgentType.SEMANTIC_VERIFIER,
"No return value from semantic verifier agent.",
FileResult.WARNING,
),
)
await batch_service.create_file_log(
str(file.file_id),
"No return value from semantic verifier agent.",
FileResult.WARNING,
),
)
await batch_service.create_file_log(
str(file.file_id),
"No return value from semantic verifier agent.",
current_migration,
LogType.WARNING,
AgentType.SEMANTIC_VERIFIER,
AuthorRole.ASSISTANT,
)
current_migration,
LogType.WARNING,
AgentType.SEMANTIC_VERIFIER,
AuthorRole.ASSISTANT,
)

description = {
"role": response.role,
"name": response.name or "*",
"content": response.content,
}
description = {
"role": response.role,
"name": response.name or "*",
"content": response.content,
}

logger.info(description)
logger.info(description)

# send status update to the client of type in progress with agent status
# send status update to the client of type in progress with agent status
# send_status_update(
# status=FileProcessUpdate(
# file.batch_id,
# file.file_id,
# ProcessStatus.IN_PROGRESS,
# AgentType(response.name),
# json.loads(response.content)["summary"],
# FileResult.INFO,
# ),
# )
# Safely parse response content to avoid crashing on malformed or incomplete JSON
#start
try:
parsed_content = json.loads(response.content or "{}")
except json.JSONDecodeError:
logger.warning("Invalid JSON from agent: %s", response.content)
parsed_content = {
"input_summary": "",
"candidates": [],
"summary": "",
"input_error": "",
"rai_error": "Invalid JSON from agent.",
}

# Send status update using safe fallback values
send_status_update(
status=FileProcessUpdate(
file.batch_id,
file.file_id,
ProcessStatus.IN_PROGRESS,
AgentType(response.name),
parsed_content.get("summary", ""),
FileResult.INFO,
),
)
##end
await batch_service.create_file_log(
str(file.file_id),
description,
current_migration,
LogType.INFO,
AgentType(response.name),
AuthorRole(response.role),
)
except Exception as e:
#logger.error("Error during chat.invoke(): %s", str(e))
logger.error("Error during comms_manager.async_invoke(): %s", str(e))
# Log the error to the batch service for tracking
await batch_service.create_file_log(
str(file.file_id),
f"Critical error during agent communication: {str(e)}",
current_migration,
LogType.ERROR,
AgentType.ALL,
AuthorRole.ASSISTANT,
)
# Send error status update
send_status_update(
status=FileProcessUpdate(
file.batch_id,
file.file_id,
ProcessStatus.IN_PROGRESS,
AgentType(response.name),
json.loads(response.content)["summary"],
FileResult.INFO,
ProcessStatus.COMPLETED,
AgentType.ALL,
f"Processing failed: {str(e)}",
FileResult.ERROR,
),
)
break # Exit the while loop on critical error

await batch_service.create_file_log(
str(file.file_id),
description,
current_migration,
LogType.INFO,
AgentType(response.name),
AuthorRole(response.role),
)

if chat.is_complete:
if comms_manager.group_chat.is_complete:
is_complete = True

break
Expand Down Expand Up @@ -298,4 +356,4 @@ async def validate_migration(
author_role=AuthorRole.ASSISTANT,
)

return True
return True
Loading
Loading