Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions app/agents/voice/automatic/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,29 @@


async def main():
# If no command-line arguments are provided, read a single session config from stdin.
# This is the new secure mode for direct (non-pooled) process creation.
if len(sys.argv) == 1:
logger.info(
"No command-line args detected, starting in single-session stdin mode."
)
try:
line = await asyncio.to_thread(sys.stdin.readline)
if line.strip():
session_config = json.loads(line.strip())
await handle_session(session_config)
else:
logger.error(
"Started with no arguments but received no config from stdin."
)
except Exception as e:
logger.error(
f"Failed to read/handle session config from stdin: {e}", exc_info=True
)
finally:
logger.info("Single-session stdin mode finished.")
return

parser = argparse.ArgumentParser()
parser.add_argument("-u", "--url", type=str, help="URL of the Daily room")
parser.add_argument("-t", "--token", type=str, help="Daily token")
Expand Down
16 changes: 12 additions & 4 deletions app/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -324,10 +324,18 @@ def get_required_env(var_name: str) -> str:
MAX_DAILY_SESSION_LIMIT = int(os.environ.get("MAX_DAILY_SESSION_LIMIT", 1800))

# Pool Configuration
VOICE_AGENT_POOL_SIZE = int(os.environ.get("VOICE_AGENT_POOL_SIZE", 1))
VOICE_AGENT_MAX_POOL_SIZE = int(os.environ.get("VOICE_AGENT_MAX_POOL_SIZE", 3))
DAILY_ROOM_POOL_SIZE = int(os.environ.get("DAILY_ROOM_POOL_SIZE", 1))
DAILY_ROOM_MAX_POOL_SIZE = int(os.environ.get("DAILY_ROOM_MAX_POOL_SIZE", 5))
AUTOMATIC_VOICE_AGENT_POOL_SIZE = int(
os.environ.get("AUTOMATIC_VOICE_AGENT_POOL_SIZE", 1)
)
AUTOMATIC_VOICE_AGENT_MAX_POOL_SIZE = int(
os.environ.get("AUTOMATIC_VOICE_AGENT_MAX_POOL_SIZE", 3)
)
AUTOMATIC_DAILY_ROOM_POOL_SIZE = int(
os.environ.get("AUTOMATIC_DAILY_ROOM_POOL_SIZE", 1)
)
AUTOMATIC_DAILY_ROOM_MAX_POOL_SIZE = int(
os.environ.get("AUTOMATIC_DAILY_ROOM_MAX_POOL_SIZE", 5)
)

# Human-in-the-Loop (HITL) Configuration
HITL_ENABLE = os.environ.get("HITL_ENABLE", "true").lower() == "true"
Expand Down
75 changes: 29 additions & 46 deletions app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,16 @@
from app import __version__
from app.api.routers import automatic, breeze_buddy
from app.core.config import (
AUTOMATIC_DAILY_ROOM_MAX_POOL_SIZE,
AUTOMATIC_DAILY_ROOM_POOL_SIZE,
AUTOMATIC_VOICE_AGENT_MAX_POOL_SIZE,
AUTOMATIC_VOICE_AGENT_POOL_SIZE,
DAILY_API_KEY,
DAILY_API_URL,
DAILY_ROOM_MAX_POOL_SIZE,
DAILY_ROOM_POOL_SIZE,
ENABLE_AUTOMATIC_DAILY_RECORDING,
HOST,
MAX_DAILY_SESSION_LIMIT,
PORT,
VOICE_AGENT_MAX_POOL_SIZE,
VOICE_AGENT_POOL_SIZE,
)

# Import necessary components from the new structure
Expand Down Expand Up @@ -89,8 +89,8 @@ async def lifespan(_app: FastAPI):
try:
await initialize_room_pool(
daily_rest_helper=daily_helpers["rest"],
pool_size=DAILY_ROOM_POOL_SIZE,
max_pool_size=DAILY_ROOM_MAX_POOL_SIZE,
pool_size=AUTOMATIC_DAILY_ROOM_POOL_SIZE,
max_pool_size=AUTOMATIC_DAILY_ROOM_MAX_POOL_SIZE,
max_session_limit=MAX_DAILY_SESSION_LIMIT,
enable_recording=ENABLE_AUTOMATIC_DAILY_RECORDING,
)
Expand All @@ -101,7 +101,8 @@ async def lifespan(_app: FastAPI):
# Initialize voice agent process pool
try:
await initialize_voice_agent_pool(
pool_size=VOICE_AGENT_POOL_SIZE, max_pool_size=VOICE_AGENT_MAX_POOL_SIZE
pool_size=AUTOMATIC_VOICE_AGENT_POOL_SIZE,
max_pool_size=AUTOMATIC_VOICE_AGENT_MAX_POOL_SIZE,
)

# Set up callbacks to avoid circular imports
Expand Down Expand Up @@ -259,55 +260,37 @@ async def bot_connect(
f"Failed to get process from pool: {e}, falling back to direct creation"
)

# 5. Fallback: Launch subprocess directly
# 5. Fallback: Launch subprocess directly and pass config via stdin
bot_file = "app.agents.voice.automatic"
cmd = [
"python3",
"-m",
bot_file,
"-u",
room_url,
"-t",
bot_token,
"--session-id",
session_id,
"--client-sid",
client_sid,
]

# Dynamically build command arguments from session_params
arg_map = {
"mode": "--mode",
"user_name": "--user-name",
"user_email": "--user-email",
"tts_provider": "--tts-provider",
"voice_name": "--voice-name",
"euler_token": "--euler-token",
"breeze_token": "--breeze-token",
"shop_url": "--shop-url",
"shop_id": "--shop-id",
"shop_type": "--shop-type",
"merchant_id": "--merchant-id",
"platform_integrations": "--platform-integrations",
"reseller_id": "--reseller-id",
cmd = ["python3", "-m", bot_file]

session_config = {
"room_url": room_url,
"token": bot_token,
"session_id": session_id,
"client_sid": client_sid,
**session_params,
}

for key, value in session_params.items():
if value is not None:
arg_name = arg_map.get(key)
if isinstance(value, list):
cmd.extend([arg_name] + value)
else:
cmd.extend([arg_name, str(value)])
config_json = json.dumps(session_config) + "\n"

logger.bind(session_id=session_id).info(
f"Launching subprocess with command: {' '.join(cmd)}"
f"Launching subprocess with command: {' '.join(cmd)} and passing config via stdin"
)

proc = subprocess.Popen(
cmd,
cwd=Path(__file__).parent.parent,
stdin=subprocess.PIPE,
bufsize=1,
)

# Write config to stdin and close it
if proc.stdin:
try:
proc.stdin.write(config_json.encode("utf-8"))
proc.stdin.close()
except Exception as e:
logger.error(f"Failed to write to subprocess stdin: {e}")
bot_procs[proc.pid] = (proc, room_url, session_id, "direct")
Comment on lines +288 to 294
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Propagate fallback stdin write failures instead of returning success.

Right now if proc.stdin.write(...) blows up (e.g., the agent process crashes before reading), we just log the error and still hand the client a “success” response. That leaves them with a Daily room but no agent ever coming online. Mirror the pool path: kill the subprocess, re-raise (and return the room to the pool if applicable) so the caller gets a failure instead of a silent noop.

         if proc.stdin:
             try:
                 proc.stdin.write(config_json.encode("utf-8"))
                 proc.stdin.close()
             except Exception as e:
                 logger.error(f"Failed to write to subprocess stdin: {e}")
+                proc.kill()
+                raise
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if proc.stdin:
try:
proc.stdin.write(config_json.encode("utf-8"))
proc.stdin.close()
except Exception as e:
logger.error(f"Failed to write to subprocess stdin: {e}")
bot_procs[proc.pid] = (proc, room_url, session_id, "direct")
if proc.stdin:
try:
proc.stdin.write(config_json.encode("utf-8"))
proc.stdin.close()
except Exception as e:
logger.error(f"Failed to write to subprocess stdin: {e}")
proc.kill()
raise
bot_procs[proc.pid] = (proc, room_url, session_id, "direct")
🧰 Tools
🪛 Ruff (0.13.1)

292-292: Do not catch blind exception: Exception

(BLE001)

🤖 Prompt for AI Agents
In app/main.py around lines 288 to 294, the code currently logs exceptions from
proc.stdin.write/config_json and continues as success; instead, on any exception
when writing to the subprocess stdin you must: stop registering the process,
kill/terminate the subprocess (proc.kill() or proc.terminate() then proc.wait())
to avoid zombie/half-started agents, return the room to the pool if this room
was taken from a pool (restore whatever pool state or call the pool-return
helper), and re-raise the exception so the caller receives a failure; ensure
bot_procs[proc.pid] is only set after a successful write/close and that cleanup
happens inside the except before re-raising.

logger.bind(session_id=session_id).info(
f"Subprocess started with PID: {proc.pid}"
Expand Down
10 changes: 5 additions & 5 deletions POOL_IMPLEMENTATION.md → docs/POOL_IMPLEMENTATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,16 +100,16 @@ The pool sizes are now configurable via environment variables. You can set them

```bash
# The number of voice agent processes to keep ready in the pool.
VOICE_AGENT_POOL_SIZE=3
AUTOMATIC_VOICE_AGENT_POOL_SIZE=3

# The maximum number of voice agent processes the pool can scale up to.
VOICE_AGENT_MAX_POOL_SIZE=3
AUTOMATIC_VOICE_AGENT_MAX_POOL_SIZE=3

# The number of Daily.co rooms to keep ready in the pool.
DAILY_ROOM_POOL_SIZE=5
AUTOMATIC_DAILY_ROOM_POOL_SIZE=5

# The maximum number of Daily.co rooms the pool can scale up to.
DAILY_ROOM_MAX_POOL_SIZE=5
AUTOMATIC_DAILY_ROOM_MAX_POOL_SIZE=5
```

### Multi-Pod Setup
Expand Down Expand Up @@ -242,7 +242,7 @@ curl -X POST http://localhost:8000/agent/voice/automatic/cleanup/{session_id}
- [ ] **Model Pre-warming**: Investigate pre-loading heavy models (like STT, VAD) into memory when a process is created, rather than on the first session assignment. This could further reduce the initial session delay.
- [ ] **Shared Model Cache**: For multi-process setups, explore using a shared memory cache (e.g., Redis, Memcached) for models to reduce the overall memory footprint.
- [ ] **Asynchronous Model Loading**: Load non-critical models asynchronously after the primary connection is established to improve perceived performance.
- [ ] **Auto-scaling of pool sizes**: Implement logic to dynamically adjust `VOICE_AGENT_POOL_SIZE` and `DAILY_ROOM_POOL_SIZE` based on real-time load and demand.
- [ ] **Auto-scaling of pool sizes**: Implement logic to dynamically adjust `AUTOMATIC_VOICE_AGENT_POOL_SIZE` and `AUTOMATIC_DAILY_ROOM_POOL_SIZE` based on real-time load and demand.
- [ ] **Advanced Health Checks**: Implement more sophisticated health checks that not only verify if a process is running but also check its responsiveness and resource consumption.
- [ ] **Performance Metrics Dashboard**: Create a dedicated dashboard (e.g., using Grafana) to visualize pool statistics, connection times, and resource utilization over time.

Expand Down