diff --git a/nominal/_utils.py b/nominal/_utils.py index 58b5f31f..7743ac92 100644 --- a/nominal/_utils.py +++ b/nominal/_utils.py @@ -41,13 +41,14 @@ def from_path_dataset(cls, path: Path | str) -> FileType: class FileTypes: + BINARY: FileType = FileType("", "application/octet-stream") CSV: FileType = FileType(".csv", "text/csv") CSV_GZ: FileType = FileType(".csv.gz", "text/csv") - # https://issues.apache.org/jira/browse/PARQUET-1889 - PARQUET: FileType = FileType(".parquet", "application/vnd.apache.parquet") + JSON: FileType = FileType(".json", "application/json") MP4: FileType = FileType(".mp4", "video/mp4") - BINARY: FileType = FileType("", "application/octet-stream") MCAP: FileType = FileType(".mcap", "application/octet-stream") + # https://issues.apache.org/jira/browse/PARQUET-1889 + PARQUET: FileType = FileType(".parquet", "application/vnd.apache.parquet") @contextmanager diff --git a/nominal/core/_multipart.py b/nominal/core/_multipart.py index a5768677..6b0177f1 100644 --- a/nominal/core/_multipart.py +++ b/nominal/core/_multipart.py @@ -2,6 +2,8 @@ import concurrent.futures import logging +import pathlib +import urllib.parse from functools import partial from queue import Queue from typing import BinaryIO, Iterable @@ -9,10 +11,14 @@ import requests from nominal._api.scout_service_api import ingest_api, upload_api +from nominal._utils import FileType from nominal.exceptions import NominalMultipartUploadFailed logger = logging.getLogger(__name__) +DEFAULT_CHUNK_SIZE = 64_000_000 +DEFAULT_NUM_WORKERS = 8 + def _sign_and_upload_part_job( upload_client: upload_api.UploadService, @@ -54,15 +60,27 @@ def put_multipart_upload( filename: str, mimetype: str, upload_client: upload_api.UploadService, - chunk_size: int = 64_000_000, - max_workers: int = 8, + chunk_size: int = DEFAULT_CHUNK_SIZE, + max_workers: int = DEFAULT_NUM_WORKERS, ) -> str: """Execute a multipart upload to S3. All metadata-style requests (init, sign, complete) proxy through Nominal servers, while the upload PUT requests for each part go to a pre-signed URL to the storage provider. - Ref: https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html + Args: + auth_header: Nominal authorization token + f: Binary IO to upload + filename: URL-safe filename to use when uploading to S3 + mimetype: Type of data contained within binary stream + upload_client: Conjure upload client + chunk_size: Maximum size of chunk to upload to S3 at once + max_workers: Number of worker threads to use when processing and uploading data + + Returns: Path to the uploaded object in S3 + + See: https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html + """ # muiltithreaded multipart upload: # - create a worker thread pool and a queue for all threads to share @@ -118,6 +136,84 @@ def put_multipart_upload( raise e +def upload_multipart_io( + auth_header: str, + f: BinaryIO, + name: str, + file_type: FileType, + upload_client: upload_api.UploadService, + chunk_size: int = DEFAULT_CHUNK_SIZE, + max_workers: int = DEFAULT_NUM_WORKERS, +) -> str: + """Execute a multipart upload to S3 proxied via Nominal servers + + Args: + auth_header: Nominal authorization token + f: Binary IO to upload + name: Name of the file to create in S3 + NOTE: does not need to be URL Safe + file_type: Type of data being uploaded + upload_client: Conjure upload client + chunk_size: Maximum size of chunk to upload to S3 at once + max_workers: Number of worker threads to use when processing and uploading data + + Returns: Path to the uploaded object in S3 + + Note: see put_multipart_upload for more details + + """ + urlsafe_name = urllib.parse.quote_plus(name) + safe_filename = f"{urlsafe_name}{file_type.extension}" + return put_multipart_upload( + auth_header, + f, + safe_filename, + file_type.mimetype, + upload_client, + chunk_size=chunk_size, + max_workers=max_workers, + ) + + +def upload_multipart_file( + auth_header: str, + file: pathlib.Path, + upload_client: upload_api.UploadService, + file_type: FileType | None = None, + chunk_size: int = DEFAULT_CHUNK_SIZE, + max_workers: int = DEFAULT_NUM_WORKERS, +) -> str: + """Execute a multipart upload to S3 proxied via Nominal servers. + + Args: + auth_header: Nominal authorization token + file: File to upload to S3 + upload_client: Conjure upload client + file_type: Manually override inferred file type for the given file + chunk_size: Maximum size of chunk to upload to S3 at once + max_workers: Number of worker threads to use when processing and uploading data + + Returns: Path to the uploaded object in S3 + + Note: see put_multipart_upload for more details + + """ + if file_type is None: + file_type = FileType.from_path(file) + + file_name = file.stem.split(".")[0] + with file.open("rb") as file_handle: + return upload_multipart_io( + auth_header, + file_handle, + file_name, + file_type, + upload_client, + chunk_size=chunk_size, + max_workers=max_workers, + ) + + def _abort(upload_client: upload_api.UploadService, auth_header: str, key: str, upload_id: str, e: Exception) -> None: logger.error( "aborting multipart upload due to an exception", exc_info=e, extra={"key": key, "upload_id": upload_id} diff --git a/nominal/core/client.py b/nominal/core/client.py index cf01de94..61a04909 100644 --- a/nominal/core/client.py +++ b/nominal/core/client.py @@ -1,9 +1,10 @@ from __future__ import annotations -import urllib.parse +import json +import logging from dataclasses import dataclass, field from datetime import datetime -from io import TextIOBase +from io import BytesIO, TextIOBase, TextIOWrapper from pathlib import Path from typing import BinaryIO, Iterable, Mapping, Sequence @@ -34,7 +35,7 @@ ) from nominal.core._clientsbunch import ClientsBunch from nominal.core._conjure_utils import _available_units, _build_unit_update -from nominal.core._multipart import put_multipart_upload +from nominal.core._multipart import upload_multipart_file, upload_multipart_io from nominal.core._utils import construct_user_agent_string, rid_from_instance_or_string from nominal.core.attachment import Attachment, _iter_get_attachments from nominal.core.channel import Channel @@ -58,6 +59,8 @@ from .asset import Asset +logger = logging.getLogger(__name__) + @dataclass(frozen=True) class NominalClient: @@ -235,13 +238,12 @@ def create_mcap_dataset( ) mcap_path = Path(path) - file_type = FileTypes.MCAP - urlsafe_name = urllib.parse.quote_plus(mcap_path.stem) - filename = f"{urlsafe_name}{file_type.extension}" - with open(mcap_path, "rb") as f: - s3_path = put_multipart_upload( - self._clients.auth_header, f, filename, file_type.mimetype, self._clients.upload - ) + s3_path = upload_multipart_file( + self._clients.auth_header, + mcap_path, + self._clients.upload, + file_type=FileTypes.MCAP, + ) source = ingest_api.IngestSource(s3=ingest_api.S3IngestSource(path=s3_path)) request = ingest_api.IngestMcapRequest( channel_config=[], @@ -288,12 +290,7 @@ def create_dataset_from_io( raise TypeError(f"dataset {dataset} must be open in binary mode, rather than text mode") file_type = FileType(*file_type) - urlsafe_name = urllib.parse.quote_plus(name) - filename = f"{urlsafe_name}{file_type.extension}" - - s3_path = put_multipart_upload( - self._clients.auth_header, dataset, filename, file_type.mimetype, self._clients.upload - ) + s3_path = upload_multipart_io(self._clients.auth_header, dataset, name, file_type, self._clients.upload) request = ingest_api.TriggerFileIngest( destination=ingest_api.IngestDestination( new_dataset=ingest_api.NewDatasetIngestDestination( @@ -319,7 +316,8 @@ def create_video_from_io( self, video: BinaryIO, name: str, - start: datetime | IntegralNanosecondsUTC, + start: datetime | IntegralNanosecondsUTC | None = None, + frame_timestamps: Sequence[IntegralNanosecondsUTC] | None = None, description: str | None = None, file_type: tuple[str, str] | FileType = FileTypes.MP4, *, @@ -327,28 +325,80 @@ def create_video_from_io( properties: Mapping[str, str] | None = None, ) -> Video: """Create a video from a file-like object. - The video must be a file-like object in binary mode, e.g. open(path, "rb") or io.BytesIO. + + Args: + ---- + video: file-like object to read video data from + name: Name of the video to create in Nominal + start: Starting timestamp of the video + frame_timestamps: Per-frame timestamps (in nanoseconds since unix epoch) for every frame of the video + description: Description of the video to create in nominal + file_type: Type of data being uploaded + labels: Labels to apply to the video in nominal + properties: Properties to apply to the video in nominal + + Returns: + ------- + Handle to the created video + + Note: + ---- + Exactly one of 'start' and 'frame_timestamps' **must** be provided. Most users will + want to provide a starting timestamp: frame_timestamps is primarily useful when the scale + of the video data is not 1:1 with the playback speed or non-uniform over the course of the video, + for example, 200fps video artificially slowed to 30 fps without dropping frames. This will result + in the playhead on charts within the product playing at the rate of the underlying data rather than + time elapsed in the video playback. + """ + if (start is None and frame_timestamps is None) or (None not in (start, frame_timestamps)): + raise ValueError("One of 'start' or 'frame_timestamps' must be provided") + if isinstance(video, TextIOBase): raise TypeError(f"video {video} must be open in binary mode, rather than text mode") - file_type = FileType(*file_type) - urlsafe_name = urllib.parse.quote_plus(name) - filename = f"{urlsafe_name}{file_type.extension}" + if start is None: + # Dump timestamp array into an in-memory file-like IO object + json_io = BytesIO() + text_json_io = TextIOWrapper(json_io) + json.dump(frame_timestamps, text_json_io) + text_json_io.flush() + json_io.seek(0) + + logger.debug("Uploading timestamp manifests to s3") + manifest_s3_path = upload_multipart_io( + self._clients.auth_header, + json_io, + "timestamp_manifest", + FileTypes.JSON, + self._clients.upload, + ) + timestamp_manifest = ingest_api.VideoTimestampManifest( + timestamp_manifests=ingest_api.TimestampManifest( + sources=[ + ingest_api.IngestSource( + s3=ingest_api.S3IngestSource( + path=manifest_s3_path, + ) + ) + ] + ) + ) + else: + timestamp_manifest = ingest_api.VideoTimestampManifest( + no_manifest=ingest_api.NoTimestampManifest( + starting_timestamp=_SecondsNanos.from_flexible(start).to_ingest_api() + ) + ) - s3_path = put_multipart_upload( - self._clients.auth_header, video, filename, file_type.mimetype, self._clients.upload - ) + file_type = FileType(*file_type) + s3_path = upload_multipart_io(self._clients.auth_header, video, name, file_type, self._clients.upload) request = ingest_api.IngestVideoRequest( labels=list(labels), properties={} if properties is None else dict(properties), sources=[ingest_api.IngestSource(s3=ingest_api.S3IngestSource(path=s3_path))], - timestamps=ingest_api.VideoTimestampManifest( - no_manifest=ingest_api.NoTimestampManifest( - starting_timestamp=_SecondsNanos.from_flexible(start).to_ingest_api() - ) - ), + timestamps=timestamp_manifest, description=description, title=name, ) @@ -481,11 +531,12 @@ def create_attachment_from_io( raise TypeError(f"attachment {attachment} must be open in binary mode, rather than text mode") file_type = FileType(*file_type) - urlsafe_name = urllib.parse.quote_plus(name) - filename = f"{urlsafe_name}{file_type.extension}" - - s3_path = put_multipart_upload( - self._clients.auth_header, attachment, filename, file_type.mimetype, self._clients.upload + s3_path = upload_multipart_io( + self._clients.auth_header, + attachment, + name, + file_type, + self._clients.upload, ) request = attachments_api.CreateAttachmentRequest( description=description or "", @@ -520,7 +571,7 @@ def get_unit(self, unit_symbol: str) -> Unit | None: NOTE: This currently requires that units are formatted as laid out in the latest UCUM standards (see https://ucum.org/ucum) - Returns + Returns: ------- Rendered Unit metadata if the symbol is valid and supported by Nominal, or None if no such unit symbol matches. @@ -556,7 +607,7 @@ def set_channel_units(self, rids_to_types: Mapping[str, str | None]) -> Sequence rids_to_types: Mapping of channel RIDs -> unit symbols (e.g. 'm/s'). NOTE: Providing `None` as the unit symbol clears any existing units for the channels. - Returns + Returns: ------- A sequence of metadata for all updated channels Raises: @@ -604,12 +655,7 @@ def create_video_from_mcap_io( raise TypeError(f"dataset {mcap} must be open in binary mode, rather than text mode") file_type = FileType(*file_type) - urlsafe_name = urllib.parse.quote_plus(name) - filename = f"{urlsafe_name}{file_type.extension}" - - s3_path = put_multipart_upload( - self._clients.auth_header, mcap, filename, file_type.mimetype, self._clients.upload - ) + s3_path = upload_multipart_io(self._clients.auth_header, mcap, name, file_type, self._clients.upload) request = ingest_api.IngestMcapRequest( channel_config=[ ingest_api.McapChannelConfig( diff --git a/nominal/core/dataset.py b/nominal/core/dataset.py index 60510c54..c641cdc8 100644 --- a/nominal/core/dataset.py +++ b/nominal/core/dataset.py @@ -2,7 +2,6 @@ import logging import time -import urllib.parse from dataclasses import dataclass, field from datetime import timedelta from io import TextIOBase @@ -27,7 +26,7 @@ from nominal._utils import FileType, FileTypes from nominal.core._clientsbunch import HasAuthHeader from nominal.core._conjure_utils import _available_units, _build_unit_update -from nominal.core._multipart import put_multipart_upload +from nominal.core._multipart import upload_multipart_io from nominal.core._utils import HasRid, update_dataclass from nominal.core.channel import Channel, _get_series_values_csv from nominal.exceptions import NominalIngestError, NominalIngestFailed, NominalIngestMultiError @@ -92,7 +91,7 @@ def poll_until_ingestion_completed(self, interval: timedelta = timedelta(seconds """Block until dataset ingestion has completed. This method polls Nominal for ingest status after uploading a dataset on an interval. - Raises + Raises: ------ NominalIngestFailed: if the ingest failed NominalIngestError: if the ingest status is not known @@ -181,13 +180,15 @@ def add_to_dataset_from_io( if isinstance(dataset, TextIOBase): raise TypeError(f"dataset {dataset!r} must be open in binary mode, rather than text mode") - file_type = FileType(*file_type) - self.poll_until_ingestion_completed() - urlsafe_name = urllib.parse.quote_plus(self.name) - filename = f"{urlsafe_name}{file_type.extension}" - s3_path = put_multipart_upload( - self._clients.auth_header, dataset, filename, file_type.mimetype, self._clients.upload + + file_type = FileType(*file_type) + s3_path = upload_multipart_io( + self._clients.auth_header, + dataset, + self.name, + file_type, + self._clients.upload, ) request = ingest_api.TriggerFileIngest( destination=ingest_api.IngestDestination( @@ -394,7 +395,7 @@ def poll_until_ingestion_completed(datasets: Iterable[Dataset], interval: timede This method polls Nominal for ingest status on each of the datasets on an interval. No specific ordering is guaranteed, but all datasets will be checked at least once. - Raises + Raises: ------ NominalIngestMultiError: if any of the datasets failed to ingest diff --git a/nominal/core/video.py b/nominal/core/video.py index e80735b3..0a89b185 100644 --- a/nominal/core/video.py +++ b/nominal/core/video.py @@ -31,7 +31,7 @@ def poll_until_ingestion_completed(self, interval: timedelta = timedelta(seconds """Block until video ingestion has completed. This method polls Nominal for ingest status after uploading a video on an interval. - Raises + Raises: ------ NominalIngestFailed: if the ingest failed NominalIngestError: if the ingest status is not known diff --git a/nominal/exceptions.py b/nominal/exceptions.py index dee24fdd..5baa3dc4 100644 --- a/nominal/exceptions.py +++ b/nominal/exceptions.py @@ -12,7 +12,7 @@ class NominalIngestError(NominalError): class NominalIngestMultiError(NominalError): """Error(s) occurred during ingest. - Attributes + Attributes: ---------- errors: A mapping of dataset RIDs to the errors that occurred during ingest. diff --git a/nominal/nominal.py b/nominal/nominal.py index 510a31d3..091a9db0 100644 --- a/nominal/nominal.py +++ b/nominal/nominal.py @@ -376,7 +376,11 @@ def upload_video( file_type = FileType.from_path(path) with open(file, "rb") as f: return conn.create_video_from_io( - f, name, ts._SecondsNanos.from_flexible(start).to_nanoseconds(), description, file_type + f, + name, + start=ts._SecondsNanos.from_flexible(start).to_nanoseconds(), + description=description, + file_type=file_type, ) diff --git a/pyproject.toml b/pyproject.toml index 2d8ed6ae..2e1c481d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -67,6 +67,9 @@ line-length = 120 exclude = ["nominal/_api/*"] include = ["nominal/**/*.py", "tests/**/*.py"] +[tool.ruff.lint.pydocstyle] +convention = "google" + [tool.ruff.lint.pylint] max-args = 10