Skip to content

Simplify celery task crontab config #722

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 1 addition & 9 deletions backend/app/task/model/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,7 @@ class TaskScheduler(Base):
type: Mapped[int] = mapped_column(comment='调度类型(0间隔 1定时)')
interval_every: Mapped[int | None] = mapped_column(comment='任务再次运行前的间隔周期数')
interval_period: Mapped[str | None] = mapped_column(String(255), comment='任务运行之间的周期类型')
crontab_minute: Mapped[str | None] = mapped_column(String(60 * 4), default='*', comment='运行的分钟,"*" 表示全部')
crontab_hour: Mapped[str | None] = mapped_column(String(24 * 4), default='*', comment='运行的小时,"*" 表示全部')
crontab_day_of_week: Mapped[str | None] = mapped_column(String(64), default='*', comment='运行的星期,"*" 表示全部')
crontab_day_of_month: Mapped[str | None] = mapped_column(
String(31 * 4), default='*', comment='运行的每月日期,"*" 表示全部'
)
crontab_month_of_year: Mapped[str | None] = mapped_column(
String(64), default='*', comment='运行的月份,"*" 表示全部'
)
crontab: Mapped[str | None] = mapped_column(String(50), default='* * * * *', comment='任务运行的 Crontab 计划')
one_off: Mapped[bool] = mapped_column(
Boolean().with_variant(INTEGER, 'postgresql'), default=False, comment='是否仅运行一次'
)
Expand Down
10 changes: 3 additions & 7 deletions backend/app/task/schema/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ class TaskSchedulerSchemeBase(SchemaBase):

name: str = Field(description='任务名称')
task: str = Field(description='要运行的 Celery 任务')
args: JsonValue | None = Field(default='[]', description='任务可接收的位置参数')
kwargs: JsonValue | None = Field(default='{}', description='任务可接收的关键字参数')
args: JsonValue | None = Field(default=None, description='任务可接收的位置参数')
kwargs: JsonValue | None = Field(default=None, description='任务可接收的关键字参数')
queue: str | None = Field(default=None, description='CELERY_TASK_QUEUES 中定义的队列')
exchange: str | None = Field(default=None, description='低级别 AMQP 路由的交换机')
routing_key: str | None = Field(default=None, description='低级别 AMQP 路由的路由密钥')
Expand All @@ -25,11 +25,7 @@ class TaskSchedulerSchemeBase(SchemaBase):
type: TaskSchedulerType = Field(default=TaskSchedulerType.INTERVAL, description='任务调度类型(0间隔 1定时)')
interval_every: int | None = Field(default=None, description='任务再次运行前的间隔周期数')
interval_period: PeriodType | None = Field(default=None, description='任务运行之间的周期类型')
crontab_minute: str | None = Field(default='*', description='运行的分钟,"*" 表示全部')
crontab_hour: str | None = Field(default='*', description='运行的小时,"*" 表示全部')
crontab_day_of_week: str | None = Field(default='*', description='运行的星期,"*" 表示全部')
crontab_day_of_month: str | None = Field(default='*', description='运行的每月日期,"*" 表示全部')
crontab_month_of_year: str | None = Field(default='*', description='运行的月份,"*" 表示全部')
crontab: str = Field(default='* * * * *', description='运行的 Crontab 表达式')
one_off: bool = Field(default=False, description='是否仅运行一次')
remark: str | None = Field(default=None, description='备注')

Expand Down
28 changes: 17 additions & 11 deletions backend/app/task/service/scheduler_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,15 @@ async def create(*, obj: CreateTaskSchedulerParam) -> None:
task_scheduler = await task_scheduler_dao.get_by_name(db, obj.name)
if task_scheduler:
raise errors.ConflictError(msg='任务调度已存在')
if obj.type == TaskSchedulerType.CRONTAB:
crontab_split = obj.crontab.split(' ')
if len(crontab_split) != 5:
raise errors.RequestError(msg='Crontab 表达式非法')
crontab_verify('m', crontab_split[0])
crontab_verify('h', crontab_split[1])
crontab_verify('dow', crontab_split[2])
crontab_verify('dom', crontab_split[3])
crontab_verify('moy', crontab_split[4])
await task_scheduler_dao.create(db, obj)

@staticmethod
Expand All @@ -83,11 +92,14 @@ async def update(*, pk: int, obj: UpdateTaskSchedulerParam) -> int:
if await task_scheduler_dao.get_by_name(db, obj.name):
raise errors.ConflictError(msg='任务调度已存在')
if task_scheduler.type == TaskSchedulerType.CRONTAB:
crontab_verify('m', task_scheduler.crontab_minute)
crontab_verify('h', task_scheduler.crontab_hour)
crontab_verify('dow', task_scheduler.crontab_day_of_week)
crontab_verify('dom', task_scheduler.crontab_day_of_month)
crontab_verify('moy', task_scheduler.crontab_month_of_year)
crontab_split = obj.crontab.split(' ')
if len(crontab_split) != 5:
raise errors.RequestError(msg='Crontab 表达式非法')
crontab_verify('m', crontab_split[0])
crontab_verify('h', crontab_split[1])
crontab_verify('dow', crontab_split[2])
crontab_verify('dom', crontab_split[3])
crontab_verify('moy', crontab_split[4])
count = await task_scheduler_dao.update(db, pk, obj)
return count

Expand All @@ -103,12 +115,6 @@ async def update_status(*, pk: int) -> int:
task_scheduler = await task_scheduler_dao.get(db, pk)
if not task_scheduler:
raise errors.NotFoundError(msg='任务调度不存在')
if task_scheduler.type == TaskSchedulerType.CRONTAB:
crontab_verify('m', task_scheduler.crontab_minute)
crontab_verify('h', task_scheduler.crontab_hour)
crontab_verify('dow', task_scheduler.crontab_day_of_week)
crontab_verify('dom', task_scheduler.crontab_day_of_month)
crontab_verify('moy', task_scheduler.crontab_month_of_year)
count = await task_scheduler_dao.set_status(db, pk, not task_scheduler.enabled)
return count

Expand Down
46 changes: 23 additions & 23 deletions backend/app/task/utils/schedulers.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,14 @@ def __init__(self, model: TaskScheduler, app=None):
and model.interval_period is not None
):
self.schedule = schedules.schedule(timedelta(**{model.interval_period: model.interval_every}))
elif model.type == TaskSchedulerType.CRONTAB and model.crontab_minute is not None:
elif model.type == TaskSchedulerType.CRONTAB and model.crontab is not None:
crontab_split = model.crontab.split(' ')
self.schedule = TzAwareCrontab(
minute=model.crontab_minute,
hour=model.crontab_hour or '*',
day_of_week=model.crontab_day_of_week or '*',
day_of_month=model.crontab_day_of_month or '*',
month_of_year=model.crontab_month_of_year or '*',
minute=crontab_split[0],
hour=crontab_split[1],
day_of_week=crontab_split[2],
day_of_month=crontab_split[3],
month_of_year=crontab_split[4],
)
else:
raise errors.NotFoundError(msg=f'{self.name} 计划为空!')
Expand All @@ -63,8 +64,8 @@ def __init__(self, model: TaskScheduler, app=None):
asyncio.create_task(self._disable(model))

try:
self.args = json.loads(model.args) if model.args else []
self.kwargs = json.loads(model.kwargs) if model.kwargs else {}
self.args = json.loads(model.args) if model.args else None
self.kwargs = json.loads(model.kwargs) if model.kwargs else None
except ValueError as exc:
logger.error(f'禁用参数错误的任务:{self.name};error: {str(exc)}')
asyncio.create_task(self._disable(model))
Expand Down Expand Up @@ -187,22 +188,21 @@ async def to_model_schedule(name: str, task: str, schedule: schedules.schedule |
if not obj:
obj = TaskScheduler(**CreateTaskSchedulerParam(task=task, **spec).model_dump())
elif isinstance(schedule, schedules.crontab):
crontab_minute = schedule._orig_minute if crontab_verify('m', schedule._orig_minute, False) else '*'
crontab_hour = schedule._orig_hour if crontab_verify('h', schedule._orig_hour, False) else '*'
crontab_day_of_week = (
schedule._orig_day_of_week if crontab_verify('dom', schedule._orig_day_of_week, False) else '*'
)
crontab_day_of_month = (
schedule._orig_day_of_month if crontab_verify('dom', schedule._orig_day_of_month, False) else '*'
)
crontab_month_of_year = (
schedule._orig_month_of_year if crontab_verify('moy', schedule._orig_month_of_year, False) else '*'
)
spec = {
'name': name,
'type': TaskSchedulerType.CRONTAB.value,
'crontab_minute': schedule._orig_minute
if crontab_verify('m', schedule._orig_minute, False)
else '*',
'crontab_hour': schedule._orig_hour if crontab_verify('h', schedule._orig_hour, False) else '*',
'crontab_day_of_week': schedule._orig_day_of_week
if crontab_verify('dom', schedule._orig_day_of_week, False)
else '*',
'crontab_day_of_month': schedule._orig_day_of_month
if crontab_verify('dom', schedule._orig_day_of_month, False)
else '*',
'crontab_month_of_year': schedule._orig_month_of_year
if crontab_verify('moy', schedule._orig_month_of_year, False)
else '*',
'crontab': f'{crontab_minute} {crontab_hour} {crontab_day_of_week} {crontab_day_of_month} {crontab_month_of_year}', # noqa: E501
}
stmt = select(TaskScheduler).filter_by(**spec)
query = await db.execute(stmt)
Expand Down Expand Up @@ -233,8 +233,8 @@ async def _unpack_fields(
except KeyError:
continue
model_dict.update(
args=json.dumps(args or []),
kwargs=json.dumps(kwargs or {}),
args=json.dumps(args) if args else None,
kwargs=json.dumps(kwargs) if kwargs else None,
**cls._unpack_options(**options or {}),
**entry,
)
Expand Down