Skip to content

Feat: Safe OWL Society Termination #450

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
Open
4 changes: 3 additions & 1 deletion owl/utils/enhanced_role_playing.py
Original file line number Diff line number Diff line change
Expand Up @@ -436,10 +436,11 @@ def step(
),
)


import threading
def run_society(
society: OwlRolePlaying,
round_limit: int = 15,
stop_event: threading.Event = None
) -> Tuple[str, List[dict], dict]:
overall_completion_token_count = 0
overall_prompt_token_count = 0
Expand Down Expand Up @@ -488,6 +489,7 @@ def run_society(
assistant_response.terminated
or user_response.terminated
or "TASK_DONE" in user_response.msg.content
or (stop_event and stop_event.is_set())
):
break

Expand Down
171 changes: 123 additions & 48 deletions owl/webapp.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,12 @@ def setup_logging():
STOP_LOG_THREAD = threading.Event()
CURRENT_PROCESS = None # Used to track the currently running process
STOP_REQUESTED = threading.Event() # Used to mark if stop was requested

STATE = {
"token_count": "0",
"status": (f"<span class='status-indicator status-success'></span> Ready"),
"logs": "No conversation records yet.",
"running": False
}

# Log reading and updating functions
def log_reader_thread(log_file):
Expand Down Expand Up @@ -320,7 +325,7 @@ def run_owl(question: str, example_module: str) -> Tuple[str, str, str]:
Returns:
Tuple[...]: Answer, token count, status
"""
global CURRENT_PROCESS
global CURRENT_PROCESS, STOP_REQUESTED

# Validate input
if not validate_input(question):
Expand Down Expand Up @@ -392,11 +397,22 @@ def run_owl(question: str, example_module: str) -> Tuple[str, str, str]:
"0",
f"❌ Error: Build failed - {str(e)}",
)

# Check if STOP_REQUESTED. Early Premption when triggered early
if STOP_REQUESTED and STOP_REQUESTED.is_set():
return (
f"Thread Returned Early due to termination",
"0",
"☑️ Success - OWL Stopped",
)

# Run society simulation
try:
logging.info("Running society simulation...")
answer, chat_history, token_info = run_society(society)
answer, chat_history, token_info = run_society(
society=society,
stop_event=STOP_REQUESTED
)
logging.info("Society simulation completed")
except Exception as e:
logging.error(f"Error occurred while running society simulation: {str(e)}")
Expand Down Expand Up @@ -430,6 +446,25 @@ def run_owl(question: str, example_module: str) -> Tuple[str, str, str]:
)
return (f"Error occurred: {str(e)}", "0", f"❌ Error: {str(e)}")

def stop_owl() -> None:
r"""
Trigger the STOP_REQUESTED Event to Stop OWL and update the app state

Returns:
None
"""
global CURRENT_PROCESS, STOP_REQUESTED, STATE
msg_template = lambda msg: (f"<span class='status-indicator status-running'></span> {msg}")

if STOP_REQUESTED.is_set() and CURRENT_PROCESS.is_alive():
STATE["status"] = msg_template("Termination in the process...")

if CURRENT_PROCESS.is_alive():
STOP_REQUESTED.set() # Signal the thread to stop
logging.info("📐STOP_REQUESTED Event is Set")
STATE["status"] = msg_template("Stopping the society...")
else:
STATE["status"] = msg_template("Process already completed.")

def update_module_description(module_name: str) -> str:
"""Return the description of the selected module"""
Expand Down Expand Up @@ -793,9 +828,14 @@ def clear_log_file():
return ""

# Create a real-time log update function
def process_with_live_logs(question, module_name):
"""Process questions and update logs in real-time"""
global CURRENT_PROCESS
import asyncio
async def process_with_live_logs(question, module_name) -> Tuple[gr.Button, gr.Button]:
r"""Start Owl in Thread and update logs in realtime

Returns:
Tuple[...]: Optimistically toggle the state of the button
"""
global CURRENT_PROCESS, STATE

# Clear log file
clear_log_file()
Expand All @@ -817,47 +857,73 @@ def process_in_background():
CURRENT_PROCESS = bg_thread # Record current process
bg_thread.start()

# While waiting for processing to complete, update logs once per second
while bg_thread.is_alive():
# Update conversation record display
logs2 = get_latest_logs(100, LOG_QUEUE)

# Always update status
yield (
"0",
"<span class='status-indicator status-running'></span> Processing...",
logs2,
)

time.sleep(1)

# Processing complete, get results
if not result_queue.empty():
result = result_queue.get()
answer, token_count, status = result

# Final update of conversation record
logs2 = get_latest_logs(100, LOG_QUEUE)
async def update_logs_async(result_queue, bg_thread, STATE) -> None:
r"""Updates the realtime logs in async with a new asyncio task

Args:
result_queue: The Queue updated by run_owl(). Contains answer, token_count & Status
bg_thread: The Background thread the run_owl() is running at
STATE: The current app state which is a global dictionary of data
"""
while bg_thread.is_alive():
STATE["logs"] = get_latest_logs(100, LOG_QUEUE)
STATE["token_count"] = "0" # Example update
STATE["status"] = (f"<span class='status-indicator status-running'></span> Processing...")
STATE["running"] = True

await asyncio.sleep(1) # Allow UI updates
# Processing complete, get results
if not result_queue.empty():
logging.info("Real time logs finished ✅")
result = result_queue.get()
answer, token_count, status = result
# Final update of conversation record
logs2 = get_latest_logs(100, LOG_QUEUE)
# Set different indicators based on status
if "Error" in status:
status_with_indicator = (
f"<span class='status-indicator status-error'></span> {status}"
)
else:
status_with_indicator = (
f"<span class='status-indicator status-success'></span> {status}"
)

# Set different indicators based on status
if "Error" in status:
status_with_indicator = (
f"<span class='status-indicator status-error'></span> {status}"
)
STATE["logs"] = logs2
STATE["status"] = status_with_indicator
STATE["token_count"] = token_count # Example update
STATE["running"] = False
else:
status_with_indicator = (
f"<span class='status-indicator status-success'></span> {status}"
)

yield token_count, status_with_indicator, logs2
else:
logs2 = get_latest_logs(100, LOG_QUEUE)
yield (
"0",
"<span class='status-indicator status-error'></span> Terminated",
logs2,
)

logs2 = get_latest_logs(100, LOG_QUEUE)
gr.update()

STATE["logs"] = "0"
STATE["status"] ="<span class='status-indicator status-error'></span> Terminated"
STATE["token_count"] = logs2
STATE["running"] = False

# Start a separate async task for updating logs
asyncio.create_task(update_logs_async(result_queue, bg_thread, STATE))

# Optimistic Toggle of Start Button
return (gr.Button(visible=False), gr.Button(visible=True))

def update_interface() -> Tuple[str,str,str,gr.Button,gr.Button]:
r"""Update the latest state values.

Returns:
Tuple[...]: Links output to token_count_output, status_output, log_display2, run_button, stop_button
"""
global STATE

return (
STATE["token_count"],
STATE["status"],
STATE["logs"],
gr.Button(visible=not STATE["running"]), # run_button
gr.Button(visible=STATE["running"]) # stop_button
)

with gr.Blocks(title="OWL", theme=gr.themes.Soft(primary_hue="blue")) as app:
gr.Markdown(
"""
Expand Down Expand Up @@ -1108,6 +1174,8 @@ def process_in_background():
run_button = gr.Button(
"Run", variant="primary", elem_classes="primary"
)
# Stop button (hidden initially)
stop_button = gr.Button("Stop", variant="secondary", visible=False)

status_output = gr.HTML(
value="<span class='status-indicator status-success'></span> Ready",
Expand Down Expand Up @@ -1237,10 +1305,17 @@ def process_in_background():
refresh_button.click(fn=update_env_table, outputs=[env_table])

# Set up event handling
run_button.click(
start_event = run_button.click(
fn=process_with_live_logs,
inputs=[question_input, module_dropdown],
outputs=[token_count_output, status_output, log_display2],
outputs=[run_button, stop_button],
queue=True
)
# When clicking the stop button, stop the background thread and show start button
stop_button.click(
fn=stop_owl,
queue=True,
cancels=start_event
)

# Module selection updates description
Expand Down Expand Up @@ -1270,7 +1345,7 @@ def toggle_auto_refresh(enabled):
outputs=[log_display2],
)

# No longer automatically refresh logs by default
app.load(update_interface, outputs=[token_count_output, status_output, log_display2, run_button, stop_button], every=1)

return app

Expand Down