Skip to content

Commit e442acb

Browse files
committed
🎨 Add exceptions and annotations
1 parent 4a132b1 commit e442acb

File tree

8 files changed

+92
-56
lines changed

8 files changed

+92
-56
lines changed

src/exceptions.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
class InvalidTrigger(Exception):
2+
def __init__(self, trigger: str) -> None:
3+
msg = f"Invalid trigger: {trigger}(type: {type(trigger)}"
4+
super().__init__(msg)
5+
6+
7+
class InvalidJobStore(Exception):
8+
def __init__(self, store: str) -> None:
9+
msg = f"Invalid job store: {store}"
10+
super().__init__(msg)
11+
12+
13+
class InvalidExecutor(Exception):
14+
def __init__(self, executor: str) -> None:
15+
msg = f"Executor type {executor} not supported"
16+
super().__init__(msg)
17+
18+
19+
class InvalidAction(Exception):
20+
def __init__(self, action: str) -> None:
21+
msg = f"Invalid action: {action}"
22+
super().__init__(msg)

src/routes/executor.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,13 @@
1010

1111
from ..scheduler import scheduler
1212
from ..schema import ExecutorInfo
13-
from ..shared import frame_page
13+
from ..shared import Components, frame_page
1414

1515
router = APIRouter(prefix="/job/executor", tags=["executor"])
1616

1717

1818
@router.get("", response_model=FastUI, response_model_exclude_none=True)
19-
def store():
19+
def store() -> Components:
2020
executors = [
2121
ExecutorInfo.model_validate({"alias": alias, "executor": executor})
2222
for alias, executor in scheduler._executors.items()
@@ -64,7 +64,9 @@ def store():
6464

6565

6666
@router.post("/new", response_model=FastUI, response_model_exclude_none=True)
67-
async def new_executor(new_executor: Annotated[ExecutorInfo, fastui_form(ExecutorInfo)]):
67+
async def new_executor(
68+
new_executor: Annotated[ExecutorInfo, fastui_form(ExecutorInfo)],
69+
) -> c.Paragraph:
6870
alias = new_executor.alias
6971
if new_executor.alias in scheduler._jobstores:
7072
return c.Paragraph(text=f"Executor({alias=}) already exists.")
@@ -74,10 +76,10 @@ async def new_executor(new_executor: Annotated[ExecutorInfo, fastui_form(Executo
7476

7577

7678
@router.post("/remove", response_model=FastUI, response_model_exclude_none=True)
77-
async def remove_executor(alias: str = Form()):
79+
async def remove_executor(alias: Annotated[str, Form()]) -> c.Paragraph:
7880
if alias not in scheduler._executors:
7981
return c.Paragraph(text=f"Executor({alias=}) not exists.")
80-
elif alias == "default":
82+
if alias == "default":
8183
return c.Paragraph(text="Cannot remove default executor.")
8284
scheduler.remove_executor(alias)
8385
return c.Paragraph(text=f"Executor({alias=}) removed successfully.")

src/routes/job.py

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,21 +4,22 @@
44
from uuid import uuid4
55

66
from fastapi import APIRouter
7-
from fastui import FastUI
7+
from fastui import AnyComponent, FastUI
88
from fastui import components as c
99
from fastui.components.display import DisplayLookup
1010
from fastui.events import BackEvent, GoToEvent, PageEvent
1111
from fastui.forms import fastui_form
1212

13+
from ..exceptions import InvalidAction
1314
from ..scheduler import scheduler
1415
from ..schema import JobInfo, ModifyJobParam, NewJobParam
15-
from ..shared import confirm_modal, frame_page
16+
from ..shared import Components, confirm_modal, frame_page
1617

1718
router = APIRouter(prefix="/job", tags=["job"])
1819

1920

2021
@router.get("/", response_model=FastUI, response_model_exclude_none=True)
21-
async def jobs():
22+
async def jobs() -> Components:
2223
jobs = scheduler.get_jobs()
2324

2425
return frame_page(
@@ -54,7 +55,7 @@ async def jobs():
5455

5556

5657
@router.post("/", response_model=FastUI, response_model_exclude_none=True)
57-
async def new_job(job_info: Annotated[NewJobParam, fastui_form(NewJobParam)]):
58+
async def new_job(job_info: Annotated[NewJobParam, fastui_form(NewJobParam)]) -> Components:
5859
trigger = job_info.get_trigger()
5960
job = scheduler.add_job(
6061
job_info.func,
@@ -77,7 +78,7 @@ async def new_job(job_info: Annotated[NewJobParam, fastui_form(NewJobParam)]):
7778

7879

7980
@router.get("/detail/{id}", response_model=FastUI, response_model_exclude_none=True)
80-
async def job_detail(id: str):
81+
async def job_detail(id: str) -> Components:
8182
job = scheduler.get_job(id)
8283
if not job:
8384
return [c.FireEvent(event=GoToEvent(url="/"))]
@@ -134,7 +135,9 @@ async def job_detail(id: str):
134135

135136

136137
@router.post("/modify/{id}", response_model=FastUI, response_model_exclude_none=True)
137-
async def modify_job(id: str, job_info: Annotated[ModifyJobParam, fastui_form(ModifyJobParam)]):
138+
async def modify_job(
139+
id: str, job_info: Annotated[ModifyJobParam, fastui_form(ModifyJobParam)]
140+
) -> Components:
138141
modify_kwargs = job_info.model_dump(exclude={"trigger", "trigger_params"})
139142
modify_kwargs["trigger"] = job_info.get_trigger()
140143
modify_kwargs = dict(filter(lambda x: x[1], modify_kwargs.items()))
@@ -151,10 +154,12 @@ async def modify_job(id: str, job_info: Annotated[ModifyJobParam, fastui_form(Mo
151154

152155

153156
@router.post("/{action}/{id}", response_model=FastUI, response_model_exclude_none=True)
154-
async def pause_job(action: Literal["pause", "resume", "modify", "reload", "remove"], id: str):
157+
async def pause_job(
158+
action: Literal["pause", "resume", "modify", "reload", "remove"], id: str
159+
) -> Components:
155160
job = scheduler.get_job(id)
156161
if not job:
157-
return c.Error(title="Error", description=f"Job({id=}) not found", status_code=500)
162+
return [c.Error(title="Error", description=f"Job({id=}) not found", status_code=500)]
158163

159164
match action:
160165
case "pause":
@@ -167,7 +172,7 @@ async def pause_job(action: Literal["pause", "resume", "modify", "reload", "remo
167172
module = import_module(job.func.__module__)
168173
reload(module)
169174
case _:
170-
raise ValueError(f"Invalid action {action}")
175+
raise InvalidAction(action)
171176

172177
return [
173178
c.Paragraph(text=f"Job({id=}, name='{job.name}'), {action=} success."),
@@ -176,7 +181,7 @@ async def pause_job(action: Literal["pause", "resume", "modify", "reload", "remo
176181

177182

178183
@router.get("/view/{path:path}", response_model=FastUI, response_model_exclude_none=True)
179-
def edit_job_script(path: Path):
184+
def edit_job_script(path: Path) -> AnyComponent:
180185
if not path.exists():
181186
return c.Error(title="Error", description=f"Module {path} not found", status_code=500)
182187
return c.Code(text=path.read_text(), language="python")

src/routes/job_log.py

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
from functools import lru_cache
21
from typing import Annotated, Literal
32

43
from fastapi import APIRouter
@@ -11,22 +10,22 @@
1110

1211
from ..config import LOG_PATH
1312
from ..log import PARSE_PATTERN, logger
14-
from ..shared import frame_page
13+
from ..shared import Components, frame_page
1514
from .api import get_available_job_logs
1615

1716
router = APIRouter(prefix="/job/log", tags=["job_log"])
1817

1918
PAGE_LINE = 1000
2019

2120

22-
def parse_log_message(text: str):
21+
def parse_log_message(text: str) -> str:
2322
messages = text.strip().split("\n", 1)
2423
if len(messages) > 1:
2524
return f"{messages[0]}\n```\n{messages[1]}\n```"
2625
return messages[0]
2726

2827

29-
def get_log_content(log_file: str, level: str):
28+
def get_log_content(log_file: str, level: str) -> list[str]:
3029
return [
3130
f"**[{line['pid']}] {line['time']}** *{line['level']}* "
3231
f"`{line['name']}:{line['line']}`: {parse_log_message(line['message'])}"
@@ -41,15 +40,15 @@ async def get_log(
4140
log_file: str | None = None,
4241
level: str = "",
4342
page: Annotated[int, Field(ge=1)] = 1,
44-
):
43+
) -> Components:
4544
form_fields: list[FormField] = [
4645
FormFieldSelect(
4746
title="Level",
4847
name="level",
4948
placeholder="Filter by level",
5049
options=[
5150
{"value": level, "label": level}
52-
for level in logger._core.levels.keys() # type: ignore
51+
for level in logger._core.levels # type: ignore
5352
],
5453
)
5554
]
@@ -74,7 +73,7 @@ async def get_log(
7473
log_file = "scheduler.log"
7574

7675
if not (log_file and (LOG_PATH / log_file).exists()):
77-
return c.Error(title="File not found", description=f"Log file {log_file} not found.")
76+
return [c.Error(title="File not found", description=f"Log file {log_file} not found.")]
7877

7978
contents = get_log_content(log_file, level)
8079
return frame_page(

src/routes/job_store.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,13 @@
1010

1111
from ..scheduler import scheduler
1212
from ..schema import JobStoreInfo
13-
from ..shared import frame_page
13+
from ..shared import Components, frame_page
1414

1515
router = APIRouter(prefix="/job/store", tags=["job_store"])
1616

1717

1818
@router.get("", response_model=FastUI, response_model_exclude_none=True)
19-
def store():
19+
def store() -> Components:
2020
job_stores = [
2121
JobStoreInfo.model_validate({"alias": alias, "store": store})
2222
for alias, store in scheduler._jobstores.items()
@@ -64,7 +64,9 @@ def store():
6464

6565

6666
@router.post("/new", response_model=FastUI, response_model_exclude_none=True)
67-
async def new_job_store(new_store: Annotated[JobStoreInfo, fastui_form(JobStoreInfo)]):
67+
async def new_job_store(
68+
new_store: Annotated[JobStoreInfo, fastui_form(JobStoreInfo)],
69+
) -> c.Paragraph:
6870
alias = new_store.alias
6971
if new_store.alias in scheduler._jobstores:
7072
return c.Paragraph(text=f"Job store({alias=}) already exists")
@@ -74,10 +76,10 @@ async def new_job_store(new_store: Annotated[JobStoreInfo, fastui_form(JobStoreI
7476

7577

7678
@router.post("/remove", response_model=FastUI, response_model_exclude_none=True)
77-
async def remove_job_store(alias: str = Form()):
79+
async def remove_job_store(alias: Annotated[str, Form()]) -> c.Paragraph:
7880
if alias not in scheduler._jobstores:
7981
return c.Paragraph(text=f"Job store({alias=}) not exists")
80-
elif alias == "default":
82+
if alias == "default":
8183
return c.Paragraph(text="Cannot remove default job store")
8284
scheduler.remove_jobstore(alias)
8385
return c.Paragraph(text=f"Job store({alias=}) removed successfully")

src/scheduler.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from functools import partial
2-
from typing import Literal
2+
from typing import TYPE_CHECKING, Literal
33

44
from apscheduler.events import (
55
EVENT_EXECUTOR_ADDED,
@@ -18,43 +18,45 @@
1818
JobSubmissionEvent,
1919
SchedulerEvent,
2020
)
21-
from apscheduler.job import Job
2221
from apscheduler.schedulers.asyncio import AsyncIOScheduler
2322

2423
from .config import SCHEDULER_CONFIG
2524
from .log import server_log
2625

26+
if TYPE_CHECKING:
27+
from apscheduler.job import Job
28+
2729
scheduler = AsyncIOScheduler(**SCHEDULER_CONFIG)
2830

2931

3032
def listen_executor_or_jobstore_event(
3133
event: SchedulerEvent,
3234
mapper: dict,
3335
action: Literal["Add executor", "Remove executor", "Add job store", "Remove job store"],
34-
):
36+
) -> None:
3537
obj = f"{event.alias}[{mapper[event.alias]}]" if event.alias in mapper else event.alias
3638
server_log.debug(f"{action} {obj}")
3739

3840

3941
def listen_job_event(
4042
event: JobEvent, action: Literal["Add job", "Remove job", "Modify job", "Submit job"]
41-
):
43+
) -> None:
4244
job: Job | None = scheduler.get_job(event.job_id)
4345
server_log.debug(f"{action}: {job.name if job else ''}[{event.job_id}]")
4446

4547

4648
def listen_job_execution_event(
4749
event: JobExecutionEvent,
4850
action: Literal["Executed job", "Missed job", "Error job"],
49-
):
51+
) -> None:
5052
message = f"{action}: {event.job_id}[{scheduler.get_job(event.job_id)}]"
5153
if event.exception:
5254
server_log.opt(exception=event.exception).error(message)
5355
else:
5456
server_log.debug(message)
5557

5658

57-
def listen_job_submission_event(event: JobSubmissionEvent):
59+
def listen_job_submission_event(event: JobSubmissionEvent) -> None:
5860
job: Job | None = scheduler.get_job(event.job_id)
5961
if not job:
6062
return

0 commit comments

Comments
 (0)