From 7e618565b2222316b8c7e46fcb3cf07389044476 Mon Sep 17 00:00:00 2001 From: Drake Eidukas Date: Wed, 27 Nov 2024 09:35:56 -0800 Subject: [PATCH 1/8] Create more convenient methods for s3 upload --- nominal/core/_multipart.py | 45 ++++++++++++++++++++++++++++++++++ nominal/core/client.py | 50 ++++++++++++-------------------------- nominal/core/dataset.py | 17 +++++++------ 3 files changed, 70 insertions(+), 42 deletions(-) diff --git a/nominal/core/_multipart.py b/nominal/core/_multipart.py index a5768677..fa26b076 100644 --- a/nominal/core/_multipart.py +++ b/nominal/core/_multipart.py @@ -2,6 +2,8 @@ import concurrent.futures import logging +import pathlib +import urllib.parse from functools import partial from queue import Queue from typing import BinaryIO, Iterable @@ -9,6 +11,7 @@ import requests from nominal._api.scout_service_api import ingest_api, upload_api +from nominal._utils import FileType from nominal.exceptions import NominalMultipartUploadFailed logger = logging.getLogger(__name__) @@ -118,6 +121,48 @@ def put_multipart_upload( raise e +def upload_multipart_io( + auth_header: str, + f: BinaryIO, + name: str, + file_type: FileType, + upload_client: upload_api.UploadService, + **put_multipart_kwargs, +) -> str: + urlsafe_name = urllib.parse.quote_plus(name) + safe_filename = f"{urlsafe_name}{file_type.extension}" + return put_multipart_upload( + auth_header, + f, + safe_filename, + file_type.mimetype, + upload_client, + **put_multipart_kwargs, + ) + + +def upload_multipart_file( + auth_header: str, + file: pathlib.Path, + upload_client: upload_api.UploadService, + file_type: FileType | None = None, + **put_multipart_kwargs, +) -> str: + if file_type is None: + file_type = FileType.from_path(file) + + file_name = file.stem.split(".")[0] + with file.open("rb") as file_handle: + return upload_multipart_io( + auth_header, + file_handle, + file_name, + file_type, + upload_client, + **put_multipart_kwargs, + ) + + def _abort(upload_client: upload_api.UploadService, auth_header: str, key: str, upload_id: str, e: Exception) -> None: logger.error( "aborting multipart upload due to an exception", exc_info=e, extra={"key": key, "upload_id": upload_id} diff --git a/nominal/core/client.py b/nominal/core/client.py index cf01de94..c812366d 100644 --- a/nominal/core/client.py +++ b/nominal/core/client.py @@ -1,6 +1,5 @@ from __future__ import annotations -import urllib.parse from dataclasses import dataclass, field from datetime import datetime from io import TextIOBase @@ -34,7 +33,7 @@ ) from nominal.core._clientsbunch import ClientsBunch from nominal.core._conjure_utils import _available_units, _build_unit_update -from nominal.core._multipart import put_multipart_upload +from nominal.core._multipart import upload_multipart_file, upload_multipart_io from nominal.core._utils import construct_user_agent_string, rid_from_instance_or_string from nominal.core.attachment import Attachment, _iter_get_attachments from nominal.core.channel import Channel @@ -235,13 +234,12 @@ def create_mcap_dataset( ) mcap_path = Path(path) - file_type = FileTypes.MCAP - urlsafe_name = urllib.parse.quote_plus(mcap_path.stem) - filename = f"{urlsafe_name}{file_type.extension}" - with open(mcap_path, "rb") as f: - s3_path = put_multipart_upload( - self._clients.auth_header, f, filename, file_type.mimetype, self._clients.upload - ) + s3_path = upload_multipart_file( + self._clients.auth_header, + mcap_path, + self._clients.upload, + file_type=FileTypes.MCAP, + ) source = ingest_api.IngestSource(s3=ingest_api.S3IngestSource(path=s3_path)) request = ingest_api.IngestMcapRequest( channel_config=[], @@ -287,13 +285,7 @@ def create_dataset_from_io( if isinstance(dataset, TextIOBase): raise TypeError(f"dataset {dataset} must be open in binary mode, rather than text mode") - file_type = FileType(*file_type) - urlsafe_name = urllib.parse.quote_plus(name) - filename = f"{urlsafe_name}{file_type.extension}" - - s3_path = put_multipart_upload( - self._clients.auth_header, dataset, filename, file_type.mimetype, self._clients.upload - ) + s3_path = upload_multipart_io(self._clients.auth_header, dataset, name, file_type, self._clients.upload) request = ingest_api.TriggerFileIngest( destination=ingest_api.IngestDestination( new_dataset=ingest_api.NewDatasetIngestDestination( @@ -333,13 +325,7 @@ def create_video_from_io( if isinstance(video, TextIOBase): raise TypeError(f"video {video} must be open in binary mode, rather than text mode") - file_type = FileType(*file_type) - urlsafe_name = urllib.parse.quote_plus(name) - filename = f"{urlsafe_name}{file_type.extension}" - - s3_path = put_multipart_upload( - self._clients.auth_header, video, filename, file_type.mimetype, self._clients.upload - ) + s3_path = upload_multipart_io(self._clients.auth_header, video, name, file_type, self._clients.upload) request = ingest_api.IngestVideoRequest( labels=list(labels), properties={} if properties is None else dict(properties), @@ -481,11 +467,12 @@ def create_attachment_from_io( raise TypeError(f"attachment {attachment} must be open in binary mode, rather than text mode") file_type = FileType(*file_type) - urlsafe_name = urllib.parse.quote_plus(name) - filename = f"{urlsafe_name}{file_type.extension}" - - s3_path = put_multipart_upload( - self._clients.auth_header, attachment, filename, file_type.mimetype, self._clients.upload + s3_path = upload_multipart_io( + self._clients.auth_header, + attachment, + name, + file_type, + self._clients.upload, ) request = attachments_api.CreateAttachmentRequest( description=description or "", @@ -604,12 +591,7 @@ def create_video_from_mcap_io( raise TypeError(f"dataset {mcap} must be open in binary mode, rather than text mode") file_type = FileType(*file_type) - urlsafe_name = urllib.parse.quote_plus(name) - filename = f"{urlsafe_name}{file_type.extension}" - - s3_path = put_multipart_upload( - self._clients.auth_header, mcap, filename, file_type.mimetype, self._clients.upload - ) + s3_path = upload_multipart_io(self._clients.auth_header, mcap, name, file_type, self._clients.upload) request = ingest_api.IngestMcapRequest( channel_config=[ ingest_api.McapChannelConfig( diff --git a/nominal/core/dataset.py b/nominal/core/dataset.py index 60510c54..1954d65b 100644 --- a/nominal/core/dataset.py +++ b/nominal/core/dataset.py @@ -2,7 +2,6 @@ import logging import time -import urllib.parse from dataclasses import dataclass, field from datetime import timedelta from io import TextIOBase @@ -27,7 +26,7 @@ from nominal._utils import FileType, FileTypes from nominal.core._clientsbunch import HasAuthHeader from nominal.core._conjure_utils import _available_units, _build_unit_update -from nominal.core._multipart import put_multipart_upload +from nominal.core._multipart import upload_multipart_io from nominal.core._utils import HasRid, update_dataclass from nominal.core.channel import Channel, _get_series_values_csv from nominal.exceptions import NominalIngestError, NominalIngestFailed, NominalIngestMultiError @@ -181,13 +180,15 @@ def add_to_dataset_from_io( if isinstance(dataset, TextIOBase): raise TypeError(f"dataset {dataset!r} must be open in binary mode, rather than text mode") - file_type = FileType(*file_type) - self.poll_until_ingestion_completed() - urlsafe_name = urllib.parse.quote_plus(self.name) - filename = f"{urlsafe_name}{file_type.extension}" - s3_path = put_multipart_upload( - self._clients.auth_header, dataset, filename, file_type.mimetype, self._clients.upload + + file_type = FileType(*file_type) + s3_path = upload_multipart_io( + self._clients.auth_header, + dataset, + self.name, + file_type, + self._clients.upload, ) request = ingest_api.TriggerFileIngest( destination=ingest_api.IngestDestination( From 6f9b61b5a20fd3c42bf833ef3a275180df75d8b8 Mon Sep 17 00:00:00 2001 From: Drake Eidukas Date: Wed, 27 Nov 2024 09:56:13 -0800 Subject: [PATCH 2/8] Allow uploading timestamped videos --- nominal/_utils.py | 7 +++--- nominal/core/_multipart.py | 19 +++++++++----- nominal/core/client.py | 51 +++++++++++++++++++++++++++++++++----- 3 files changed, 62 insertions(+), 15 deletions(-) diff --git a/nominal/_utils.py b/nominal/_utils.py index 58b5f31f..7743ac92 100644 --- a/nominal/_utils.py +++ b/nominal/_utils.py @@ -41,13 +41,14 @@ def from_path_dataset(cls, path: Path | str) -> FileType: class FileTypes: + BINARY: FileType = FileType("", "application/octet-stream") CSV: FileType = FileType(".csv", "text/csv") CSV_GZ: FileType = FileType(".csv.gz", "text/csv") - # https://issues.apache.org/jira/browse/PARQUET-1889 - PARQUET: FileType = FileType(".parquet", "application/vnd.apache.parquet") + JSON: FileType = FileType(".json", "application/json") MP4: FileType = FileType(".mp4", "video/mp4") - BINARY: FileType = FileType("", "application/octet-stream") MCAP: FileType = FileType(".mcap", "application/octet-stream") + # https://issues.apache.org/jira/browse/PARQUET-1889 + PARQUET: FileType = FileType(".parquet", "application/vnd.apache.parquet") @contextmanager diff --git a/nominal/core/_multipart.py b/nominal/core/_multipart.py index fa26b076..94305052 100644 --- a/nominal/core/_multipart.py +++ b/nominal/core/_multipart.py @@ -16,6 +16,9 @@ logger = logging.getLogger(__name__) +DEFAULT_CHUNK_SIZE = 64_000_000 +DEFAULT_NUM_WORKERS = 8 + def _sign_and_upload_part_job( upload_client: upload_api.UploadService, @@ -57,8 +60,8 @@ def put_multipart_upload( filename: str, mimetype: str, upload_client: upload_api.UploadService, - chunk_size: int = 64_000_000, - max_workers: int = 8, + chunk_size: int = DEFAULT_CHUNK_SIZE, + max_workers: int = DEFAULT_NUM_WORKERS, ) -> str: """Execute a multipart upload to S3. @@ -127,7 +130,8 @@ def upload_multipart_io( name: str, file_type: FileType, upload_client: upload_api.UploadService, - **put_multipart_kwargs, + chunk_size: int = DEFAULT_CHUNK_SIZE, + max_workers: int = DEFAULT_NUM_WORKERS, ) -> str: urlsafe_name = urllib.parse.quote_plus(name) safe_filename = f"{urlsafe_name}{file_type.extension}" @@ -137,7 +141,8 @@ def upload_multipart_io( safe_filename, file_type.mimetype, upload_client, - **put_multipart_kwargs, + chunk_size=chunk_size, + max_workers=max_workers, ) @@ -146,7 +151,8 @@ def upload_multipart_file( file: pathlib.Path, upload_client: upload_api.UploadService, file_type: FileType | None = None, - **put_multipart_kwargs, + chunk_size: int = DEFAULT_CHUNK_SIZE, + max_workers: int = DEFAULT_NUM_WORKERS, ) -> str: if file_type is None: file_type = FileType.from_path(file) @@ -159,7 +165,8 @@ def upload_multipart_file( file_name, file_type, upload_client, - **put_multipart_kwargs, + chunk_size=chunk_size, + max_workers=max_workers, ) diff --git a/nominal/core/client.py b/nominal/core/client.py index c812366d..aba6997f 100644 --- a/nominal/core/client.py +++ b/nominal/core/client.py @@ -1,5 +1,8 @@ from __future__ import annotations +import json +import logging +import tempfile from dataclasses import dataclass, field from datetime import datetime from io import TextIOBase @@ -57,6 +60,8 @@ from .asset import Asset +logger = logging.getLogger(__name__) + @dataclass(frozen=True) class NominalClient: @@ -285,6 +290,7 @@ def create_dataset_from_io( if isinstance(dataset, TextIOBase): raise TypeError(f"dataset {dataset} must be open in binary mode, rather than text mode") + file_type = FileType(*file_type) s3_path = upload_multipart_io(self._clients.auth_header, dataset, name, file_type, self._clients.upload) request = ingest_api.TriggerFileIngest( destination=ingest_api.IngestDestination( @@ -311,30 +317,63 @@ def create_video_from_io( self, video: BinaryIO, name: str, - start: datetime | IntegralNanosecondsUTC, + start: datetime | IntegralNanosecondsUTC | None = None, description: str | None = None, file_type: tuple[str, str] | FileType = FileTypes.MP4, *, labels: Sequence[str] = (), properties: Mapping[str, str] | None = None, + frame_timestamps: Sequence[IntegralNanosecondsUTC] | None = None, ) -> Video: """Create a video from a file-like object. The video must be a file-like object in binary mode, e.g. open(path, "rb") or io.BytesIO. """ + if (start is None and frame_timestamps is None) and (None not in (start, frame_timestamps)): + raise ValueError("One of 'start' or 'frame_timestamps' must be provided") + if isinstance(video, TextIOBase): raise TypeError(f"video {video} must be open in binary mode, rather than text mode") + # if frame_timestamps is None: + if start is None: + with tempfile.NamedTemporaryFile(suffix=".json", delete_on_close=False) as tmp: + tmp.close() + timestamp_manifest_path = Path(tmp.name) + + logger.debug(f"Writing timestamp manifest to '{timestamp_manifest_path}'") + timestamp_manifest_path.write_text(json.dumps(frame_timestamps)) + + logger.debug("Uploading timestamp manifests to s3") + manifest_s3_path = upload_multipart_file( + self._clients.auth_header, timestamp_manifest_path, self._clients.upload + ) + + timestamp_manifest = ingest_api.VideoTimestampManifest( + timestamp_manifests=ingest_api.TimestampManifest( + sources=[ + ingest_api.IngestSource( + s3=ingest_api.S3IngestSource( + path=manifest_s3_path, + ) + ) + ] + ) + ) + else: + timestamp_manifest = ingest_api.VideoTimestampManifest( + no_manifest=ingest_api.NoTimestampManifest( + starting_timestamp=_SecondsNanos.from_flexible(start).to_ingest_api() + ) + ) + + file_type = FileType(*file_type) s3_path = upload_multipart_io(self._clients.auth_header, video, name, file_type, self._clients.upload) request = ingest_api.IngestVideoRequest( labels=list(labels), properties={} if properties is None else dict(properties), sources=[ingest_api.IngestSource(s3=ingest_api.S3IngestSource(path=s3_path))], - timestamps=ingest_api.VideoTimestampManifest( - no_manifest=ingest_api.NoTimestampManifest( - starting_timestamp=_SecondsNanos.from_flexible(start).to_ingest_api() - ) - ), + timestamps=timestamp_manifest, description=description, title=name, ) From 343b5195cddf7fc9886c79cbe74f5ceacb557ad0 Mon Sep 17 00:00:00 2001 From: Drake Eidukas Date: Wed, 27 Nov 2024 10:59:44 -0800 Subject: [PATCH 3/8] Add documentation --- nominal/core/_multipart.py | 46 +++++++++++++++++++++++++++++++++++++- nominal/core/client.py | 32 ++++++++++++++++++++++---- nominal/core/dataset.py | 4 ++-- nominal/core/video.py | 2 +- nominal/exceptions.py | 2 +- nominal/nominal.py | 6 ++++- pyproject.toml | 3 +++ 7 files changed, 85 insertions(+), 10 deletions(-) diff --git a/nominal/core/_multipart.py b/nominal/core/_multipart.py index 94305052..6b0177f1 100644 --- a/nominal/core/_multipart.py +++ b/nominal/core/_multipart.py @@ -68,7 +68,19 @@ def put_multipart_upload( All metadata-style requests (init, sign, complete) proxy through Nominal servers, while the upload PUT requests for each part go to a pre-signed URL to the storage provider. - Ref: https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html + Args: + auth_header: Nominal authorization token + f: Binary IO to upload + filename: URL-safe filename to use when uploading to S3 + mimetype: Type of data contained within binary stream + upload_client: Conjure upload client + chunk_size: Maximum size of chunk to upload to S3 at once + max_workers: Number of worker threads to use when processing and uploading data + + Returns: Path to the uploaded object in S3 + + See: https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html + """ # muiltithreaded multipart upload: # - create a worker thread pool and a queue for all threads to share @@ -133,6 +145,23 @@ def upload_multipart_io( chunk_size: int = DEFAULT_CHUNK_SIZE, max_workers: int = DEFAULT_NUM_WORKERS, ) -> str: + """Execute a multipart upload to S3 proxied via Nominal servers + + Args: + auth_header: Nominal authorization token + f: Binary IO to upload + name: Name of the file to create in S3 + NOTE: does not need to be URL Safe + file_type: Type of data being uploaded + upload_client: Conjure upload client + chunk_size: Maximum size of chunk to upload to S3 at once + max_workers: Number of worker threads to use when processing and uploading data + + Returns: Path to the uploaded object in S3 + + Note: see put_multipart_upload for more details + + """ urlsafe_name = urllib.parse.quote_plus(name) safe_filename = f"{urlsafe_name}{file_type.extension}" return put_multipart_upload( @@ -154,6 +183,21 @@ def upload_multipart_file( chunk_size: int = DEFAULT_CHUNK_SIZE, max_workers: int = DEFAULT_NUM_WORKERS, ) -> str: + """Execute a multipart upload to S3 proxied via Nominal servers. + + Args: + auth_header: Nominal authorization token + file: File to upload to S3 + upload_client: Conjure upload client + file_type: Manually override inferred file type for the given file + chunk_size: Maximum size of chunk to upload to S3 at once + max_workers: Number of worker threads to use when processing and uploading data + + Returns: Path to the uploaded object in S3 + + Note: see put_multipart_upload for more details + + """ if file_type is None: file_type = FileType.from_path(file) diff --git a/nominal/core/client.py b/nominal/core/client.py index aba6997f..cec05d35 100644 --- a/nominal/core/client.py +++ b/nominal/core/client.py @@ -318,16 +318,40 @@ def create_video_from_io( video: BinaryIO, name: str, start: datetime | IntegralNanosecondsUTC | None = None, + frame_timestamps: Sequence[IntegralNanosecondsUTC] | None = None, description: str | None = None, file_type: tuple[str, str] | FileType = FileTypes.MP4, *, labels: Sequence[str] = (), properties: Mapping[str, str] | None = None, - frame_timestamps: Sequence[IntegralNanosecondsUTC] | None = None, ) -> Video: """Create a video from a file-like object. - The video must be a file-like object in binary mode, e.g. open(path, "rb") or io.BytesIO. + + Args: + ---- + video: file-like object to read video data from + name: Name of the video to create in Nominal + start: Starting timestamp of the video + frame_timestamps: Per-frame timestamps (in nanoseconds since unix epoch) for every frame of the video + description: Description of the video to create in nominal + file_type: Type of data being uploaded + labels: Labels to apply to the video in nominal + properties: Properties to apply to the video in nominal + + Returns: + ------- + Handle to the created video + + Note: + ---- + Exactly one of 'start' and 'frame_timestamps' **must** be provided. Most users will + want to provide a starting timestamp: frame_timestamps is primarily useful when the scale + of the video data is not 1:1 with the playback speed or non-uniform over the course of the video, + for example, 200fps video artificially slowed to 30 fps without dropping frames. This will result + in the playhead on charts within the product playing at the rate of the underlying data rather than + time elapsed in the video playback. + """ if (start is None and frame_timestamps is None) and (None not in (start, frame_timestamps)): raise ValueError("One of 'start' or 'frame_timestamps' must be provided") @@ -546,7 +570,7 @@ def get_unit(self, unit_symbol: str) -> Unit | None: NOTE: This currently requires that units are formatted as laid out in the latest UCUM standards (see https://ucum.org/ucum) - Returns + Returns: ------- Rendered Unit metadata if the symbol is valid and supported by Nominal, or None if no such unit symbol matches. @@ -582,7 +606,7 @@ def set_channel_units(self, rids_to_types: Mapping[str, str | None]) -> Sequence rids_to_types: Mapping of channel RIDs -> unit symbols (e.g. 'm/s'). NOTE: Providing `None` as the unit symbol clears any existing units for the channels. - Returns + Returns: ------- A sequence of metadata for all updated channels Raises: diff --git a/nominal/core/dataset.py b/nominal/core/dataset.py index 1954d65b..c641cdc8 100644 --- a/nominal/core/dataset.py +++ b/nominal/core/dataset.py @@ -91,7 +91,7 @@ def poll_until_ingestion_completed(self, interval: timedelta = timedelta(seconds """Block until dataset ingestion has completed. This method polls Nominal for ingest status after uploading a dataset on an interval. - Raises + Raises: ------ NominalIngestFailed: if the ingest failed NominalIngestError: if the ingest status is not known @@ -395,7 +395,7 @@ def poll_until_ingestion_completed(datasets: Iterable[Dataset], interval: timede This method polls Nominal for ingest status on each of the datasets on an interval. No specific ordering is guaranteed, but all datasets will be checked at least once. - Raises + Raises: ------ NominalIngestMultiError: if any of the datasets failed to ingest diff --git a/nominal/core/video.py b/nominal/core/video.py index e80735b3..0a89b185 100644 --- a/nominal/core/video.py +++ b/nominal/core/video.py @@ -31,7 +31,7 @@ def poll_until_ingestion_completed(self, interval: timedelta = timedelta(seconds """Block until video ingestion has completed. This method polls Nominal for ingest status after uploading a video on an interval. - Raises + Raises: ------ NominalIngestFailed: if the ingest failed NominalIngestError: if the ingest status is not known diff --git a/nominal/exceptions.py b/nominal/exceptions.py index dee24fdd..5baa3dc4 100644 --- a/nominal/exceptions.py +++ b/nominal/exceptions.py @@ -12,7 +12,7 @@ class NominalIngestError(NominalError): class NominalIngestMultiError(NominalError): """Error(s) occurred during ingest. - Attributes + Attributes: ---------- errors: A mapping of dataset RIDs to the errors that occurred during ingest. diff --git a/nominal/nominal.py b/nominal/nominal.py index 510a31d3..091a9db0 100644 --- a/nominal/nominal.py +++ b/nominal/nominal.py @@ -376,7 +376,11 @@ def upload_video( file_type = FileType.from_path(path) with open(file, "rb") as f: return conn.create_video_from_io( - f, name, ts._SecondsNanos.from_flexible(start).to_nanoseconds(), description, file_type + f, + name, + start=ts._SecondsNanos.from_flexible(start).to_nanoseconds(), + description=description, + file_type=file_type, ) diff --git a/pyproject.toml b/pyproject.toml index 2d8ed6ae..2e1c481d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -67,6 +67,9 @@ line-length = 120 exclude = ["nominal/_api/*"] include = ["nominal/**/*.py", "tests/**/*.py"] +[tool.ruff.lint.pydocstyle] +convention = "google" + [tool.ruff.lint.pylint] max-args = 10 From 81ba8f82e01e6e815036906d304b44ed8317282f Mon Sep 17 00:00:00 2001 From: Drake Eidukas Date: Wed, 27 Nov 2024 11:06:08 -0800 Subject: [PATCH 4/8] Support py<3.12 --- nominal/core/client.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/nominal/core/client.py b/nominal/core/client.py index cec05d35..dde8d685 100644 --- a/nominal/core/client.py +++ b/nominal/core/client.py @@ -361,8 +361,7 @@ def create_video_from_io( # if frame_timestamps is None: if start is None: - with tempfile.NamedTemporaryFile(suffix=".json", delete_on_close=False) as tmp: - tmp.close() + with tempfile.NamedTemporaryFile(suffix=".json", delete=False) as tmp: timestamp_manifest_path = Path(tmp.name) logger.debug(f"Writing timestamp manifest to '{timestamp_manifest_path}'") @@ -373,6 +372,12 @@ def create_video_from_io( self._clients.auth_header, timestamp_manifest_path, self._clients.upload ) + # TODO (drake): once 3.12 is minimally supported version, use delete_on_close to + # simply close at the time of creation to keep delete=True when creating + # tempfile. + logger.debug(f"Deleting timestamp manifest file '{timestamp_manifest_path}'") + timestamp_manifest_path.unlink() + timestamp_manifest = ingest_api.VideoTimestampManifest( timestamp_manifests=ingest_api.TimestampManifest( sources=[ From 0e539f88e69004fdb8b2f7165eba0729264703f4 Mon Sep 17 00:00:00 2001 From: Drake Eidukas Date: Wed, 27 Nov 2024 11:09:38 -0800 Subject: [PATCH 5/8] Fix checking that one of start or frame_timestamps is provided --- nominal/core/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nominal/core/client.py b/nominal/core/client.py index dde8d685..50bfccd8 100644 --- a/nominal/core/client.py +++ b/nominal/core/client.py @@ -353,7 +353,7 @@ def create_video_from_io( time elapsed in the video playback. """ - if (start is None and frame_timestamps is None) and (None not in (start, frame_timestamps)): + if (start is None and frame_timestamps is None) or (None not in (start, frame_timestamps)): raise ValueError("One of 'start' or 'frame_timestamps' must be provided") if isinstance(video, TextIOBase): From b4bbb0748a4ca123e76bc85a5a23dee0dd3e3590 Mon Sep 17 00:00:00 2001 From: Drake Eidukas Date: Wed, 27 Nov 2024 11:16:13 -0800 Subject: [PATCH 6/8] Use BinaryIO instead of temporary file for timestamp manifest --- nominal/core/client.py | 29 +++++++++++------------------ 1 file changed, 11 insertions(+), 18 deletions(-) diff --git a/nominal/core/client.py b/nominal/core/client.py index 50bfccd8..030eaea2 100644 --- a/nominal/core/client.py +++ b/nominal/core/client.py @@ -2,10 +2,9 @@ import json import logging -import tempfile from dataclasses import dataclass, field from datetime import datetime -from io import TextIOBase +from io import BytesIO, TextIOBase from pathlib import Path from typing import BinaryIO, Iterable, Mapping, Sequence @@ -361,22 +360,16 @@ def create_video_from_io( # if frame_timestamps is None: if start is None: - with tempfile.NamedTemporaryFile(suffix=".json", delete=False) as tmp: - timestamp_manifest_path = Path(tmp.name) - - logger.debug(f"Writing timestamp manifest to '{timestamp_manifest_path}'") - timestamp_manifest_path.write_text(json.dumps(frame_timestamps)) - - logger.debug("Uploading timestamp manifests to s3") - manifest_s3_path = upload_multipart_file( - self._clients.auth_header, timestamp_manifest_path, self._clients.upload - ) - - # TODO (drake): once 3.12 is minimally supported version, use delete_on_close to - # simply close at the time of creation to keep delete=True when creating - # tempfile. - logger.debug(f"Deleting timestamp manifest file '{timestamp_manifest_path}'") - timestamp_manifest_path.unlink() + logger.debug("Uploading timestamp manifests to s3") + json_io = BytesIO() + json_io.write(json.dumps(frame_timestamps).encode()) + manifest_s3_path = upload_multipart_io( + self._clients.auth_header, + json_io, + "timestamp_manifest", + FileTypes.JSON, + self._clients.upload, + ) timestamp_manifest = ingest_api.VideoTimestampManifest( timestamp_manifests=ingest_api.TimestampManifest( From 26621c33d82ba1edba1950f6ae837a2ca127b3db Mon Sep 17 00:00:00 2001 From: Drake Eidukas Date: Wed, 27 Nov 2024 16:39:30 -0800 Subject: [PATCH 7/8] PR Feedback --- nominal/core/client.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/nominal/core/client.py b/nominal/core/client.py index 030eaea2..86c57057 100644 --- a/nominal/core/client.py +++ b/nominal/core/client.py @@ -4,7 +4,7 @@ import logging from dataclasses import dataclass, field from datetime import datetime -from io import BytesIO, TextIOBase +from io import BytesIO, TextIOBase, TextIOWrapper from pathlib import Path from typing import BinaryIO, Iterable, Mapping, Sequence @@ -358,11 +358,14 @@ def create_video_from_io( if isinstance(video, TextIOBase): raise TypeError(f"video {video} must be open in binary mode, rather than text mode") - # if frame_timestamps is None: if start is None: - logger.debug("Uploading timestamp manifests to s3") + # Dump timestamp array into an in-memory file-like IO object json_io = BytesIO() - json_io.write(json.dumps(frame_timestamps).encode()) + text_json_io = TextIOWrapper(json_io) + json.dump(frame_timestamps, text_json_io) + text_json_io.flush() + + logger.debug("Uploading timestamp manifests to s3") manifest_s3_path = upload_multipart_io( self._clients.auth_header, json_io, @@ -370,7 +373,6 @@ def create_video_from_io( FileTypes.JSON, self._clients.upload, ) - timestamp_manifest = ingest_api.VideoTimestampManifest( timestamp_manifests=ingest_api.TimestampManifest( sources=[ From db21a71d7a5f2953ed695a17a9f90b6ead0d124e Mon Sep 17 00:00:00 2001 From: Drake Eidukas Date: Wed, 27 Nov 2024 17:45:59 -0800 Subject: [PATCH 8/8] seek bytesIO to start before upload --- nominal/core/client.py | 1 + 1 file changed, 1 insertion(+) diff --git a/nominal/core/client.py b/nominal/core/client.py index 86c57057..61a04909 100644 --- a/nominal/core/client.py +++ b/nominal/core/client.py @@ -364,6 +364,7 @@ def create_video_from_io( text_json_io = TextIOWrapper(json_io) json.dump(frame_timestamps, text_json_io) text_json_io.flush() + json_io.seek(0) logger.debug("Uploading timestamp manifests to s3") manifest_s3_path = upload_multipart_io(