Skip to content

Add CLI support for startup celery services #724

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jul 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions backend/app/task/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ def find_task_packages():
packages = []
for root, dirs, files in os.walk(os.path.join(BASE_PATH, 'app', 'task', 'tasks')):
if 'tasks.py' in files:
package = root.replace(str(BASE_PATH) + os.path.sep, '').replace(os.path.sep, '.')
package = root.replace(str(BASE_PATH.parent) + os.path.sep, '').replace(os.path.sep, '.')
packages.append(package)
return packages

Expand Down Expand Up @@ -45,8 +45,8 @@ def init_celery() -> celery.Celery:
result_extended=True,
# result_expires=0, # 任务结果自动清理,0 或 None 表示不清理
beat_schedule=LOCAL_BEAT_SCHEDULE,
beat_scheduler='app.task.utils.schedulers:DatabaseScheduler',
task_cls='app.task.tasks.base:TaskBase',
beat_scheduler='backend.app.task.utils.schedulers:DatabaseScheduler',
task_cls='backend.app.task.tasks.base:TaskBase',
task_track_started=True,
enable_utc=False,
timezone=settings.DATETIME_TIMEZONE,
Expand Down
6 changes: 3 additions & 3 deletions backend/celery-start.sh
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
#!/usr/bin/env bash

# work && beat
celery -A app.task.celery worker -l info -P gevent -c 100 &
celery -A backend.app.task.celery worker -l info -P gevent -c 100 &

# beat
celery -A app.task.celery beat -l info &
celery -A backend.app.task.celery beat -l info &

# flower
celery -A app.task.celery flower --port=8555 --basic-auth=admin:123456
celery -A backend.app.task.celery flower --port=8555 --basic-auth=admin:123456
82 changes: 77 additions & 5 deletions backend/cli.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import asyncio
import subprocess

from dataclasses import dataclass
from typing import Annotated
from typing import Annotated, Literal

import cappa
import granian
Expand Down Expand Up @@ -49,6 +50,34 @@ def run(host: str, port: int, reload: bool, workers: int | None) -> None:
).serve()


def run_celery_worker(log_level: Literal['info', 'debug']) -> None:
try:
subprocess.run(['celery', '-A', 'backend.app.task.celery', 'worker', '-l', f'{log_level}', '-P', 'gevent'])
except KeyboardInterrupt:
pass


def run_celery_beat(log_level: Literal['info', 'debug']) -> None:
try:
subprocess.run(['celery', '-A', 'backend.app.task.celery', 'beat', '-l', f'{log_level}'])
except KeyboardInterrupt:
pass


def run_celery_flower(port: int, basic_auth: str) -> None:
try:
subprocess.run([
'celery',
'-A',
'backend.app.task.celery',
'flower',
f'--port={port}',
f'--basic-auth={basic_auth}',
])
except KeyboardInterrupt:
pass


async def install_plugin(
path: str, repo_url: str, no_sql: bool, db_type: DataBaseType, pk_type: PrimaryKeyType
) -> None:
Expand Down Expand Up @@ -89,7 +118,7 @@ async def execute_sql_scripts(sql_scripts: str) -> None:
console.print(Text('SQL 脚本已执行完成', style='bold green'))


@cappa.command(help='运行服务')
@cappa.command(help='运行 API 服务')
@dataclass
class Run:
host: Annotated[
Expand Down Expand Up @@ -118,6 +147,49 @@ def __call__(self):
run(host=self.host, port=self.port, reload=self.no_reload, workers=self.workers)


@cappa.command(help='从当前主机启动 Celery worker 服务')
@dataclass
class Worker:
log_level: Annotated[
Literal['info', 'debug'],
cappa.Arg(long=True, short='-l', default='info', help='日志输出级别'),
]

def __call__(self):
run_celery_worker(log_level=self.log_level)


@cappa.command(help='从当前主机启动 Celery beat 服务')
@dataclass
class Beat:
log_level: Annotated[
Literal['info', 'debug'],
cappa.Arg(long=True, short='-l', default='info', help='日志输出级别'),
]

def __call__(self):
run_celery_beat(log_level=self.log_level)


@cappa.command(help='从当前主机启动 Celery flower 服务')
@dataclass
class Flower:
port: Annotated[int, cappa.Arg(long=True, default=8555, help='提供服务的主机端口号')]
basic_auth: Annotated[str, cappa.Arg(long=True, default='admin:123456', help='页面登录的用户名和密码')]

def __call__(self):
run_celery_flower(port=self.port, basic_auth=self.basic_auth)


@cappa.command(help='运行 Celery 服务')
@dataclass
class Celery:
subcmd: cappa.Subcommands[Worker | Beat | Flower | None] = None

def __call__(self):
console.print('\n更多信息,尝试 "[cyan]--help[/]"')


@cappa.command(help='新增插件')
@dataclass
class Add:
Expand Down Expand Up @@ -151,13 +223,13 @@ async def __call__(self):
class FbaCli:
version: Annotated[
bool,
cappa.Arg(short='-V', long=True, default=False, help='打印当前版本号'),
cappa.Arg(short='-V', long=True, default=False, show_default=False, help='打印当前版本号'),
]
sql: Annotated[
str,
cappa.Arg(long=True, default='', help='在事务中执行 SQL 脚本'),
cappa.Arg(value_name='PATH', long=True, default='', show_default=False, help='在事务中执行 SQL 脚本'),
]
subcmd: cappa.Subcommands[Run | Add | None] = None
subcmd: cappa.Subcommands[Run | Celery | Add | None] = None

async def __call__(self):
if self.version:
Expand Down