Skip to content

Commit 98ef07a

Browse files
authored
Simplify celery task crontab config (#722)
1 parent 802b0d4 commit 98ef07a

File tree

4 files changed

+44
-50
lines changed

4 files changed

+44
-50
lines changed

backend/app/task/model/scheduler.py

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -41,15 +41,7 @@ class TaskScheduler(Base):
4141
type: Mapped[int] = mapped_column(comment='调度类型(0间隔 1定时)')
4242
interval_every: Mapped[int | None] = mapped_column(comment='任务再次运行前的间隔周期数')
4343
interval_period: Mapped[str | None] = mapped_column(String(255), comment='任务运行之间的周期类型')
44-
crontab_minute: Mapped[str | None] = mapped_column(String(60 * 4), default='*', comment='运行的分钟,"*" 表示全部')
45-
crontab_hour: Mapped[str | None] = mapped_column(String(24 * 4), default='*', comment='运行的小时,"*" 表示全部')
46-
crontab_day_of_week: Mapped[str | None] = mapped_column(String(64), default='*', comment='运行的星期,"*" 表示全部')
47-
crontab_day_of_month: Mapped[str | None] = mapped_column(
48-
String(31 * 4), default='*', comment='运行的每月日期,"*" 表示全部'
49-
)
50-
crontab_month_of_year: Mapped[str | None] = mapped_column(
51-
String(64), default='*', comment='运行的月份,"*" 表示全部'
52-
)
44+
crontab: Mapped[str | None] = mapped_column(String(50), default='* * * * *', comment='任务运行的 Crontab 计划')
5345
one_off: Mapped[bool] = mapped_column(
5446
Boolean().with_variant(INTEGER, 'postgresql'), default=False, comment='是否仅运行一次'
5547
)

backend/app/task/schema/scheduler.py

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@ class TaskSchedulerSchemeBase(SchemaBase):
1414

1515
name: str = Field(description='任务名称')
1616
task: str = Field(description='要运行的 Celery 任务')
17-
args: JsonValue | None = Field(default='[]', description='任务可接收的位置参数')
18-
kwargs: JsonValue | None = Field(default='{}', description='任务可接收的关键字参数')
17+
args: JsonValue | None = Field(default=None, description='任务可接收的位置参数')
18+
kwargs: JsonValue | None = Field(default=None, description='任务可接收的关键字参数')
1919
queue: str | None = Field(default=None, description='CELERY_TASK_QUEUES 中定义的队列')
2020
exchange: str | None = Field(default=None, description='低级别 AMQP 路由的交换机')
2121
routing_key: str | None = Field(default=None, description='低级别 AMQP 路由的路由密钥')
@@ -25,11 +25,7 @@ class TaskSchedulerSchemeBase(SchemaBase):
2525
type: TaskSchedulerType = Field(default=TaskSchedulerType.INTERVAL, description='任务调度类型(0间隔 1定时)')
2626
interval_every: int | None = Field(default=None, description='任务再次运行前的间隔周期数')
2727
interval_period: PeriodType | None = Field(default=None, description='任务运行之间的周期类型')
28-
crontab_minute: str | None = Field(default='*', description='运行的分钟,"*" 表示全部')
29-
crontab_hour: str | None = Field(default='*', description='运行的小时,"*" 表示全部')
30-
crontab_day_of_week: str | None = Field(default='*', description='运行的星期,"*" 表示全部')
31-
crontab_day_of_month: str | None = Field(default='*', description='运行的每月日期,"*" 表示全部')
32-
crontab_month_of_year: str | None = Field(default='*', description='运行的月份,"*" 表示全部')
28+
crontab: str = Field(default='* * * * *', description='运行的 Crontab 表达式')
3329
one_off: bool = Field(default=False, description='是否仅运行一次')
3430
remark: str | None = Field(default=None, description='备注')
3531

backend/app/task/service/scheduler_service.py

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,15 @@ async def create(*, obj: CreateTaskSchedulerParam) -> None:
6464
task_scheduler = await task_scheduler_dao.get_by_name(db, obj.name)
6565
if task_scheduler:
6666
raise errors.ConflictError(msg='任务调度已存在')
67+
if obj.type == TaskSchedulerType.CRONTAB:
68+
crontab_split = obj.crontab.split(' ')
69+
if len(crontab_split) != 5:
70+
raise errors.RequestError(msg='Crontab 表达式非法')
71+
crontab_verify('m', crontab_split[0])
72+
crontab_verify('h', crontab_split[1])
73+
crontab_verify('dow', crontab_split[2])
74+
crontab_verify('dom', crontab_split[3])
75+
crontab_verify('moy', crontab_split[4])
6776
await task_scheduler_dao.create(db, obj)
6877

6978
@staticmethod
@@ -83,11 +92,14 @@ async def update(*, pk: int, obj: UpdateTaskSchedulerParam) -> int:
8392
if await task_scheduler_dao.get_by_name(db, obj.name):
8493
raise errors.ConflictError(msg='任务调度已存在')
8594
if task_scheduler.type == TaskSchedulerType.CRONTAB:
86-
crontab_verify('m', task_scheduler.crontab_minute)
87-
crontab_verify('h', task_scheduler.crontab_hour)
88-
crontab_verify('dow', task_scheduler.crontab_day_of_week)
89-
crontab_verify('dom', task_scheduler.crontab_day_of_month)
90-
crontab_verify('moy', task_scheduler.crontab_month_of_year)
95+
crontab_split = obj.crontab.split(' ')
96+
if len(crontab_split) != 5:
97+
raise errors.RequestError(msg='Crontab 表达式非法')
98+
crontab_verify('m', crontab_split[0])
99+
crontab_verify('h', crontab_split[1])
100+
crontab_verify('dow', crontab_split[2])
101+
crontab_verify('dom', crontab_split[3])
102+
crontab_verify('moy', crontab_split[4])
91103
count = await task_scheduler_dao.update(db, pk, obj)
92104
return count
93105

@@ -103,12 +115,6 @@ async def update_status(*, pk: int) -> int:
103115
task_scheduler = await task_scheduler_dao.get(db, pk)
104116
if not task_scheduler:
105117
raise errors.NotFoundError(msg='任务调度不存在')
106-
if task_scheduler.type == TaskSchedulerType.CRONTAB:
107-
crontab_verify('m', task_scheduler.crontab_minute)
108-
crontab_verify('h', task_scheduler.crontab_hour)
109-
crontab_verify('dow', task_scheduler.crontab_day_of_week)
110-
crontab_verify('dom', task_scheduler.crontab_day_of_month)
111-
crontab_verify('moy', task_scheduler.crontab_month_of_year)
112118
count = await task_scheduler_dao.set_status(db, pk, not task_scheduler.enabled)
113119
return count
114120

backend/app/task/utils/schedulers.py

Lines changed: 23 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -47,13 +47,14 @@ def __init__(self, model: TaskScheduler, app=None):
4747
and model.interval_period is not None
4848
):
4949
self.schedule = schedules.schedule(timedelta(**{model.interval_period: model.interval_every}))
50-
elif model.type == TaskSchedulerType.CRONTAB and model.crontab_minute is not None:
50+
elif model.type == TaskSchedulerType.CRONTAB and model.crontab is not None:
51+
crontab_split = model.crontab.split(' ')
5152
self.schedule = TzAwareCrontab(
52-
minute=model.crontab_minute,
53-
hour=model.crontab_hour or '*',
54-
day_of_week=model.crontab_day_of_week or '*',
55-
day_of_month=model.crontab_day_of_month or '*',
56-
month_of_year=model.crontab_month_of_year or '*',
53+
minute=crontab_split[0],
54+
hour=crontab_split[1],
55+
day_of_week=crontab_split[2],
56+
day_of_month=crontab_split[3],
57+
month_of_year=crontab_split[4],
5758
)
5859
else:
5960
raise errors.NotFoundError(msg=f'{self.name} 计划为空!')
@@ -63,8 +64,8 @@ def __init__(self, model: TaskScheduler, app=None):
6364
asyncio.create_task(self._disable(model))
6465

6566
try:
66-
self.args = json.loads(model.args) if model.args else []
67-
self.kwargs = json.loads(model.kwargs) if model.kwargs else {}
67+
self.args = json.loads(model.args) if model.args else None
68+
self.kwargs = json.loads(model.kwargs) if model.kwargs else None
6869
except ValueError as exc:
6970
logger.error(f'禁用参数错误的任务:{self.name};error: {str(exc)}')
7071
asyncio.create_task(self._disable(model))
@@ -187,22 +188,21 @@ async def to_model_schedule(name: str, task: str, schedule: schedules.schedule |
187188
if not obj:
188189
obj = TaskScheduler(**CreateTaskSchedulerParam(task=task, **spec).model_dump())
189190
elif isinstance(schedule, schedules.crontab):
191+
crontab_minute = schedule._orig_minute if crontab_verify('m', schedule._orig_minute, False) else '*'
192+
crontab_hour = schedule._orig_hour if crontab_verify('h', schedule._orig_hour, False) else '*'
193+
crontab_day_of_week = (
194+
schedule._orig_day_of_week if crontab_verify('dom', schedule._orig_day_of_week, False) else '*'
195+
)
196+
crontab_day_of_month = (
197+
schedule._orig_day_of_month if crontab_verify('dom', schedule._orig_day_of_month, False) else '*'
198+
)
199+
crontab_month_of_year = (
200+
schedule._orig_month_of_year if crontab_verify('moy', schedule._orig_month_of_year, False) else '*'
201+
)
190202
spec = {
191203
'name': name,
192204
'type': TaskSchedulerType.CRONTAB.value,
193-
'crontab_minute': schedule._orig_minute
194-
if crontab_verify('m', schedule._orig_minute, False)
195-
else '*',
196-
'crontab_hour': schedule._orig_hour if crontab_verify('h', schedule._orig_hour, False) else '*',
197-
'crontab_day_of_week': schedule._orig_day_of_week
198-
if crontab_verify('dom', schedule._orig_day_of_week, False)
199-
else '*',
200-
'crontab_day_of_month': schedule._orig_day_of_month
201-
if crontab_verify('dom', schedule._orig_day_of_month, False)
202-
else '*',
203-
'crontab_month_of_year': schedule._orig_month_of_year
204-
if crontab_verify('moy', schedule._orig_month_of_year, False)
205-
else '*',
205+
'crontab': f'{crontab_minute} {crontab_hour} {crontab_day_of_week} {crontab_day_of_month} {crontab_month_of_year}', # noqa: E501
206206
}
207207
stmt = select(TaskScheduler).filter_by(**spec)
208208
query = await db.execute(stmt)
@@ -233,8 +233,8 @@ async def _unpack_fields(
233233
except KeyError:
234234
continue
235235
model_dict.update(
236-
args=json.dumps(args or []),
237-
kwargs=json.dumps(kwargs or {}),
236+
args=json.dumps(args) if args else None,
237+
kwargs=json.dumps(kwargs) if kwargs else None,
238238
**cls._unpack_options(**options or {}),
239239
**entry,
240240
)

0 commit comments

Comments
 (0)