Skip to content

Commit 94cb221

Browse files
committed
feat(backend): disk-backed jobs under /tmp to survive restarts; more robust polling/results
1 parent 8ac3c43 commit 94cb221

File tree

1 file changed

+77
-45
lines changed

1 file changed

+77
-45
lines changed

backend/app/api/v1/routes.py

Lines changed: 77 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@
77
import threading
88
import uuid
99
import time
10-
from dataclasses import dataclass, field
10+
import json
11+
from dataclasses import dataclass, field, asdict
1112
from typing import Dict, Optional
1213

1314
from app.settings import settings
@@ -28,9 +29,8 @@ def _cleanup(paths: list[str]) -> None:
2829

2930

3031
# --------------------
31-
# Simple in-memory job queue for long-running processing
32-
# NOTE: This is suitable for single-instance demo deployments.
33-
# For production, use external storage and a proper queue.
32+
# Disk-backed job store (under /tmp) so jobs survive simple restarts.
33+
# NOTE: For real production usage, use durable storage/queue.
3434

3535
@dataclass
3636
class Job:
@@ -45,18 +45,54 @@ class Job:
4545
updated_at: float = field(default_factory=time.time)
4646

4747

48-
JOBS: Dict[str, Job] = {}
48+
JOBS_DIR = Path(tempfile.gettempdir()) / "sportiq-jobs"
49+
JOBS_DIR.mkdir(parents=True, exist_ok=True)
4950
JOBS_LOCK = threading.Lock()
5051

5152

52-
def _run_job(job_id: str, model_path: Path) -> None:
53+
def _job_dir(job_id: str) -> Path:
54+
return JOBS_DIR / job_id
55+
56+
57+
def _meta_path(job_id: str) -> Path:
58+
return _job_dir(job_id) / "meta.json"
59+
60+
61+
def _write_meta(job: Job) -> None:
62+
d = _job_dir(job.id)
63+
d.mkdir(parents=True, exist_ok=True)
64+
with open(_meta_path(job.id), "w", encoding="utf-8") as f:
65+
json.dump(asdict(job), f)
66+
67+
68+
def _read_meta(job_id: str) -> Optional[Job]:
69+
p = _meta_path(job_id)
70+
if not p.exists():
71+
return None
72+
try:
73+
with open(p, "r", encoding="utf-8") as f:
74+
data = json.load(f)
75+
return Job(**data)
76+
except Exception:
77+
return None
78+
79+
80+
def _update_meta(job_id: str, **updates) -> Optional[Job]:
5381
with JOBS_LOCK:
54-
job = JOBS.get(job_id)
82+
job = _read_meta(job_id)
5583
if not job:
56-
return
57-
job.status = "processing"
84+
return None
85+
for k, v in updates.items():
86+
setattr(job, k, v)
5887
job.updated_at = time.time()
88+
_write_meta(job)
89+
return job
90+
5991

92+
def _run_job(job_id: str, model_path: Path) -> None:
93+
job = _update_meta(job_id, status="processing")
94+
if not job:
95+
return
6096
try:
6197
run_pipeline(
6298
input_path=Path(job.input_path),
@@ -65,20 +101,11 @@ def _run_job(job_id: str, model_path: Path) -> None:
65101
intermediate_path=Path(job.intermediate_path),
66102
fast=job.fast,
67103
)
68-
69104
if not os.path.exists(job.output_path) or os.path.getsize(job.output_path) == 0:
70105
raise RuntimeError("Processing produced an empty output file")
71-
72-
with JOBS_LOCK:
73-
job.status = "done"
74-
job.updated_at = time.time()
106+
_update_meta(job_id, status="done")
75107
except Exception as e:
76-
with JOBS_LOCK:
77-
job = JOBS.get(job_id)
78-
if job:
79-
job.status = "error"
80-
job.error = str(e)
81-
job.updated_at = time.time()
108+
_update_meta(job_id, status="error", error=str(e))
82109

83110

84111
@router.post("/jobs")
@@ -94,21 +121,23 @@ async def create_job(video: UploadFile = File(...), fast: bool = False):
94121
await video.seek(0)
95122
shutil.copyfileobj(video.file, in_f)
96123

97-
inter_fd, inter_path = tempfile.mkstemp(suffix=".mp4")
98-
os.close(inter_fd)
99-
out_fd, out_path = tempfile.mkstemp(suffix=".mp4")
100-
os.close(out_fd)
101-
102124
job_id = str(uuid.uuid4())
125+
d = _job_dir(job_id)
126+
d.mkdir(parents=True, exist_ok=True)
127+
# Move input to job dir
128+
job_input = str(d / f"input{suffix}")
129+
shutil.move(in_path, job_input)
130+
job_inter = str(d / "intermediate.mp4")
131+
job_out = str(d / "output.mp4")
132+
103133
job = Job(
104134
id=job_id,
105-
input_path=in_path,
106-
output_path=out_path,
107-
intermediate_path=inter_path,
135+
input_path=job_input,
136+
output_path=job_out,
137+
intermediate_path=job_inter,
108138
fast=fast,
109139
)
110-
with JOBS_LOCK:
111-
JOBS[job_id] = job
140+
_write_meta(job)
112141

113142
t = threading.Thread(target=_run_job, args=(job_id, Path(settings.model_path)), daemon=True)
114143
t.start()
@@ -118,35 +147,38 @@ async def create_job(video: UploadFile = File(...), fast: bool = False):
118147

119148
@router.get("/jobs/{job_id}/status")
120149
async def get_job_status(job_id: str):
121-
with JOBS_LOCK:
122-
job = JOBS.get(job_id)
123-
if not job:
124-
raise HTTPException(status_code=404, detail="Job not found")
125-
return {"job_id": job.id, "status": job.status, "error": job.error}
150+
job = _read_meta(job_id)
151+
if not job:
152+
raise HTTPException(status_code=404, detail="Job not found")
153+
return {"job_id": job.id, "status": job.status, "error": job.error}
126154

127155

128156
@router.get("/jobs/{job_id}/result")
129157
async def get_job_result(job_id: str, background_tasks: BackgroundTasks):
130-
with JOBS_LOCK:
131-
job = JOBS.get(job_id)
132-
if not job:
133-
raise HTTPException(status_code=404, detail="Job not found")
134-
status = job.status
135-
out_path = job.output_path
136-
to_cleanup = [job.input_path, job.intermediate_path, job.output_path]
158+
job = _read_meta(job_id)
159+
if not job:
160+
raise HTTPException(status_code=404, detail="Job not found")
161+
status = job.status
162+
out_path = job.output_path
163+
to_cleanup = [job.input_path, job.intermediate_path, job.output_path, str(_meta_path(job_id))]
137164

138165
if status != "done":
139166
if status == "error":
140-
raise HTTPException(status_code=500, detail="Job failed")
167+
raise HTTPException(status_code=500, detail=job.error or "Job failed")
141168
return JSONResponse(status_code=202, content={"status": status})
142169

143170
if not os.path.exists(out_path) or os.path.getsize(out_path) == 0:
144171
raise HTTPException(status_code=500, detail="Output not available")
145172

146173
# Clean up files after response is sent
147174
background_tasks.add_task(_cleanup, to_cleanup)
148-
with JOBS_LOCK:
149-
JOBS.pop(job_id, None)
175+
# Remove job dir
176+
def _rm_dir(path: Path):
177+
try:
178+
shutil.rmtree(path, ignore_errors=True)
179+
except Exception:
180+
pass
181+
background_tasks.add_task(_rm_dir, _job_dir(job_id))
150182

151183
return FileResponse(out_path, media_type="video/mp4", filename="analyzed.mp4")
152184

0 commit comments

Comments
 (0)