Skip to content

Add support for celery dynamic tasks #715

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 9 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion backend/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ OAUTH2_LINUX_DO_CLIENT_SECRET='test'
# App Task
# Celery
CELERY_BROKER_REDIS_DATABASE=1
CELERY_BACKEND_REDIS_DATABASE=2
# Rabbitmq
CELERY_RABBITMQ_HOST='127.0.0.1'
CELERY_RABBITMQ_PORT=5672
Expand Down
13 changes: 4 additions & 9 deletions backend/alembic/env.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,16 @@

sys.path.append('../')

from backend.app import get_app_models
from backend.common.model import MappedBase
from backend.core import path_conf
from backend.database.db import SQLALCHEMY_DATABASE_URL
from backend.plugin.tools import get_plugin_models

# import your new model here
from backend.app.admin.model import * # noqa: F401
from backend.plugin.code_generator.model import * # noqa: F401

# import plugin model
for cls in get_plugin_models():
# import models
for cls in get_app_models() + get_plugin_models():
class_name = cls.__name__
if class_name in globals():
print(f'\nWarning: Class "{class_name}" already exists in global namespace.')
else:
if class_name not in globals():
globals()[class_name] = cls

if not os.path.exists(path_conf.ALEMBIC_VERSION_DIR):
Expand Down
41 changes: 41 additions & 0 deletions backend/app/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,43 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import inspect
import os.path

from backend.common.log import log
from backend.core.path_conf import BASE_PATH
from backend.utils.import_parse import import_module_cached


def get_app_models():
"""获取 app 所有模型类"""
app_path = os.path.join(BASE_PATH, 'app')
list_dirs = os.listdir(app_path)

apps = []

for d in list_dirs:
if os.path.isdir(os.path.join(app_path, d)) and d != '__pycache__':
apps.append(d)

classes = []

for app in apps:
try:
module_path = f'backend.app.{app}.model'
module = import_module_cached(module_path)
except Exception as e:
log.warning(f'应用 {app} 中不包含 model 相关配置: {e}')
continue

for name, obj in inspect.getmembers(module):
if inspect.isclass(obj):
classes.append(obj)

return classes


# import all app models for auto create db tables
for cls in get_app_models():
class_name = cls.__name__
if class_name not in globals():
globals()[class_name] = cls
12 changes: 6 additions & 6 deletions backend/app/task/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,21 @@
当前任务使用 Celery
实现,实施方案请查看 [#225](https://github.com/fastapi-practices/fastapi_best_architecture/discussions/225)

## 添加任务
## 定时任务

> [!IMPORTANT]
> 由于 Celery 任务扫描规则,使其对任务的目录结构要求及其严格,务必在 celery_task 目录下添加任务
`backend/app/task/tasks/beat.py` 文件内编写相关定时任务

### 简单任务

可以直接在 `tasks.py` 文件内编写相关任务代码
`backend/app/task/tasks/tasks.py` 文件内编写相关任务代码

### 层级任务

如果你想对任务进行目录层级划分,使任务结构更加清晰,你可以新建任意目录,但必须注意的是

1. 新建目录后,务必更新任务配置 `CELERY_TASKS_PACKAGES`,将新建目录添加到此列表
2. 在新建目录下,务必添加 `tasks.py` 文件,并在此文件中编写相关任务代码
1.`backend/app/task/tasks` 目录下新建 python 包目录
2. 新建目录后,务必更新 `conf.py` 配置中的 `CELERY_TASKS_PACKAGES`,将新建目录模块路径添加到此列表
3. 在新建目录下,务必添加 `tasks.py` 文件,并在此文件中编写相关任务代码

## 消息代理

Expand Down
8 changes: 5 additions & 3 deletions backend/app/task/api/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@
# -*- coding: utf-8 -*-
from fastapi import APIRouter

from backend.app.task.api.v1.task import router as task_router
from backend.app.task.api.v1.result import router as task_result_router
from backend.app.task.api.v1.scheduler import router as task_scheduler_router
from backend.core.conf import settings

v1 = APIRouter(prefix=settings.FASTAPI_API_V1_PATH)
v1 = APIRouter(prefix=f'{settings.FASTAPI_API_V1_PATH}/task', tags=['任务'])

v1.include_router(task_router, prefix='/tasks', tags=['任务'])
v1.include_router(task_result_router, prefix='/results')
v1.include_router(task_scheduler_router, prefix='/schedulers')
57 changes: 57 additions & 0 deletions backend/app/task/api/v1/result.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from typing import Annotated

from fastapi import APIRouter, Depends, Path, Query

from backend.app.task.schema.result import DeleteTaskResultParam, GetTaskResultDetail
from backend.app.task.service.result_service import task_result_service
from backend.common.pagination import DependsPagination, PageData, paging_data
from backend.common.response.response_schema import ResponseModel, ResponseSchemaModel, response_base
from backend.common.security.jwt import DependsJwtAuth
from backend.common.security.permission import RequestPermission
from backend.common.security.rbac import DependsRBAC
from backend.database.db import CurrentSession

router = APIRouter()


@router.get('/{pk}', summary='获取任务结果详情', dependencies=[DependsJwtAuth])
async def get_task_result(
pk: Annotated[int, Path(description='任务结果 ID')],
) -> ResponseSchemaModel[GetTaskResultDetail]:
result = await task_result_service.get(pk=pk)
return response_base.success(data=result)


@router.get(
'',
summary='分页获取所有任务结果',
dependencies=[
DependsJwtAuth,
DependsPagination,
],
)
async def get_task_results_paged(
db: CurrentSession,
name: Annotated[str | None, Query(description='任务名称')] = None,
task_id: Annotated[str | None, Query(description='任务 ID')] = None,
) -> ResponseSchemaModel[PageData[GetTaskResultDetail]]:
result_select = await task_result_service.get_select(name=name, task_id=task_id)
page_data = await paging_data(db, result_select)
return response_base.success(data=page_data)


@router.delete(
'',
summary='批量删除任务结果',
dependencies=[
Depends(RequestPermission('sys:task:del')),
DependsRBAC,
],
)
async def delete_task_result(obj: DeleteTaskResultParam) -> ResponseModel:
count = await task_result_service.delete(obj=obj)
if count > 0:
return response_base.success()
return response_base.fail()
130 changes: 130 additions & 0 deletions backend/app/task/api/v1/scheduler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from typing import Annotated

from fastapi import APIRouter, Depends, Path, Query

from backend.app.task.schema.scheduler import CreateTaskSchedulerParam, GetTaskSchedulerDetail, UpdateTaskSchedulerParam
from backend.app.task.service.scheduler_service import task_scheduler_service
from backend.common.pagination import DependsPagination, PageData, paging_data
from backend.common.response.response_schema import ResponseModel, ResponseSchemaModel, response_base
from backend.common.security.jwt import DependsJwtAuth
from backend.common.security.permission import RequestPermission
from backend.common.security.rbac import DependsRBAC
from backend.database.db import CurrentSession

router = APIRouter()


@router.get('/all', summary='获取所有任务调度', dependencies=[DependsJwtAuth])
async def get_all_task_schedulers() -> ResponseModel:
schedulers = await task_scheduler_service.get_all()
return response_base.success(data=schedulers)


@router.get('/{pk}', summary='获取任务调度详情', dependencies=[DependsJwtAuth])
async def get_task_scheduler(pk: Annotated[int, Path(description='任务调度 ID')]):
task_scheduler = await task_scheduler_service.get(pk=pk)
return response_base.success(data=task_scheduler)


@router.get(
'',
summary='分页获取所有任务调度',
dependencies=[
DependsJwtAuth,
DependsPagination,
],
)
async def get_task_scheduler_paged(
db: CurrentSession,
name: Annotated[int, Path(description='任务调度名称')] = None,
type: Annotated[int | None, Query(description='任务调度类型')] = None,
) -> ResponseSchemaModel[PageData[GetTaskSchedulerDetail]]:
task_scheduler_select = await task_scheduler_service.get_select(name=name, type=type)
page_data = await paging_data(db, task_scheduler_select)
return response_base.success(data=page_data)


@router.post(
'',
summary='创建任务调度',
dependencies=[
Depends(RequestPermission('sys:task:add')),
DependsRBAC,
],
)
async def create_task_scheduler(obj: CreateTaskSchedulerParam):
await task_scheduler_service.create(obj=obj)
return response_base.success()


@router.put(
'/{pk}',
summary='更新任务调度',
dependencies=[
Depends(RequestPermission('sys:task:edit')),
DependsRBAC,
],
)
async def update_task_scheduler(pk: Annotated[int, Path(description='任务调度 ID')], obj: UpdateTaskSchedulerParam):
count = await task_scheduler_service.update(pk=pk, obj=obj)
if count > 0:
return response_base.success()
return response_base.fail()


@router.put(
'/{pk}/status',
summary='更新任务调度状态',
dependencies=[
Depends(RequestPermission('sys:task:edit')),
DependsRBAC,
],
)
async def update_task_scheduler_status(pk: Annotated[int, Path(description='任务调度 ID')]):
count = await task_scheduler_service.update_status(pk=pk)
if count > 0:
return response_base.success()
return response_base.fail()


@router.delete(
'/{pk}',
summary='删除任务调度',
dependencies=[
Depends(RequestPermission('sys:task:del')),
DependsRBAC,
],
)
async def delete_task_scheduler(pk: Annotated[int, Path(description='任务调度 ID')]):
count = await task_scheduler_service.delete(pk=pk)
if count > 0:
return response_base.success()
return response_base.fail()


@router.post(
'/{pk}/executions',
summary='手动执行任务',
dependencies=[
Depends(RequestPermission('sys:task:exec')),
DependsRBAC,
],
)
async def execute_task(pk: Annotated[int, Path(description='任务调度 ID')]) -> ResponseSchemaModel[str]:
await task_scheduler_service.execute(pk=pk)
return response_base.success()


@router.delete(
'/{task_id}/cancel',
summary='撤销任务',
dependencies=[
Depends(RequestPermission('sys:task:revoke')),
DependsRBAC,
],
)
async def revoke_task(task_id: Annotated[str, Path(description='任务 UUID')]) -> ResponseModel:
await task_scheduler_service.revoke(task_id=task_id)
return response_base.success()
58 changes: 0 additions & 58 deletions backend/app/task/api/v1/task.py

This file was deleted.

Loading