Skip to content

Optimize celery integrations and events #721

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions backend/app/task/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
# -*- coding: utf-8 -*-
import sys

from pathlib import Path
from backend.core.path_conf import BASE_PATH

from .actions import * # noqa: F403

# 导入项目根目录
sys.path.append(str(Path(__file__).resolve().parent.parent.parent.parent))
sys.path.append(str(BASE_PATH.parent))
13 changes: 13 additions & 0 deletions backend/app/task/actions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from starlette.concurrency import run_in_threadpool

from backend.app.task.celery import celery_app
from backend.common.socketio.server import sio


@sio.event
async def task_worker_status(sid, data):
"""任务 Worker 状态事件"""
worker = await run_in_threadpool(celery_app.control.ping)
await sio.emit('task_worker_status', worker, sid)
2 changes: 1 addition & 1 deletion backend/app/task/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def init_celery() -> celery.Celery:
if settings.CELERY_BROKER == 'redis'
else f'amqp://{settings.CELERY_RABBITMQ_USERNAME}:{settings.CELERY_RABBITMQ_PASSWORD}@{settings.CELERY_RABBITMQ_HOST}:{settings.CELERY_RABBITMQ_PORT}',
broker_connection_retry_on_startup=True,
backend=f'db+{settings.DATABASE_TYPE + "+pymysql" if settings.DATABASE_TYPE == "mysql" else settings.DATABASE_TYPE}' # noqa: E501
backend=f'db+{settings.DATABASE_TYPE}+{"pymysql" if settings.DATABASE_TYPE == "mysql" else "psycopg"}'
f'://{settings.DATABASE_USER}:{settings.DATABASE_PASSWORD}@{settings.DATABASE_HOST}:{settings.DATABASE_PORT}/{settings.DATABASE_SCHEMA}',
database_engine_options={'echo': settings.DATABASE_ECHO},
database_table_names={
Expand Down
1 change: 1 addition & 0 deletions backend/common/socketio/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from .actions import * # noqa: F403
17 changes: 4 additions & 13 deletions backend/common/socketio/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,8 @@

# 创建 Socket.IO 服务器实例
sio = socketio.AsyncServer(
# 集成 Celery 实现消息订阅
client_manager=socketio.AsyncRedisManager(
f'redis://:{settings.REDIS_PASSWORD}@{settings.REDIS_HOST}:'
f'{settings.REDIS_PORT}/{settings.CELERY_BROKER_REDIS_DATABASE}'
)
if settings.CELERY_BROKER == 'redis'
else socketio.AsyncAioPikaManager(
(
f'amqp://{settings.CELERY_RABBITMQ_USERNAME}:{settings.CELERY_RABBITMQ_PASSWORD}@'
f'{settings.CELERY_RABBITMQ_HOST}:{settings.CELERY_RABBITMQ_PORT}'
)
f'redis://:{settings.REDIS_PASSWORD}@{settings.REDIS_HOST}:{settings.REDIS_PORT}/{settings.REDIS_DATABASE}'
),
async_mode='asgi',
cors_allowed_origins=settings.CORS_ALLOWED_ORIGINS,
Expand All @@ -30,7 +21,7 @@

@sio.event
async def connect(sid, environ, auth):
"""处理 WebSocket 连接事件"""
"""Socket 连接事件"""
if not auth:
log.error('WebSocket 连接失败:无授权')
return False
Expand All @@ -57,6 +48,6 @@ async def connect(sid, environ, auth):


@sio.event
async def disconnect(sid: str) -> None:
"""处理 WebSocket 断开连接事件"""
async def disconnect(sid) -> None:
"""Socket 断开连接事件"""
await redis_client.spop(settings.TOKEN_ONLINE_REDIS_PREFIX)
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ dependencies = [
"msgspec>=0.19.0",
"path>=17.0.0",
"psutil>=7.0.0",
"psycopg>=3.2.9",
"pwdlib>=0.2.1",
"pydantic>=2.11.0",
"pydantic-settings>=2.10.0",
Expand Down
7 changes: 6 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,8 @@ prompt-toolkit==3.0.51
# via click-repl
psutil==7.0.0
# via fastapi-best-architecture
psycopg==3.2.9
# via fastapi-best-architecture
pwdlib==0.2.1
# via fastapi-best-architecture
pyasn1==0.6.1
Expand Down Expand Up @@ -296,6 +298,7 @@ typing-extensions==4.14.1
# exceptiongroup
# fastapi
# fastapi-pagination
# psycopg
# pydantic
# pydantic-core
# rich
Expand All @@ -310,7 +313,9 @@ typing-inspection==0.4.1
# pydantic
# pydantic-settings
tzdata==2025.2
# via kombu
# via
# kombu
# psycopg
ua-parser==1.0.1
# via user-agents
ua-parser-builtins==0.18.0.post1
Expand Down
15 changes: 15 additions & 0 deletions uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.