Skip to content

Job queue #272

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jun 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,6 @@ AIBTC_SUPABASE_BUCKET_NAME=your_bucket_name
# Backend Wallet Configuration
# =============================================================================
AIBTC_BACKEND_WALLET_SEED_PHRASE=your_wallet_seed_phrase
AIBTC_BACKEND_WALLET_PRIVATE_KEY=your_wallet_private_key
AIBTC_BACKEND_WALLET_PUBLIC_KEY=your_wallet_public_key
AIBTC_BACKEND_WALLET_ADDRESS=your_wallet_address

# =============================================================================
# Twitter Configuration
Expand Down
21 changes: 10 additions & 11 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
from api import chat, tools, webhooks
from config import config
from lib.logger import configure_logger
from services import startup
from services.websocket import websocket_manager

# Configure module logger
Expand Down Expand Up @@ -46,18 +45,18 @@ async def health_check():

@app.on_event("startup")
async def startup_event():
"""Run startup tasks."""
# Start the WebSocket manager's cleanup task
# Note: This is now redundant as startup.run() will also start the WebSocket manager
# but we'll keep it for clarity and to ensure it's started early
"""Run web server startup tasks."""
logger.info("Starting FastAPI web server...")
# Only start WebSocket manager for web server connections
# Background services (job runners, bot, etc.) are handled by worker.py
asyncio.create_task(websocket_manager.start_cleanup_task())

# Run other startup tasks
await startup.run()
logger.info("Web server startup complete")


@app.on_event("shutdown")
async def shutdown_event():
"""Run shutdown tasks."""
logger.info("Shutting down FastAPI application")
await startup.shutdown()
"""Run web server shutdown tasks."""
logger.info("Shutting down FastAPI web server...")
# Only handle web server specific cleanup
# Background services shutdown is handled by worker.py
logger.info("Web server shutdown complete")
43 changes: 2 additions & 41 deletions services/runner/base.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
import os
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Dict, Generic, List, Optional, Type, TypeVar
from uuid import UUID

from lib.logger import configure_logger

Expand All @@ -21,48 +19,11 @@ class RunnerResult:
T = TypeVar("T", bound=RunnerResult)


def get_required_env_var(name: str) -> UUID:
"""Get a required environment variable and convert it to UUID."""
value = os.getenv(name)
if not value:
raise ValueError(f"{name} environment variable is not set")
return UUID(value)


@dataclass
class RunnerConfig:
"""Configuration class for runners."""

twitter_profile_id: UUID
twitter_agent_id: UUID
twitter_wallet_id: Optional[UUID]

@classmethod
def from_env(cls) -> "RunnerConfig":
"""Create configuration from environment variables."""
from backend.factory import backend
from backend.models import WalletFilter

twitter_profile_id = get_required_env_var("AIBTC_TWITTER_PROFILE_ID")
twitter_agent_id = get_required_env_var("AIBTC_TWITTER_AGENT_ID")

twitter_wallet = backend.list_wallets(
filters=WalletFilter(profile_id=twitter_profile_id)
)

twitter_wallet_id = None
if not twitter_wallet:
logger.warning(
"No Twitter wallet found - some functionality may be limited"
)
else:
twitter_wallet_id = twitter_wallet[0].id

return cls(
twitter_profile_id=twitter_profile_id,
twitter_agent_id=twitter_agent_id,
twitter_wallet_id=twitter_wallet_id,
)
pass


class JobType:
Expand Down Expand Up @@ -152,7 +113,7 @@ class BaseTask(ABC, Generic[T]):
"""Base class for all tasks."""

def __init__(self, config: Optional[RunnerConfig] = None):
self.config = config or RunnerConfig.from_env()
self.config = config or RunnerConfig()
self._start_time: Optional[float] = None

@property
Expand Down
2 changes: 1 addition & 1 deletion services/runner/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ async def _execute_job(self, execution: JobExecution, worker_name: str) -> None:

context = JobContext(
job_type=job_type,
config=RunnerConfig.from_env(),
config=RunnerConfig(),
retry_count=execution.attempt - 1,
max_retries=metadata.max_retries,
)
Expand Down
2 changes: 1 addition & 1 deletion services/runner/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ async def execute_runner_job(

# Create context
context = JobContext(
job_type=job_enum, config=RunnerConfig.from_env(), parameters=parameters
job_type=job_enum, config=RunnerConfig(), parameters=parameters
)

# Create runner instance
Expand Down
14 changes: 3 additions & 11 deletions services/runner/tasks/dao_deployment_task.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, List, Optional
from uuid import UUID

from backend.factory import backend
from backend.models import (
DAOFilter,
Profile,
QueueMessage,
QueueMessageBase,
QueueMessageFilter,
Expand Down Expand Up @@ -52,10 +50,7 @@ class DAODeploymentTask(BaseTask[DAODeploymentResult]):
def __init__(self, config: Optional[RunnerConfig] = None):
super().__init__(config)
self._pending_messages = None
self.tools_map_all = initialize_tools(
Profile(id=self.config.twitter_profile_id, created_at=datetime.now()),
agent_id=self.config.twitter_agent_id,
)
self.tools_map_all = initialize_tools(None, None)
self.tools_map = filter_tools_by_names(
["contract_deploy_dao"], self.tools_map_all
)
Expand All @@ -72,10 +67,8 @@ async def _validate_config(self, context: JobContext) -> bool:
logger.error("Tools not properly initialized")
return False

# Validate that the twitter profile and agent are available
if not self.config.twitter_profile_id or not self.config.twitter_agent_id:
logger.error("Twitter profile or agent ID not configured")
return False
# Configuration validation passed
logger.debug("DAO deployment task configuration validation passed")

return True
except Exception as e:
Expand Down Expand Up @@ -108,7 +101,6 @@ async def _validate_prerequisites(self, context: JobContext) -> bool:
filters=DAOFilter(
is_deployed=False,
is_broadcasted=True,
wallet_id=self.config.twitter_wallet_id,
)
)
if pending_daos:
Expand Down