diff --git a/config/settings/base.py b/config/settings/base.py
index acea024..1f59932 100644
--- a/config/settings/base.py
+++ b/config/settings/base.py
@@ -291,12 +291,12 @@
 CELERY_TASK_SERIALIZER = "json"
 # https://docs.celeryq.dev/en/stable/userguide/configuration.html#std:setting-result_serializer
 CELERY_RESULT_SERIALIZER = "json"
-# https://docs.celeryq.dev/en/stable/userguide/configuration.html#task-time-limit
-# TODO: set to whatever value is adequate in your circumstances
-CELERY_TASK_TIME_LIMIT = 5 * 60
 # https://docs.celeryq.dev/en/stable/userguide/configuration.html#task-soft-time-limit
 # TODO: set to whatever value is adequate in your circumstances
-CELERY_TASK_SOFT_TIME_LIMIT = 2 * 60
+CELERY_TASK_SOFT_TIME_LIMIT = 5 * 60
+# https://docs.celeryq.dev/en/stable/userguide/configuration.html#task-time-limit
+# TODO: set to whatever value is adequate in your circumstances
+CELERY_TASK_TIME_LIMIT = CELERY_TASK_SOFT_TIME_LIMIT + (1 * 60)
 # https://docs.celeryq.dev/en/stable/userguide/configuration.html#beat-scheduler
 CELERY_BEAT_SCHEDULER = "django_celery_beat.schedulers:DatabaseScheduler"
 # https://docs.celeryq.dev/en/stable/userguide/configuration.html#worker-send-task-events
diff --git a/config/settings/local.py b/config/settings/local.py
index 5a9e3b9..3839a1c 100644
--- a/config/settings/local.py
+++ b/config/settings/local.py
@@ -82,9 +82,15 @@
 INSTALLED_APPS += ["django_extensions"]
 # Celery
 # ------------------------------------------------------------------------------
-
 # https://docs.celeryq.dev/en/stable/userguide/configuration.html#task-eager-propagates
 CELERY_TASK_EAGER_PROPAGATES = True
+# https://docs.celeryq.dev/en/stable/userguide/configuration.html#task-soft-time-limit
+# TODO: set to whatever value is adequate in your circumstances
+CELERY_TASK_SOFT_TIME_LIMIT = 60 * 60
+# https://docs.celeryq.dev/en/stable/userguide/configuration.html#task-time-limit
+# TODO: set to whatever value is adequate in your circumstances
+CELERY_TASK_TIME_LIMIT = CELERY_TASK_SOFT_TIME_LIMIT + (5 * 60)
+
 # Your stuff...
 # ------------------------------------------------------------------------------
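With the soft limit now expiring before the hard limit, every task gets a cleanup window between the two. A minimal sketch of how a task can use that window (the task itself is hypothetical; SoftTimeLimitExceeded is the exception Celery raises at the soft limit):

    from celery import shared_task
    from celery.exceptions import SoftTimeLimitExceeded


    @shared_task
    def long_running_job(job_id: int) -> None:
        """Hypothetical task illustrating the soft/hard limit split."""
        try:
            ...  # the task's real work would go here
        except SoftTimeLimitExceeded:
            # Raised at CELERY_TASK_SOFT_TIME_LIMIT; the worker force-kills
            # the task at CELERY_TASK_TIME_LIMIT, leaving the gap between
            # the two limits to flush partial state before re-raising.
            raise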
diff --git a/frontend/src/components/spectrogram/SpectrogramVizContainer.tsx b/frontend/src/components/spectrogram/SpectrogramVizContainer.tsx
index b056f0e..127a281 100644
--- a/frontend/src/components/spectrogram/SpectrogramVizContainer.tsx
+++ b/frontend/src/components/spectrogram/SpectrogramVizContainer.tsx
@@ -11,6 +11,9 @@ import {
 } from '../../apiClient/jobService';
 import { VizContainerProps } from '../types';
 
+// How long to wait between job status polls to the server
+const POLL_INTERVAL = 3000;
+
 export interface SpectrogramSettings {
   fftSize: number;
   stdDev: number;
@@ -172,7 +175,7 @@ const SpectrogramVizContainer = ({
             message: error instanceof Error ? error.message : 'Unknown error',
           }));
         }
-      }, 2000);
+      }, POLL_INTERVAL);
     }
 
     return () => {
diff --git a/jobs/io.py b/jobs/io.py
index b3e6727..80527cf 100644
--- a/jobs/io.py
+++ b/jobs/io.py
@@ -33,7 +33,7 @@ def update_job_status(job_id: int, status: str, token: str, info=None):
         f"{settings.API_URL}/api/jobs/update-job-status/",
         data=data,
         headers=headers,
-        timeout=20,
+        timeout=60,
     )
 
     return response.status_code == requests.codes.created
@@ -56,7 +56,7 @@ def get_job_meta(job_id: int, token: str):
     response = requests.get(
         f"{settings.API_URL}/api/jobs/job-metadata/{job_id}/",
         headers=headers,
-        timeout=10,
+        timeout=60,
     )
     if response.status_code != requests.codes.ok:
         return None
@@ -118,7 +118,7 @@ def post_results(job_id, token: str, json_data=None, file_data=None, file_name=None):
             f"{settings.API_URL}/api/jobs/save-job-data/{job_id}/",
             json={"json_data": json_data},
             headers=headers,
-            timeout=10,
+            timeout=60,
         )
         if response.status_code != requests.codes.created:
             fail = True
@@ -130,7 +130,7 @@ def post_results(job_id, token: str, json_data=None, file_data=None, file_name=None):
             f"{settings.API_URL}/api/jobs/save-job-data/{job_id}/",
             files=files,
             headers=headers,
-            timeout=10,
+            timeout=60,
         )
         if response.status_code != requests.codes.created:
             fail = True
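All four calls in jobs/io.py now share the same 60-second literal. A possible follow-up, not part of this diff, is to hoist it into a single module-level constant so the next tuning pass touches one line (function name below is hypothetical):

    import requests
    from django.conf import settings

    # Name the shared HTTP timeout once instead of repeating it per call.
    REQUEST_TIMEOUT_SECONDS = 60


    def fetch_job_meta_example(job_id: int, headers: dict) -> dict | None:
        # Same call shape as jobs/io.py, with the literal replaced.
        response = requests.get(
            f"{settings.API_URL}/api/jobs/job-metadata/{job_id}/",
            headers=headers,
            timeout=REQUEST_TIMEOUT_SECONDS,
        )
        return response.json() if response.ok else None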
diff --git a/jobs/tasks.py b/jobs/tasks.py
index 5604016..47f5221 100644
--- a/jobs/tasks.py
+++ b/jobs/tasks.py
@@ -1,7 +1,15 @@
 import logging
+import os
+import shutil
 from pathlib import Path
 
 from celery import shared_task
+from spectrumx import Client as SDSClient
+from spectrumx.errors import Result
+from spectrumx.models.captures import CaptureType
+from spectrumx.models.files import File
+
+from spectrumx_visualization_platform.users.models import User
 
 from .io import get_job_file
 from .io import get_job_meta
@@ -12,15 +20,14 @@
 
 @shared_task
 def submit_job(job_id: int, token: str, config: dict | None = None):
-    # the very first thing we should do is update the Job status to "running"
+    # The very first thing we should do is update the Job status to "running"
     update_job_status(job_id, "running", token)
 
-    # the next thing we do is get the job information. This will tell us:
+    # The next thing we do is get the job information. This will tell us:
     # 1. What type of visualization we should do
     # 2. A list of files we'll need
-    job_data = get_job_meta(job_id, token)
-    logging.info(f"job_data: {job_data}")
-    if job_data is None:
+    job_metadata = get_job_meta(job_id, token)
+    if job_metadata is None:
         error_msg = "Could not get job information."
         update_job_status(
             job_id,
@@ -30,12 +37,26 @@ def submit_job(job_id: int, token: str, config: dict | None = None):
         )
         raise ValueError(error_msg)
 
-    # Next, run through the local files and download them from the SVI main system.
-    # Create a directory for the job files
-    print(f"Job {job_id} is running with config: {config}")
+    # Create directories for job files and results
     Path("jobs/job_files").mkdir(parents=True, exist_ok=True)
-    print("job data: " + str(job_data))  # debug added 44
-    for f in job_data["data"]["local_files"]:
+    Path("jobs/job_results").mkdir(parents=True, exist_ok=True)
+
+    # Handle SDS data downloading if needed
+    file_paths = []
+    if config and "capture_ids" in config:
+        try:
+            # Get user from token
+            user = User.objects.get(uuid=job_metadata["data"]["user_id"])
+            file_paths = download_sds_files(
+                job_id, token, config["capture_ids"], user, config["capture_type"]
+            )
+        except Exception as e:
+            error_msg = f"Error downloading SDS data: {e!s}"
+            update_job_status(job_id, "failed", token, info=error_msg)
+            raise ValueError(error_msg)
+
+    # Process local files
+    for f in job_metadata["data"]["local_files"]:
         data = get_job_file(f["id"], token, "local")
 
         if data is None:
@@ -48,21 +69,19 @@ def submit_job(job_id: int, token: str, config: dict | None = None):
             )
             raise ValueError(error_msg)
 
-        # .. store data in some way to access it later in the code,
-        # .. either in memory or locally to disk
-        with Path.open(f"jobs/job_files/{f['name']}", "wb") as new_file:
+        file_path = Path("jobs/job_files") / f["name"]
+        with file_path.open("wb") as new_file:
             new_file.write(data)
+        file_paths.append(str(file_path))
 
-    Path("jobs/job_results").mkdir(parents=True, exist_ok=True)
-
-    if job_data["data"]["type"] == "spectrogram":
+    if job_metadata["data"]["type"] == "spectrogram":
         try:
             figure = make_spectrogram(
-                job_data,
+                job_metadata,
                 config,
-                files_dir="jobs/job_files/",
+                file_paths=file_paths,
             )
-            figure.savefig("jobs/job_results/figure.png")
+            figure.savefig(f"jobs/job_results/{job_id}.png")
         except Exception as e:
             update_job_status(
                 job_id,
@@ -72,7 +91,7 @@ def submit_job(job_id: int, token: str, config: dict | None = None):
             )
             raise
     else:
-        error_msg = f"Unknown job type: {job_data['data']['type']}"
+        error_msg = f"Unknown job type: {job_metadata['data']['type']}"
         update_job_status(
             job_id,
             "failed",
@@ -84,8 +103,8 @@ def submit_job(job_id: int, token: str, config: dict | None = None):
     # Let's say the code dumped to a local file and we want to upload that.
     # We can do either that, or have an in-memory file. Either way,
     # "results_file" will be our file contents (byte format)
-    with Path.open("jobs/job_results/figure.png", "rb") as results_file:
-        # post results -- we can make this call as many times as needed to get
+    with Path.open(f"jobs/job_results/{job_id}.png", "rb") as results_file:
+        # Post results -- we can make this call as many times as needed to get
         # results to send to the main system.
         # We can also mix JSON data and a file. It will save 2 records of
        # "JobData", one for the JSON and one for the file.
@@ -103,7 +122,10 @@ def submit_job(job_id: int, token: str, config: dict | None = None):
         update_job_status(job_id, "failed", token, info=error_msg)
         raise ValueError(error_msg)
 
-    # update the job as complete
+    # Clean up job files
+    cleanup_job_files(job_id)
+
+    # Update the job as complete
     info = {
         "results_id": response["file_ids"]["figure.png"],
     }
@@ -113,3 +135,151 @@ def submit_job(job_id: int, token: str, config: dict | None = None):
 @shared_task
 def error_handler(request, exc, _traceback):
     update_job_status(request.job_id, "failed", request.token, info=str(exc))
+
+
+def download_sds_files(
+    job_id: int,
+    token: str,
+    capture_ids: list[str],
+    user: User,
+    capture_type: str,
+) -> list[str]:
+    """Download files from SDS for the given capture IDs and return their paths.
+
+    Args:
+        job_id: The ID of the job
+        token: The authentication token
+        capture_ids: List containing a single capture ID to download
+        user: The user object
+        capture_type: Type of capture (e.g., "digital_rf", "sigmf")
+
+    Returns:
+        list[str]: List of paths to the downloaded files
+
+    Raises:
+        ValueError: If there are any errors during the download process
+    """
+    sds_client = user.sds_client()
+    sds_captures = sds_client.captures.listing(capture_type=capture_type)
+
+    # Get the first capture ID
+    capture_id = capture_ids[0]
+    capture = next((c for c in sds_captures if str(c.uuid) == str(capture_id)), None)
+    if not capture:
+        error_msg = f"Capture ID {capture_id} not found in SDS"
+        update_job_status(job_id, "failed", token, info=error_msg)
+        raise ValueError(error_msg)
+
+    # Get the UUIDs of the files in the capture for comparison later
+    file_uuids = [file.uuid for file in capture.files]
+
+    # Create a directory for this capture
+    local_path = Path(
+        "jobs/job_files",
+        str(job_id),
+        capture_id,
+    )
+    local_path.mkdir(parents=True)
+
+    # Download files
+    file_results = safe_sds_client_download(
+        sds_client, capture.top_level_dir, local_path
+    )
+
+    downloaded_files = [result() for result in file_results if result]
+    download_errors = [result.error_info for result in file_results if not result]
+
+    if download_errors:
+        error_msg = f"Failed to download SDS files: {download_errors}"
+        update_job_status(job_id, "failed", token, info=error_msg)
+        raise ValueError(error_msg)
+
+    # Clean up unnecessary files and directories
+    matching_files = []
+    for f in downloaded_files:
+        if f.uuid in file_uuids:
+            matching_files.append(f)
+        else:
+            f.local_path.unlink()
+
+    file_paths = [str(f.local_path) for f in matching_files]
+    logging.info(
+        f"Files matching capture (expected): {len(file_paths)} ({len(file_uuids)})"
+    )
+    logging.info(f"Extra files removed: {len(downloaded_files) - len(matching_files)}")
+
+    if not file_paths:
+        error_msg = f"No matching files found for capture {capture_id}"
+        update_job_status(job_id, "failed", token, info=error_msg)
+        raise ValueError(error_msg)
+
+    if capture_type == CaptureType.DigitalRF:
+        # For DigitalRF, maintain the directory structure
+        common_path = os.path.commonpath(file_paths)
+        shutil.move(common_path, local_path)
+        sds_root = str(capture.files[0].directory).strip("/").split("/")[0]
+        sds_root_path = local_path / sds_root
+        if sds_root_path.exists():
+            shutil.rmtree(sds_root_path)
+        # Return all files in the directory structure
+        return [str(p) for p in local_path.glob("**/*")]
+    if capture_type == CaptureType.SigMF:
+        # For SigMF, move files to the root of the capture directory
+        for file_path in file_paths:
+            file_name = Path(file_path).name
+            shutil.move(file_path, local_path / file_name)
+        # Return all files in the capture directory
+        return [str(p) for p in local_path.glob("*")]
+
+    raise ValueError(f"Unsupported capture type: {capture_type}")
+
+
+def safe_sds_client_download(
+    sds_client: SDSClient, from_sds_path: str, to_local_path: str
+) -> list[Result[File]]:
+    file_results: list[Result[File]] = []
+    try:
+        file_results = sds_client.download(
+            from_sds_path=from_sds_path,
+            to_local_path=to_local_path,
+            skip_contents=False,
+            overwrite=True,
+            verbose=True,
+        )
+    except StopIteration:
+        # Account for a bug in the SDS client
+        logging.warning("Caught StopIteration error--continuing.")
+    return file_results
+
+
+def cleanup_job_files(job_id: int) -> None:
+    """Clean up files and directories created for a specific job.
+
+    Args:
+        job_id: The ID of the job whose files should be cleaned up
+
+    Note:
+        Job-specific subdirectories and the job's result image are removed;
+        loose top-level files in job_files (local job inputs) are also cleared.
+    """
+    try:
+        # Remove job-specific directories and files
+        job_files_dir = Path("jobs/job_files")
+        job_results_dir = Path("jobs/job_results")
+
+        # Remove job-specific subdirectories in job_files
+        if job_files_dir.exists():
+            for item in job_files_dir.iterdir():
+                if item.is_dir() and item.name == str(job_id):
+                    shutil.rmtree(item)
+                elif item.is_file():
+                    # Remove individual files that were created for this job
+                    item.unlink()
+
+        # Remove job-specific result file
+        result_file = job_results_dir / f"{job_id}.png"
+        if result_file.exists():
+            result_file.unlink()
+
+    except Exception as e:
+        logging.warning(f"Error cleaning up job files: {e}")
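For context, the task pair above is wired together by the submission helper (jobs/submission.py, which this diff does not show); the call shape would follow the standard Celery errback pattern, roughly:

    # Rough sketch of the call site, assuming the documented Celery
    # link_error pattern; the real helper is not part of this diff.
    from jobs.tasks import error_handler, submit_job


    def queue_job(job_id: int, token: str, config: dict | None = None) -> None:
        submit_job.apply_async(
            args=(job_id, token),
            kwargs={"config": config},
            link_error=error_handler.s(),
        )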
diff --git a/jobs/views.py b/jobs/views.py
index 9a4586e..0947b8b 100644
--- a/jobs/views.py
+++ b/jobs/views.py
@@ -99,16 +99,20 @@ class LocalFile(TypedDict):
 
 
 class JobMetadata(TypedDict):
+    user_id: str
     type: str
     status: Literal["submitted", "running", "completed", "failed"]
     created_at: datetime
     updated_at: datetime
     local_files: list[LocalFile]
     remote_files: list[str]
+    config: dict
+    results_id: str | None
 
 
 class JobMetadataResponse(TypedDict):
     status: Literal["success", "error"]
+    job_id: int
     data: JobMetadata | None
     message: str | None
 
@@ -159,13 +163,16 @@ def get_job_metadata(request: Request, job_id: int) -> JobMetadataResponse:
     return JsonResponse(
         {
             "status": "success",
+            "job_id": job_id,
             "data": {
+                "user_id": job.owner.uuid,
                 "type": job.type,
                 "status": job.status,
                 "created_at": job.created_at,
                 "updated_at": job.updated_at,
                 "local_files": local_files,
                 "remote_files": remote_files,
+                "config": job.config,
                 "results_id": results_id,
             },
         },
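An illustrative payload from get_job_metadata with the new fields in place (every value below is fabricated for the example):

    # Illustrative JobMetadataResponse; shapes follow the TypedDicts above.
    example_response = {
        "status": "success",
        "job_id": 42,
        "data": {
            "user_id": "6f1d3a2e-8c4b-4f0e-9a7d-2b5c8e1f0a93",
            "type": "spectrogram",
            "status": "running",
            "created_at": "2025-01-01T12:00:00Z",
            "updated_at": "2025-01-01T12:00:05Z",
            "local_files": [],
            "remote_files": [],
            "config": {
                "capture_type": "digital_rf",
                "capture_ids": ["a1b2c3d4-0000-4000-8000-000000000000"],
            },
            "results_id": None,
        },
        "message": None,
    }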
diff --git a/jobs/visualizations/spectrogram.py b/jobs/visualizations/spectrogram.py
index 1a7dfa5..5d4a81c 100644
--- a/jobs/visualizations/spectrogram.py
+++ b/jobs/visualizations/spectrogram.py
@@ -1,9 +1,6 @@
-# import tarfile
 import argparse
 import json
 import logging
-import tarfile
-import tempfile
 from dataclasses import dataclass
 from pathlib import Path
 from typing import Any
@@ -37,14 +34,15 @@ class SpectrogramData:
 
 
 def make_spectrogram(
-    job_data: dict[str, Any], config: dict[str, Any], files_dir: str = ""
+    job_metadata: dict[str, Any], config: dict[str, Any], file_paths: list[str]
 ) -> plt.Figure:
     """Generate a spectrogram from either SigMF or DigitalRF data.
 
     Args:
-        job_data: Dictionary containing job configuration and file information
-        config: Dictionary containing job configuration
-        files_dir: Directory containing the input files
+        job_metadata: Dictionary containing job configuration and file information
+        config: Dictionary containing job configuration. Must contain 'capture_type' and
+            'capture_ids' (list with single capture ID)
+        file_paths: List of file paths to search through for data files
 
     Returns:
         matplotlib.figure.Figure: The generated spectrogram figure
@@ -58,9 +56,9 @@ def make_spectrogram(
 
     # Load data based on capture type
     if capture_type == CaptureType.SigMF:
-        spectrogram_data = _load_sigmf_data(job_data, files_dir, config)
+        spectrogram_data = _load_sigmf_data(file_paths)
     elif capture_type == CaptureType.DigitalRF:
-        spectrogram_data = _load_digital_rf_data(job_data, files_dir, config)
+        spectrogram_data = _load_digital_rf_data(file_paths, config)
     else:
         raise ValueError(f"Unsupported capture type: {capture_type}")
 
@@ -68,14 +66,13 @@ def make_spectrogram(
     return _generate_spectrogram(spectrogram_data, config)
 
 
-def _load_sigmf_data(
-    job_data: dict[str, Any], files_dir: str, config: dict[str, Any]
-) -> SpectrogramData:
+def _load_sigmf_data(file_paths: list[str]) -> SpectrogramData:
     """Load data from SigMF format.
 
     Args:
-        job_data: Dictionary containing job configuration and file information
-        files_dir: Directory containing the input files
+        file_paths: List of file paths to search through for the
+            SigMF data (.sigmf-data) and metadata (.sigmf-meta)
+            files needed to load the capture
 
     Returns:
         SpectrogramData: Container with loaded data and metadata
@@ -83,27 +80,23 @@ def _load_sigmf_data(file_paths: list[str]) -> SpectrogramData:
     Raises:
         ValueError: If required files are not found
     """
-    # Get the data and metadata files
-    data_file = None
-    metadata_file = None
-
-    for f in job_data["data"]["local_files"]:
-        if f["name"].endswith(".sigmf-data"):
-            data_file = f
-        elif f["name"].endswith(".sigmf-meta"):
-            metadata_file = f
+    # Find SigMF files in the provided paths
+    data_file = next((Path(p) for p in file_paths if p.endswith(".sigmf-data")), None)
+    metadata_file = next(
+        (Path(p) for p in file_paths if p.endswith(".sigmf-meta")), None
+    )
 
     if not data_file or not metadata_file:
-        msg = "Data or metadata file not found in job data"
+        msg = "Could not find SigMF data or metadata files"
         raise ValueError(msg)
 
     # Get sample rate from metadata file
-    with Path.open(f"{files_dir}{metadata_file['name']}") as f:
+    with metadata_file.open() as f:
         metadata = json.load(f)
         sample_rate = metadata["global"]["core:sample_rate"]
 
     # Load data array
-    data_array = np.fromfile(f"{files_dir}{data_file['name']}", dtype=np.complex64)
+    data_array = np.fromfile(data_file, dtype=np.complex64)
     sample_count = len(data_array)
 
     return SpectrogramData(
@@ -112,13 +105,14 @@ def _load_sigmf_data(file_paths: list[str]) -> SpectrogramData:
 
 
 def _load_digital_rf_data(
-    job_data: dict[str, Any], files_dir: str, config: dict[str, Any]
+    file_paths: list[str], config: dict[str, Any]
 ) -> SpectrogramData:
     """Load data from DigitalRF format.
 
     Args:
-        job_data: Dictionary containing job configuration and file information
-        files_dir: Directory containing the input files
+        file_paths: List of file paths to search through for the
+            drf_properties.h5 file that marks the DigitalRF channel
+        config: Dictionary containing job configuration
 
     Returns:
         SpectrogramData: Container with loaded data and metadata
@@ -126,62 +120,50 @@ def _load_digital_rf_data(
     Raises:
         ValueError: If required files are not found
     """
-    # Find the tar.gz file
-    tar_file = None
-    for f in job_data["data"]["local_files"]:
-        if f["name"].endswith(".tar.gz"):
-            tar_file = f"{files_dir}{f['name']}"
-            break
-
-    if not tar_file:
-        msg = "tar.gz file not found in job data"
+    # Find the DigitalRF directory by looking for drf_properties.h5
+    drf_props = next(
+        (Path(p) for p in file_paths if p.endswith("drf_properties.h5")), None
+    )
+    if not drf_props:
+        msg = "Could not find DigitalRF properties file"
         raise ValueError(msg)
 
-    # Create a temporary directory to extract the tar.gz file
-    with tempfile.TemporaryDirectory() as temp_dir:
-        try:
-            # Extract the tar.gz file
-            with tarfile.open(tar_file, "r:gz") as tf:
-                for member in tf.getmembers():
-                    if member.name.startswith(("/", "..")):
-                        continue
-                    tf.extract(member, temp_dir)
-
-            # Initialize DigitalRF reader
-            reader = DigitalRFReader(temp_dir)
-            channels = reader.get_channels()
-
-            if not channels:
-                msg = "No channels found in DigitalRF data"
-                _raise_error(msg)
-
-            # Use the first channel
-            channel = channels[0]
-            subchannel = config.get("subchannel", 0)
-            start_sample, end_sample = reader.get_bounds(channel)
-
-            # Get sample rate from metadata
-            with h5py.File(f"{temp_dir}/{channel}/drf_properties.h5", "r") as f:
-                sample_rate = (
-                    f.attrs["sample_rate_numerator"]
-                    / f.attrs["sample_rate_denominator"]
-                )
-
-            num_samples = end_sample - start_sample
-            data_array = reader.read_vector(
-                start_sample, num_samples, channel, subchannel
-            )
-
-            return SpectrogramData(
-                data_array=data_array,
-                sample_rate=sample_rate,
-                sample_count=num_samples,
-                channel_name=channel,
-            )
-        except Exception as e:
-            logging.error(f"Error processing DigitalRF data: {e}")
-            raise
+    # The DigitalRF directory is the parent of the channel directory
+    drf_dir = drf_props.parent.parent
+    channel = drf_props.parent.name
+
+    try:
+        # Initialize DigitalRF reader
+        reader = DigitalRFReader(str(drf_dir))
+        channels = reader.get_channels()
+
+        if not channels:
+            msg = "No channels found in DigitalRF data"
+            _raise_error(msg)
+
+        # Use the specified channel
+        subchannel = config.get("subchannel", 0)
+        start_sample, end_sample = reader.get_bounds(channel)
+
+        # Get sample rate from metadata
+        with h5py.File(drf_props, "r") as f:
+            sample_rate = (
+                f.attrs["sample_rate_numerator"] / f.attrs["sample_rate_denominator"]
+            )
+
+        num_samples = end_sample - start_sample
+        data_array = reader.read_vector(start_sample, num_samples, channel, subchannel)
+
+        return SpectrogramData(
+            data_array=data_array,
+            sample_rate=sample_rate,
+            sample_count=num_samples,
+            channel_name=channel,
+        )
+
+    except Exception as e:
+        logging.error(f"Error processing DigitalRF data: {e}")
+        raise
 
 
 def _generate_spectrogram(
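A sketch of the new call shape (paths, IDs, and config values below are illustrative; in production, submit_job in jobs/tasks.py assembles these arguments):

    # Illustrative call against the refactored signature.
    from jobs.visualizations.spectrogram import make_spectrogram

    figure = make_spectrogram(
        job_metadata={"data": {"type": "spectrogram"}},  # trimmed example
        config={
            "capture_type": "sigmf",
            "capture_ids": ["a1b2c3d4-0000-4000-8000-000000000000"],
            "width": 10,
            "height": 10,
        },
        file_paths=[
            "jobs/job_files/42/example.sigmf-meta",
            "jobs/job_files/42/example.sigmf-data",
        ],
    )
    figure.savefig("jobs/job_results/42.png")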
diff --git a/spectrumx_visualization_platform/spx_vis/api/serializers.py b/spectrumx_visualization_platform/spx_vis/api/serializers.py
index 40eb750..30409c2 100644
--- a/spectrumx_visualization_platform/spx_vis/api/serializers.py
+++ b/spectrumx_visualization_platform/spx_vis/api/serializers.py
@@ -291,7 +291,7 @@ def get_captures(self, obj: Visualization) -> list[dict]:
 
         if obj.capture_source == CaptureSource.SDS:
             try:
-                sds_captures, sds_errors = get_sds_captures(request)
+                sds_captures, sds_errors = get_sds_captures(request.user)
                 self._handle_sds_errors(sds_errors)
                 captures = [
                     capture
diff --git a/spectrumx_visualization_platform/spx_vis/api/views.py b/spectrumx_visualization_platform/spx_vis/api/views.py
index 9b60af2..479f1db 100644
--- a/spectrumx_visualization_platform/spx_vis/api/views.py
+++ b/spectrumx_visualization_platform/spx_vis/api/views.py
@@ -1,15 +1,13 @@
 import io
 import json
 import logging
-import os
-import shutil
 import zipfile
 from datetime import UTC
 from datetime import datetime
-from pathlib import Path
 from typing import TYPE_CHECKING
 
 import requests
+import spectrumx
 from django.conf import settings
 from django.http import FileResponse
 from rest_framework import filters
@@ -23,6 +21,7 @@
 from rest_framework.request import Request
 from rest_framework.response import Response
 
+from jobs.submission import request_job_submission
 from spectrumx_visualization_platform.spx_vis.api.serializers import CaptureSerializer
 from spectrumx_visualization_platform.spx_vis.api.serializers import FileSerializer
 from spectrumx_visualization_platform.spx_vis.api.serializers import (
@@ -33,13 +32,9 @@
 )
 from spectrumx_visualization_platform.spx_vis.api.utils import datetime_check
 from spectrumx_visualization_platform.spx_vis.api.utils import filter_capture
-from spectrumx_visualization_platform.spx_vis.capture_utils.digital_rf import (
-    DigitalRFUtility,
-)
 from spectrumx_visualization_platform.spx_vis.capture_utils.radiohound import (
     RadioHoundUtility,
 )
-from spectrumx_visualization_platform.spx_vis.capture_utils.sigmf import SigMFUtility
 from spectrumx_visualization_platform.spx_vis.models import Capture
 from spectrumx_visualization_platform.spx_vis.models import CaptureType
 from spectrumx_visualization_platform.spx_vis.models import File
@@ -50,6 +45,9 @@
 if TYPE_CHECKING:
     from spectrumx_visualization_platform.users.models import User
 
+if settings.DEBUG:
+    spectrumx.enable_logging()
+
 
 @api_view(["GET"])
 def capture_list(request: Request) -> Response:
@@ -65,7 +63,7 @@ def capture_list(request: Request) -> Response:
     error_messages = []
 
     if not source_filter or "sds" in source_filter:
-        sds_captures, sds_error = get_sds_captures(request)
+        sds_captures, sds_error = get_sds_captures(request.user)
         if sds_error:
             error_messages.extend(sds_error)
 
@@ -315,11 +313,7 @@ def create_spectrogram(self, request: Request, uuid=None) -> Response:
                 },
             )
 
-        if visualization.capture_type == CaptureType.SigMF:
-            capture_utility = SigMFUtility
-        elif visualization.capture_type == CaptureType.DigitalRF:
-            capture_utility = DigitalRFUtility
-        else:
+        if visualization.capture_type not in [CaptureType.SigMF, CaptureType.DigitalRF]:
             return Response(
                 {"status": "error", "message": "Unsupported capture type"},
                 status=status.HTTP_400_BAD_REQUEST,
@@ -351,76 +345,27 @@ def create_spectrogram(self, request: Request, uuid=None) -> Response:
                 },
                 status=status.HTTP_400_BAD_REQUEST,
             )
-            file_uuids = [file.uuid for file in capture.files]
-
-            # Download to media root
-            user_path = Path(
-                settings.MEDIA_ROOT,
-                "sds",
-                str(user.uuid),
-            )
-            local_path = Path(
-                user_path,
-                str(datetime.now(UTC).timestamp()),
-            )
-            file_results = sds_client.download(
-                from_sds_path=capture.top_level_dir,
-                to_local_path=local_path,
-                skip_contents=False,
-                overwrite=True,
-                verbose=True,
-            )
-            downloaded_files = [result() for result in file_results if result]
-            download_errors = [
-                result.error_info for result in file_results if not result
-            ]
-
-            if download_errors:
-                return Response(
-                    {
-                        "status": "error",
-                        "message": f"Failed to download SDS files: {download_errors}",
-                    },
-                    status=status.HTTP_500_INTERNAL_SERVER_ERROR,
-                )
-
-            matching_files = []
-            for f in downloaded_files:
-                if f.uuid in file_uuids:
-                    matching_files.append(f)
-                else:
-                    f.local_path.unlink()
-            file_paths = [str(f.local_path) for f in matching_files]
-            logging.info(
-                f"Files matching capture (expected): {len(file_paths)} ({len(file_uuids)})"
+            # Create job with metadata
+            final_config = {
+                "width": width,
+                "height": height,
+                "capture_type": visualization.capture_type,
+                "capture_ids": visualization.capture_ids,
+            }
+            if config:
+                final_config.update(config)
+
+            job = request_job_submission(
+                visualization_type="spectrogram",
+                owner=user,
+                local_files=[],
+                config=final_config,
             )
-            logging.info(
-                f"Files removed: {len(downloaded_files) - len(matching_files)}"
+            return Response(
+                {"job_id": job.id, "status": "submitted"},
+                status=status.HTTP_201_CREATED,
             )
-            common_path = os.path.commonpath(file_paths)
-
-            # Move commonpath directory to local_path and delete the remaining empty directories
-            shutil.move(common_path, local_path)
-            sds_root = str(capture.files[0].directory).strip("/").split("/")[0]
-            sds_root_path = local_path / sds_root
-            shutil.rmtree(sds_root_path)
-            new_file_paths = [
-                str(path) for path in Path(local_path).glob("**/*") if path.is_file()
-            ]
-
-            try:
-                # Pass the downloaded file paths to the utility
-                job = capture_utility.submit_spectrogram_job(
-                    user, new_file_paths, width, height, config
-                )
-                return Response(
-                    {"job_id": job.id, "status": "submitted"},
-                    status=status.HTTP_201_CREATED,
-                )
-            finally:
-                # Clean up the temporary files
-                shutil.rmtree(user_path)
         except Exception as e:
             return Response(
                 {"status": "error", "message": str(e)},
@@ -452,7 +397,7 @@ def _process_sds_file(
     response = requests.get(
         f"https://{settings.SDS_CLIENT_URL}/api/latest/assets/files/{file_uuid}/download",
         headers={"Authorization": f"Api-Key: {token}"},
-        timeout=10,
+        timeout=60,
         stream=True,
     )
     response.raise_for_status()
@@ -497,7 +442,7 @@ def _handle_sds_captures(
         ValueError: If any capture processing fails
     """
     logging.info("Getting SDS captures")
-    sds_captures, sds_errors = get_sds_captures(request)
+    sds_captures, sds_errors = get_sds_captures(request.user)
    if sds_errors:
         raise ValueError(f"Error getting SDS captures: {sds_errors}")
     logging.info(f"Found {len(sds_captures)} SDS captures")
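The 201 response now only acknowledges submission; progress is observed by polling job metadata, which the frontend does every POLL_INTERVAL milliseconds. A rough server-to-server equivalent of that loop, using the endpoint from jobs/io.py and the status values from jobs/views.py:

    import time

    import requests

    POLL_INTERVAL_SECONDS = 3  # mirrors POLL_INTERVAL (3000 ms) in the frontend


    def wait_for_job(api_url: str, job_id: int, headers: dict) -> dict:
        """Poll the job-metadata endpoint until the job settles."""
        while True:
            response = requests.get(
                f"{api_url}/api/jobs/job-metadata/{job_id}/",
                headers=headers,
                timeout=60,
            )
            payload = response.json()
            data = payload.get("data") or {}
            if data.get("status") in ("completed", "failed"):
                return payload
            time.sleep(POLL_INTERVAL_SECONDS)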
diff --git a/spectrumx_visualization_platform/spx_vis/capture_utils/base.py b/spectrumx_visualization_platform/spx_vis/capture_utils/base.py
index 3794ccf..d6b7ded 100644
--- a/spectrumx_visualization_platform/spx_vis/capture_utils/base.py
+++ b/spectrumx_visualization_platform/spx_vis/capture_utils/base.py
@@ -4,8 +4,6 @@
 
 from django.core.files.uploadedfile import UploadedFile
 
-from spectrumx_visualization_platform.users.models import User
-
 
 class CaptureUtility(ABC):
     """Abstract base class for capture type utilities.
@@ -51,28 +49,6 @@ def get_capture_name(files: list[UploadedFile], name: str | None) -> str:
             str: The inferred capture name
         """
 
-    @staticmethod
-    @abstractmethod
-    def submit_spectrogram_job(
-        user: User,
-        capture_files: list[str],
-        width: int = 10,
-        height: int = 10,
-        config: dict | None = None,
-    ):
-        """Submit a job to the backend for spectrogram generation.
-
-        Args:
-            user: The user submitting the job
-            capture_files: The capture files to use for the job
-            width: The width of the spectrogram
-            height: The height of the spectrogram
-            config: The configuration for the spectrogram job
-
-        Returns:
-            The submitted job
-        """
-
     @staticmethod
     @abstractmethod
     def to_waterfall_file(file: UploadedFile) -> dict:
diff --git a/spectrumx_visualization_platform/spx_vis/capture_utils/digital_rf.py b/spectrumx_visualization_platform/spx_vis/capture_utils/digital_rf.py
index f1b76c4..3c988c3 100644
--- a/spectrumx_visualization_platform/spx_vis/capture_utils/digital_rf.py
+++ b/spectrumx_visualization_platform/spx_vis/capture_utils/digital_rf.py
@@ -1,21 +1,12 @@
 import logging
 import mimetypes
-import os
 import re
-import shutil
-import tarfile
-import tempfile
 import zipfile
 from datetime import UTC
 from datetime import datetime
-from pathlib import Path
 
-from django.conf import settings
 from django.core.files.uploadedfile import UploadedFile
 
-from jobs.submission import request_job_submission
-from spectrumx_visualization_platform.spx_vis.models import CaptureType
-
 from .base import CaptureUtility
 
 logger = logging.getLogger(__name__)
@@ -124,71 +115,3 @@ def get_capture_name(files: list[UploadedFile], name: str | None) -> str:
 
         # Use the file name (without extension) as the capture name
         return ".".join(files[0].name.split(".")[:-1])
-
-    @staticmethod
-    def submit_spectrogram_job(
-        user,
-        capture_files,
-        width=10,
-        height=10,
-        config=None,
-    ):
-        """Get the Digital RF data and metadata files needed for spectrogram generation.
-
-        Args:
-            capture_files: List of file paths that make up a Digital RF channel directory structure
-            width: Width of the spectrogram in inches
-            height: Height of the spectrogram in inches
-            config: The configuration for the spectrogram job
-
-        Returns:
-            Job: The submitted job
-
-        Raises:
-            ValueError: If the required Digital RF files are not found
-        """
-        # Find the metadata file and data directories
-        meta_file = None
-
-        for f in capture_files:
-            if f.endswith("drf_properties.h5"):
-                meta_file = f
-
-        if not meta_file:
-            error_message = "Required Digital RF metadata file not found"
-            logger.error(error_message)
-            raise ValueError(error_message)
-
-        # Create a temporary directory for archive creation
-        with tempfile.TemporaryDirectory() as temp_dir:
-            # Create the tar file in the temporary directory
-            timestamp = datetime.now(UTC).strftime("%Y%m%d_%H%M%S")
-            parent_dir = os.path.commonpath(capture_files)
-            archive_filename = f"{parent_dir.split('/')[-1][:30]}_{timestamp}.tar.gz"
-            temp_archive_path = Path(temp_dir) / archive_filename
-
-            # Create the tar archive in the temporary directory
-            logger.info(f"Creating tar archive in temp directory: {temp_archive_path}")
-            with tarfile.open(temp_archive_path, "w:gz") as tf:
-                tf.add(parent_dir, arcname=parent_dir.split("/")[-1])
-
-            # Move the archive file to its final location
-            final_archive_path = Path(settings.MEDIA_ROOT)
-            logger.info(f"Moving tar archive to final location: {final_archive_path}")
-            shutil.move(str(temp_archive_path), str(final_archive_path))
-
-            final_config = {
-                "width": width,
-                "height": height,
-                "capture_type": CaptureType.DigitalRF,
-            }
-            if config:
-                final_config.update(config)
-
-            # Submit the job with the archive file
-            return request_job_submission(
-                visualization_type="spectrogram",
-                owner=user,
-                local_files=[archive_filename],
-                config=final_config,
-            )
diff --git a/spectrumx_visualization_platform/spx_vis/capture_utils/sigmf.py b/spectrumx_visualization_platform/spx_vis/capture_utils/sigmf.py
index a0d7126..ebec000 100644
--- a/spectrumx_visualization_platform/spx_vis/capture_utils/sigmf.py
+++ b/spectrumx_visualization_platform/spx_vis/capture_utils/sigmf.py
@@ -5,8 +5,6 @@
 
 from django.core.files.uploadedfile import UploadedFile
 
-from jobs.submission import request_job_submission
-
 from .base import CaptureUtility
 
 logger = logging.getLogger(__name__)
@@ -87,38 +85,3 @@ def get_capture_name(files: list[UploadedFile], name: str | None) -> str:
             raise ValueError(error_message)
 
         return ".".join(meta_file.name.split(".")[:-1])
-
-    @staticmethod
-    def submit_spectrogram_job(user, capture_files, width=10, height=10, config=None):
-        """Get the SigMF data and metadata files needed for spectrogram generation.
-
-        Args:
-            capture_files: List of file paths
-            width: Width of the spectrogram in inches
-            height: Height of the spectrogram in inches
-            config: The configuration for the spectrogram job
-
-        Returns:
-            Job: The submitted job
-
-        Raises:
-            ValueError: If the required SigMF files are not found
-        """
-        data_file = next((f for f in capture_files if f.endswith(".sigmf-data")), None)
-        meta_file = next((f for f in capture_files if f.endswith(".sigmf-meta")), None)
-
-        if not data_file or not meta_file:
-            error_message = "Required SigMF files (data and/or metadata) not found"
-            logger.error(error_message)
-            raise ValueError(error_message)
-
-        final_config = {"width": width, "height": height}
-        if config:
-            final_config.update(config)
-
-        return request_job_submission(
-            visualization_type="spectrogram",
-            owner=user,
-            local_files=[data_file, meta_file],
-            config=final_config,
-        )
diff --git a/spectrumx_visualization_platform/spx_vis/source_utils/sds.py b/spectrumx_visualization_platform/spx_vis/source_utils/sds.py
index b6e071e..ac7d48f 100644
--- a/spectrumx_visualization_platform/spx_vis/source_utils/sds.py
+++ b/spectrumx_visualization_platform/spx_vis/source_utils/sds.py
@@ -2,25 +2,22 @@
 from datetime import UTC
 from datetime import datetime
 
-from rest_framework.request import Request
-
 from spectrumx_visualization_platform.spx_vis.api.utils import calculate_end_time
 from spectrumx_visualization_platform.spx_vis.models import CaptureType
 from spectrumx_visualization_platform.users.models import User
 
 
-def get_sds_captures(request: Request) -> tuple[list[dict], list[str]]:
+def get_sds_captures(user: User) -> tuple[list[dict], list[str]]:
     """Get SDS captures for the current user.
 
     Args:
-        request: The HTTP request containing user information
+        user: The user object
 
     Returns:
         tuple: A tuple containing:
             - List of successfully formatted captures
             - List of error messages if any error occurred
     """
-    user: User = request.user
     formatted_captures = []
     error_messages = []
 
@@ -32,9 +29,9 @@ def get_sds_captures(user: User) -> tuple[list[dict], list[str]]:
     for capture in captures:
         try:
             if capture["capture_type"] == CaptureType.RadioHound:
-                formatted_capture = format_sds_rh_capture(capture, request.user.id)
+                formatted_capture = format_sds_rh_capture(capture, user.id)
             elif capture["capture_type"] == CaptureType.DigitalRF:
-                formatted_capture = format_sds_drf_capture(capture, request.user.id)
+                formatted_capture = format_sds_drf_capture(capture, user.id)
             formatted_captures.append(formatted_capture)
         except Exception as e:
             logging.exception(
diff --git a/spectrumx_visualization_platform/spx_vis/tests/test_drf_serializers.py b/spectrumx_visualization_platform/spx_vis/tests/test_drf_serializers.py
index 4a14501..6881a12 100644
--- a/spectrumx_visualization_platform/spx_vis/tests/test_drf_serializers.py
+++ b/spectrumx_visualization_platform/spx_vis/tests/test_drf_serializers.py
@@ -187,9 +187,7 @@ def test_get_captures_returns_serialized_sds_captures(
         assert result[0]["uuid"] == fake_uuid_2
         assert result[0]["name"] == "Test SDS Capture 2"
 
-        request = api_request_factory
-
-        mock_get_sds_captures.assert_called_once_with(request)
+        mock_get_sds_captures.assert_called_once_with(user)
 
     def test_validate_capture_ids_loop(self, api_request_factory: APIRequestFactory):
         test_cases = [
diff --git a/spectrumx_visualization_platform/users/models.py b/spectrumx_visualization_platform/users/models.py
index aabec27..31eed2a 100644
--- a/spectrumx_visualization_platform/users/models.py
+++ b/spectrumx_visualization_platform/users/models.py
@@ -77,7 +77,7 @@ def fetch_sds_token(self) -> str | None:
             headers={
                 "Authorization": f"Token {settings.SVI_SERVER_API_KEY}",
             },
-            timeout=10,
+            timeout=60,
         )
         response.raise_for_status()
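With the signature change, callers pass the user directly instead of the request, which also simplifies the test assertion above. A minimal sketch of the new call shape (the view helper below is hypothetical):

    import logging

    from spectrumx_visualization_platform.spx_vis.source_utils.sds import (
        get_sds_captures,
    )


    def list_user_captures(request):
        """Hypothetical view helper showing the new call shape."""
        captures, errors = get_sds_captures(request.user)
        if errors:
            # Partial failures are returned rather than raised; surface them.
            logging.warning("SDS capture errors: %s", errors)
        return captures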