Skip to content

Spectrogram job file downloads #124

Open · wants to merge 7 commits into master

8 changes: 4 additions & 4 deletions config/settings/base.py
@@ -291,12 +291,12 @@
CELERY_TASK_SERIALIZER = "json"
# https://docs.celeryq.dev/en/stable/userguide/configuration.html#std:setting-result_serializer
CELERY_RESULT_SERIALIZER = "json"
# https://docs.celeryq.dev/en/stable/userguide/configuration.html#task-time-limit
# TODO: set to whatever value is adequate in your circumstances
CELERY_TASK_TIME_LIMIT = 5 * 60
# https://docs.celeryq.dev/en/stable/userguide/configuration.html#task-soft-time-limit
# TODO: set to whatever value is adequate in your circumstances
CELERY_TASK_SOFT_TIME_LIMIT = 2 * 60
CELERY_TASK_SOFT_TIME_LIMIT = 5 * 60
# https://docs.celeryq.dev/en/stable/userguide/configuration.html#task-time-limit
# TODO: set to whatever value is adequate in your circumstances
CELERY_TASK_TIME_LIMIT = CELERY_TASK_SOFT_TIME_LIMIT + (1 * 60)
# https://docs.celeryq.dev/en/stable/userguide/configuration.html#beat-scheduler
CELERY_BEAT_SCHEDULER = "django_celery_beat.schedulers:DatabaseScheduler"
# https://docs.celeryq.dev/en/stable/userguide/configuration.html#worker-send-task-events
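
For context, the new settings tie the hard limit to the soft limit (one extra minute here; five extra minutes in config/settings/local.py below). A minimal sketch, with a hypothetical task, of why that margin matters:

```python
# Hedged sketch, not part of this PR: hitting CELERY_TASK_SOFT_TIME_LIMIT raises
# SoftTimeLimitExceeded inside the task, leaving the extra minute before
# CELERY_TASK_TIME_LIMIT for fast cleanup before the worker kills the task.
import logging
import time

from celery import shared_task
from celery.exceptions import SoftTimeLimitExceeded


@shared_task
def example_long_task(job_id: int):  # hypothetical task name
    try:
        time.sleep(10 * 60)  # stand-in for long-running spectrogram work
    except SoftTimeLimitExceeded:
        # Past the soft limit: roughly one minute remains (the margin configured
        # above) to clean up before the hard limit terminates the task outright.
        logging.warning("Job %s hit the soft time limit, cleaning up", job_id)
        raise
```
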
8 changes: 7 additions & 1 deletion config/settings/local.py
@@ -82,9 +82,15 @@
INSTALLED_APPS += ["django_extensions"]
# Celery
# ------------------------------------------------------------------------------

# https://docs.celeryq.dev/en/stable/userguide/configuration.html#task-eager-propagates
CELERY_TASK_EAGER_PROPAGATES = True
# https://docs.celeryq.dev/en/stable/userguide/configuration.html#task-soft-time-limit
# TODO: set to whatever value is adequate in your circumstances
CELERY_TASK_SOFT_TIME_LIMIT = 60 * 60
# https://docs.celeryq.dev/en/stable/userguide/configuration.html#task-time-limit
# TODO: set to whatever value is adequate in your circumstances
CELERY_TASK_TIME_LIMIT = CELERY_TASK_SOFT_TIME_LIMIT + (5 * 60)

# Your stuff...
# ------------------------------------------------------------------------------

@@ -11,6 +11,9 @@ import {
} from '../../apiClient/jobService';
import { VizContainerProps } from '../types';

// How long to wait between job status polls to the server
const POLL_INTERVAL = 3000;

export interface SpectrogramSettings {
fftSize: number;
stdDev: number;
@@ -172,7 +175,7 @@ const SpectrogramVizContainer = ({
message: error instanceof Error ? error.message : 'Unknown error',
}));
}
}, 2000);
}, POLL_INTERVAL);
}

return () => {
8 changes: 4 additions & 4 deletions jobs/io.py
@@ -33,7 +33,7 @@ def update_job_status(job_id: int, status: str, token: str, info=None):
f"{settings.API_URL}/api/jobs/update-job-status/",
data=data,
headers=headers,
timeout=20,
timeout=60,
)

return response.status_code == requests.codes.created
@@ -56,7 +56,7 @@ def get_job_meta(job_id: int, token: str):
response = requests.get(
f"{settings.API_URL}/api/jobs/job-metadata/{job_id}/",
headers=headers,
timeout=10,
timeout=60,
)
if response.status_code != requests.codes.ok:
return None
@@ -118,7 +118,7 @@ def post_results(job_id, token: str, json_data=None, file_data=None, file_name=N
f"{settings.API_URL}/api/jobs/save-job-data/{job_id}/",
json={"json_data": json_data},
headers=headers,
timeout=10,
timeout=60,
)
if response.status_code != requests.codes.created:
fail = True
@@ -130,7 +130,7 @@ def post_results(job_id, token: str, json_data=None, file_data=None, file_name=N
f"{settings.API_URL}/api/jobs/save-job-data/{job_id}/",
files=files,
headers=headers,
timeout=10,
timeout=60,
)
if response.status_code != requests.codes.created:
fail = True
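
Before jobs/tasks.py, a hedged usage sketch of the jobs/io.py helpers above; the import path, the failure return value, and the response keys are inferred from how submit_job uses them below and are assumptions, not guarantees of this PR:

```python
# Hedged sketch, not part of this PR: upload a rendered figure and mark the job
# failed if saving does not succeed. post_results can mix JSON and file payloads;
# the response is assumed to expose a "file_ids" mapping keyed by file name.
from pathlib import Path

from jobs.io import post_results, update_job_status  # assumed import path


def upload_figure(job_id: int, token: str) -> None:
    figure_path = Path(f"jobs/job_results/{job_id}.png")
    with figure_path.open("rb") as results_file:
        response = post_results(
            job_id,
            token,
            json_data={"note": "spectrogram rendered"},  # optional JSON record
            file_data=results_file.read(),
            file_name="figure.png",
        )
    if not response:  # assumption: a falsy return signals failure
        update_job_status(job_id, "failed", token, info="Could not save results")
        return
    # submit_job (below) then marks the job complete, using
    # response["file_ids"]["figure.png"] as the results_id.
```
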
215 changes: 192 additions & 23 deletions jobs/tasks.py
@@ -1,7 +1,15 @@
import logging
import os
import shutil
from pathlib import Path

from celery import shared_task
from spectrumx import Client as SDSClient
from spectrumx.errors import Result
from spectrumx.models.captures import CaptureType
from spectrumx.models.files import File

from spectrumx_visualization_platform.users.models import User

from .io import get_job_file
from .io import get_job_meta
@@ -12,15 +20,14 @@

@shared_task
def submit_job(job_id: int, token: str, config: dict | None = None):
# the very first thing we should do is update the Job status to "running"
# The very first thing we should do is update the Job status to "running"
update_job_status(job_id, "running", token)

# the next thing we do is get the job information. This will tell us:
# The next thing we do is get the job information. This will tell us:
# 1. What type of visualization we should do
# 2. A list of files we'll need
job_data = get_job_meta(job_id, token)
logging.info(f"job_data: {job_data}")
if job_data is None:
job_metadata = get_job_meta(job_id, token)
if job_metadata is None:
error_msg = "Could not get job information."
update_job_status(
job_id,
@@ -30,12 +37,26 @@ def submit_job(job_id: int, token: str, config: dict | None = None):
)
raise ValueError(error_msg)

# Next, run through the local files and download them from the SVI main system.
# Create a directory for the job files
print(f"Job {job_id} is running with config: {config}")
# Create directories for job files and results
Path("jobs/job_files").mkdir(parents=True, exist_ok=True)
print("job data: " + str(job_data)) # debug added 44
for f in job_data["data"]["local_files"]:
Path("jobs/job_results").mkdir(parents=True, exist_ok=True)

# Handle SDS data downloading if needed
file_paths = []
if config and "capture_ids" in config:
try:
# Get user from token
user = User.objects.get(uuid=job_metadata["data"]["user_id"])
file_paths = download_sds_files(
job_id, token, config["capture_ids"], user, config["capture_type"]
)
except Exception as e:
error_msg = f"Error downloading SDS data: {e!s}"
update_job_status(job_id, "failed", token, info=error_msg)
raise ValueError(error_msg)

# Process local files
for f in job_metadata["data"]["local_files"]:
data = get_job_file(f["id"], token, "local")

if data is None:
@@ -48,21 +69,19 @@ def submit_job(job_id: int, token: str, config: dict | None = None):
)
raise ValueError(error_msg)

# .. store data in some way to access it later in the code,
# .. either in memory or locally to disk
with Path.open(f"jobs/job_files/{f['name']}", "wb") as new_file:
file_path = Path("jobs/job_files") / f["name"]
with file_path.open("wb") as new_file:
new_file.write(data)
file_paths.append(str(file_path))

Path("jobs/job_results").mkdir(parents=True, exist_ok=True)

if job_data["data"]["type"] == "spectrogram":
if job_metadata["data"]["type"] == "spectrogram":
try:
figure = make_spectrogram(
job_data,
job_metadata,
config,
files_dir="jobs/job_files/",
file_paths=file_paths,
)
figure.savefig("jobs/job_results/figure.png")
figure.savefig(f"jobs/job_results/{job_id}.png")
except Exception as e:
update_job_status(
job_id,
@@ -72,7 +91,7 @@ def submit_job(job_id: int, token: str, config: dict | None = None):
)
raise
else:
error_msg = f"Unknown job type: {job_data['data']['type']}"
error_msg = f"Unknown job type: {job_metadata['data']['type']}"
update_job_status(
job_id,
"failed",
@@ -84,8 +103,8 @@ def submit_job(job_id: int, token: str, config: dict | None = None):
# Let's say the code dumped to a local file and we want to upload that.
# We can do either that, or have an in-memory file. Either way,
# "results_file" will be our file contents (byte format)
with Path.open("jobs/job_results/figure.png", "rb") as results_file:
# post results -- we can make this call as many times as needed to get
with Path.open(f"jobs/job_results/{job_id}.png", "rb") as results_file:
# Post results -- we can make this call as many times as needed to get
# results to send to the main system.
# We can also mix JSON data and a file. It will save 2 records of
# "JobData", one for the JSON and one for the file.
@@ -103,7 +122,10 @@ def submit_job(job_id: int, token: str, config: dict | None = None):
update_job_status(job_id, "failed", token, info=error_msg)
raise ValueError(error_msg)

# update the job as complete
# Clean up job files
cleanup_job_files(job_id)

# Update the job as complete
info = {
"results_id": response["file_ids"]["figure.png"],
}
@@ -113,3 +135,150 @@
@shared_task
def error_handler(request, exc, _traceback):
update_job_status(request.job_id, "failed", request.token, info=str(exc))
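
For context, a hedged sketch of how the API side might enqueue this task for an SDS-backed spectrogram and attach the error handler above. The placeholder values and the use of link_error follow Celery's documented errback pattern rather than code in this PR:

```python
# Hedged sketch, not part of this PR: dispatch submit_job with a config naming
# one SDS capture, and route failures to error_handler via link_error.
from spectrumx.models.captures import CaptureType

from jobs.tasks import error_handler, submit_job  # assumed import path

job_id = 42  # placeholder
token = "svi-api-token"  # placeholder
config = {
    "capture_ids": ["00000000-0000-0000-0000-000000000000"],  # placeholder UUID
    "capture_type": CaptureType.DigitalRF.value,  # frontend would send the string form
}

submit_job.apply_async(
    args=[job_id, token, config],
    link_error=error_handler.s(),
)
```
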


def download_sds_files(
job_id: int,
token: str,
capture_ids: list[str],
user: User,
capture_type: str,
) -> list[str]:
"""Download files from SDS for the given capture IDs and return their paths.
Args:
job_id: The ID of the job
token: The authentication token
capture_ids: List containing a single capture ID to download
user: The user object
capture_type: Type of capture (e.g., "digital_rf", "sigmf")
Returns:
list[str]: List of paths to the downloaded files
Raises:
ValueError: If there are any errors during the download process
"""
sds_client = user.sds_client()
sds_captures = sds_client.captures.listing(capture_type=capture_type)

# Get the first capture ID
capture_id = capture_ids[0]
capture = next((c for c in sds_captures if str(c.uuid) == str(capture_id)), None)
if not capture:
error_msg = f"Capture ID {capture_id} not found in SDS"
update_job_status(job_id, "failed", token, info=error_msg)
raise ValueError(error_msg)

# Get the UUIDs of the files in the capture for comparison later
file_uuids = [file.uuid for file in capture.files]

# Create a directory for this capture
local_path = Path(
"jobs/job_files",
str(job_id),
capture_id,
)
local_path.mkdir(parents=True)

# Download files
file_results = safe_sds_client_download(
sds_client, capture.top_level_dir, local_path
)

downloaded_files = [result() for result in file_results if result]
download_errors = [result.error_info for result in file_results if not result]

if download_errors:
error_msg = f"Failed to download SDS files: {download_errors}"
update_job_status(job_id, "failed", token, info=error_msg)
raise ValueError(error_msg)

# Clean up unnecessary files and directories
matching_files = []
for f in downloaded_files:
if f.uuid in file_uuids:
matching_files.append(f)
else:
f.local_path.unlink()

file_paths = [str(f.local_path) for f in matching_files]
logging.info(
f"Files matching capture (expected): {len(file_paths)} ({len(file_uuids)})"
)
logging.info(f"Extra files removed: {len(downloaded_files) - len(matching_files)}")

if not file_paths:
error_msg = f"No matching files found for capture {capture_id}"
update_job_status(job_id, "failed", token, info=error_msg)
raise ValueError(error_msg)

if capture_type == CaptureType.DigitalRF:
# For DigitalRF, maintain the directory structure
common_path = os.path.commonpath(file_paths)
shutil.move(common_path, local_path)
sds_root = str(capture.files[0].directory).strip("/").split("/")[0]
sds_root_path = local_path / sds_root
if sds_root_path.exists():
shutil.rmtree(sds_root_path)
# Return all files in the directory structure
return [str(p) for p in local_path.glob("**/*")]
if capture_type == CaptureType.SigMF:
# For SigMF, move files to the root of the capture directory
for file_path in file_paths:
file_name = Path(file_path).name
shutil.move(file_path, local_path / file_name)
# Return all files in the capture directory
return [str(p) for p in local_path.glob("*")]

raise ValueError(f"Unsupported capture type: {capture_type}")
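
A hedged illustration of what this function leaves on disk and how it would be called directly; the names and UUIDs below are placeholders, and in this PR the function is only called from submit_job:

```python
# Hedged sketch, not part of this PR. download_sds_files stages everything under
# jobs/job_files/<job_id>/<capture_id>/:
#   - SigMF: the capture's files end up at the root of that directory
#   - Digital RF: the channel directory tree is preserved beneath it
# The returned paths are what submit_job forwards to make_spectrogram(file_paths=...).
from spectrumx.models.captures import CaptureType

from jobs.tasks import download_sds_files  # assumed import path
from spectrumx_visualization_platform.users.models import User

user = User.objects.get(uuid="00000000-0000-0000-0000-000000000000")  # placeholder
paths = download_sds_files(
    job_id=42,                                             # placeholder
    token="svi-api-token",                                 # placeholder
    capture_ids=["11111111-0000-0000-0000-000000000000"],  # only the first is used
    user=user,
    capture_type=CaptureType.SigMF,
)
```
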


def safe_sds_client_download(
sds_client: SDSClient, from_sds_path: str, to_local_path: str
) -> list[Result[File]]:
try:
file_results = sds_client.download(
from_sds_path=from_sds_path,
to_local_path=to_local_path,
skip_contents=False,
overwrite=True,
verbose=True,
)
except StopIteration:
# Account for a bug in the SDS client
logging.warning("Caught StopIteration error--continuing.")
return file_results


def cleanup_job_files(job_id: int) -> None:
"""Clean up files and directories created for a specific job.
Args:
job_id: The ID of the job whose files should be cleaned up
Note:
This function only removes files and directories specific to the given job_id,
preserving the main job_files and job_results directories for other jobs.
"""
try:
# Remove job-specific directories and files
job_files_dir = Path("jobs/job_files")
job_results_dir = Path("jobs/job_results")

# Remove job-specific subdirectories in job_files
if job_files_dir.exists():
for item in job_files_dir.iterdir():
if item.is_dir() and item.name == str(job_id):
shutil.rmtree(item)
elif item.is_file():
# Remove individual files that were created for this job
item.unlink()

# Remove job-specific result file
result_file = job_results_dir / f"{job_id}.png"
if result_file.exists():
result_file.unlink()

except Exception as e:
logging.warning(f"Error cleaning up job files: {e}")
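
Finally, a hedged pytest-style sketch of the cleanup behaviour; it assumes the worker runs from the repository root so the relative jobs/ paths resolve, and the fixture usage is illustrative rather than part of this PR:

```python
# Hedged test sketch, not part of this PR: cleanup_job_files(42) should remove
# jobs/job_files/42/ and jobs/job_results/42.png while leaving other jobs'
# capture directories in place.
from jobs.tasks import cleanup_job_files  # assumed import path


def test_cleanup_removes_only_this_jobs_artifacts(tmp_path, monkeypatch):
    monkeypatch.chdir(tmp_path)  # make the task's relative paths resolve here
    (tmp_path / "jobs/job_files/42").mkdir(parents=True)
    (tmp_path / "jobs/job_files/99").mkdir(parents=True)
    (tmp_path / "jobs/job_results").mkdir(parents=True)
    (tmp_path / "jobs/job_results/42.png").write_bytes(b"")

    cleanup_job_files(42)

    assert not (tmp_path / "jobs/job_files/42").exists()
    assert (tmp_path / "jobs/job_files/99").exists()  # other jobs are untouched
    assert not (tmp_path / "jobs/job_results/42.png").exists()
```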