Skip to content

feat: allow uploading manually timestamped videos #156

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Nov 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions nominal/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,14 @@ def from_path_dataset(cls, path: Path | str) -> FileType:


class FileTypes:
    """Registry of well-known file types (extension + MIME type) accepted for upload.

    Constants are kept in alphabetical order. The duplicate definitions of
    BINARY and PARQUET present previously have been removed — each constant
    is now declared exactly once (values unchanged).
    """

    # Fallback for arbitrary binary data with no specific extension
    BINARY: FileType = FileType("", "application/octet-stream")
    CSV: FileType = FileType(".csv", "text/csv")
    # Gzip-compressed CSV keeps the CSV MIME type; compression is implied by the .gz suffix
    CSV_GZ: FileType = FileType(".csv.gz", "text/csv")
    JSON: FileType = FileType(".json", "application/json")
    # MCAP is uploaded as a generic binary stream
    MCAP: FileType = FileType(".mcap", "application/octet-stream")
    MP4: FileType = FileType(".mp4", "video/mp4")
    # https://issues.apache.org/jira/browse/PARQUET-1889
    PARQUET: FileType = FileType(".parquet", "application/vnd.apache.parquet")


@contextmanager
Expand Down
102 changes: 99 additions & 3 deletions nominal/core/_multipart.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,23 @@

import concurrent.futures
import logging
import pathlib
import urllib.parse
from functools import partial
from queue import Queue
from typing import BinaryIO, Iterable

import requests

from nominal._api.scout_service_api import ingest_api, upload_api
from nominal._utils import FileType
from nominal.exceptions import NominalMultipartUploadFailed

logger = logging.getLogger(__name__)

DEFAULT_CHUNK_SIZE = 64_000_000
DEFAULT_NUM_WORKERS = 8


def _sign_and_upload_part_job(
upload_client: upload_api.UploadService,
Expand Down Expand Up @@ -54,15 +60,27 @@ def put_multipart_upload(
filename: str,
mimetype: str,
upload_client: upload_api.UploadService,
chunk_size: int = 64_000_000,
max_workers: int = 8,
chunk_size: int = DEFAULT_CHUNK_SIZE,
max_workers: int = DEFAULT_NUM_WORKERS,
) -> str:
"""Execute a multipart upload to S3.

All metadata-style requests (init, sign, complete) proxy through Nominal servers, while the upload PUT requests for
each part go to a pre-signed URL to the storage provider.

Ref: https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
Args:
auth_header: Nominal authorization token
f: Binary IO to upload
filename: URL-safe filename to use when uploading to S3
mimetype: Type of data contained within binary stream
upload_client: Conjure upload client
chunk_size: Maximum size of chunk to upload to S3 at once
max_workers: Number of worker threads to use when processing and uploading data

Returns: Path to the uploaded object in S3

See: https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html

"""
# muiltithreaded multipart upload:
# - create a worker thread pool and a queue for all threads to share
Expand Down Expand Up @@ -118,6 +136,84 @@ def put_multipart_upload(
raise e


def upload_multipart_io(
    auth_header: str,
    f: BinaryIO,
    name: str,
    file_type: FileType,
    upload_client: upload_api.UploadService,
    chunk_size: int = DEFAULT_CHUNK_SIZE,
    max_workers: int = DEFAULT_NUM_WORKERS,
) -> str:
    """Execute a multipart upload to S3 proxied via Nominal servers

    Args:
        auth_header: Nominal authorization token
        f: Binary IO to upload
        name: Name of the file to create in S3
            NOTE: does not need to be URL Safe
        file_type: Type of data being uploaded
        upload_client: Conjure upload client
        chunk_size: Maximum size of chunk to upload to S3 at once
        max_workers: Number of worker threads to use when processing and uploading data

    Returns: Path to the uploaded object in S3

    Note: see put_multipart_upload for more details

    """
    # Sanitize the user-provided name for use in a URL, then tack on the
    # extension that matches the declared file type.
    safe_filename = urllib.parse.quote_plus(name) + file_type.extension
    return put_multipart_upload(
        auth_header,
        f,
        safe_filename,
        file_type.mimetype,
        upload_client,
        chunk_size=chunk_size,
        max_workers=max_workers,
    )


def upload_multipart_file(
    auth_header: str,
    file: pathlib.Path,
    upload_client: upload_api.UploadService,
    file_type: FileType | None = None,
    chunk_size: int = DEFAULT_CHUNK_SIZE,
    max_workers: int = DEFAULT_NUM_WORKERS,
) -> str:
    """Execute a multipart upload to S3 proxied via Nominal servers.

    Args:
        auth_header: Nominal authorization token
        file: File to upload to S3
        upload_client: Conjure upload client
        file_type: Manually override inferred file type for the given file
        chunk_size: Maximum size of chunk to upload to S3 at once
        max_workers: Number of worker threads to use when processing and uploading data

    Returns: Path to the uploaded object in S3

    Note: see put_multipart_upload for more details

    """
    # Infer the file type from the path unless the caller overrode it explicitly
    resolved_type = FileType.from_path(file) if file_type is None else file_type

    # Strip every extension component (e.g. "data.csv.gz" -> "data") to get the object name
    base_name = file.stem.split(".")[0]
    with file.open("rb") as handle:
        return upload_multipart_io(
            auth_header,
            handle,
            base_name,
            resolved_type,
            upload_client,
            chunk_size=chunk_size,
            max_workers=max_workers,
        )


def _abort(upload_client: upload_api.UploadService, auth_header: str, key: str, upload_id: str, e: Exception) -> None:
logger.error(
"aborting multipart upload due to an exception", exc_info=e, extra={"key": key, "upload_id": upload_id}
Expand Down
130 changes: 88 additions & 42 deletions nominal/core/client.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
from __future__ import annotations

import urllib.parse
import json
import logging
from dataclasses import dataclass, field
from datetime import datetime
from io import TextIOBase
from io import BytesIO, TextIOBase, TextIOWrapper
from pathlib import Path
from typing import BinaryIO, Iterable, Mapping, Sequence

Expand Down Expand Up @@ -34,7 +35,7 @@
)
from nominal.core._clientsbunch import ClientsBunch
from nominal.core._conjure_utils import _available_units, _build_unit_update
from nominal.core._multipart import put_multipart_upload
from nominal.core._multipart import upload_multipart_file, upload_multipart_io
from nominal.core._utils import construct_user_agent_string, rid_from_instance_or_string
from nominal.core.attachment import Attachment, _iter_get_attachments
from nominal.core.channel import Channel
Expand All @@ -58,6 +59,8 @@

from .asset import Asset

logger = logging.getLogger(__name__)


@dataclass(frozen=True)
class NominalClient:
Expand Down Expand Up @@ -235,13 +238,12 @@ def create_mcap_dataset(
)

mcap_path = Path(path)
file_type = FileTypes.MCAP
urlsafe_name = urllib.parse.quote_plus(mcap_path.stem)
filename = f"{urlsafe_name}{file_type.extension}"
with open(mcap_path, "rb") as f:
s3_path = put_multipart_upload(
self._clients.auth_header, f, filename, file_type.mimetype, self._clients.upload
)
s3_path = upload_multipart_file(
self._clients.auth_header,
mcap_path,
self._clients.upload,
file_type=FileTypes.MCAP,
)
source = ingest_api.IngestSource(s3=ingest_api.S3IngestSource(path=s3_path))
request = ingest_api.IngestMcapRequest(
channel_config=[],
Expand Down Expand Up @@ -288,12 +290,7 @@ def create_dataset_from_io(
raise TypeError(f"dataset {dataset} must be open in binary mode, rather than text mode")

file_type = FileType(*file_type)
urlsafe_name = urllib.parse.quote_plus(name)
filename = f"{urlsafe_name}{file_type.extension}"

s3_path = put_multipart_upload(
self._clients.auth_header, dataset, filename, file_type.mimetype, self._clients.upload
)
s3_path = upload_multipart_io(self._clients.auth_header, dataset, name, file_type, self._clients.upload)
request = ingest_api.TriggerFileIngest(
destination=ingest_api.IngestDestination(
new_dataset=ingest_api.NewDatasetIngestDestination(
Expand All @@ -319,36 +316,89 @@ def create_video_from_io(
self,
video: BinaryIO,
name: str,
start: datetime | IntegralNanosecondsUTC,
start: datetime | IntegralNanosecondsUTC | None = None,
frame_timestamps: Sequence[IntegralNanosecondsUTC] | None = None,
description: str | None = None,
file_type: tuple[str, str] | FileType = FileTypes.MP4,
*,
labels: Sequence[str] = (),
properties: Mapping[str, str] | None = None,
) -> Video:
"""Create a video from a file-like object.

The video must be a file-like object in binary mode, e.g. open(path, "rb") or io.BytesIO.

Args:
----
video: file-like object to read video data from
name: Name of the video to create in Nominal
start: Starting timestamp of the video
frame_timestamps: Per-frame timestamps (in nanoseconds since unix epoch) for every frame of the video
description: Description of the video to create in nominal
file_type: Type of data being uploaded
labels: Labels to apply to the video in nominal
properties: Properties to apply to the video in nominal

Returns:
-------
Handle to the created video

Note:
----
Exactly one of 'start' and 'frame_timestamps' **must** be provided. Most users will
want to provide a starting timestamp: frame_timestamps is primarily useful when the scale
of the video data is not 1:1 with the playback speed or non-uniform over the course of the video,
for example, 200fps video artificially slowed to 30 fps without dropping frames. This will result
in the playhead on charts within the product playing at the rate of the underlying data rather than
time elapsed in the video playback.

"""
if (start is None and frame_timestamps is None) or (None not in (start, frame_timestamps)):
raise ValueError("One of 'start' or 'frame_timestamps' must be provided")

if isinstance(video, TextIOBase):
raise TypeError(f"video {video} must be open in binary mode, rather than text mode")

file_type = FileType(*file_type)
urlsafe_name = urllib.parse.quote_plus(name)
filename = f"{urlsafe_name}{file_type.extension}"
if start is None:
# Dump timestamp array into an in-memory file-like IO object
json_io = BytesIO()
text_json_io = TextIOWrapper(json_io)
json.dump(frame_timestamps, text_json_io)
text_json_io.flush()
json_io.seek(0)

logger.debug("Uploading timestamp manifests to s3")
manifest_s3_path = upload_multipart_io(
self._clients.auth_header,
json_io,
"timestamp_manifest",
FileTypes.JSON,
self._clients.upload,
)
timestamp_manifest = ingest_api.VideoTimestampManifest(
timestamp_manifests=ingest_api.TimestampManifest(
sources=[
ingest_api.IngestSource(
s3=ingest_api.S3IngestSource(
path=manifest_s3_path,
)
)
]
)
)
else:
timestamp_manifest = ingest_api.VideoTimestampManifest(
no_manifest=ingest_api.NoTimestampManifest(
starting_timestamp=_SecondsNanos.from_flexible(start).to_ingest_api()
)
)

s3_path = put_multipart_upload(
self._clients.auth_header, video, filename, file_type.mimetype, self._clients.upload
)
file_type = FileType(*file_type)
s3_path = upload_multipart_io(self._clients.auth_header, video, name, file_type, self._clients.upload)
request = ingest_api.IngestVideoRequest(
labels=list(labels),
properties={} if properties is None else dict(properties),
sources=[ingest_api.IngestSource(s3=ingest_api.S3IngestSource(path=s3_path))],
timestamps=ingest_api.VideoTimestampManifest(
no_manifest=ingest_api.NoTimestampManifest(
starting_timestamp=_SecondsNanos.from_flexible(start).to_ingest_api()
)
),
timestamps=timestamp_manifest,
description=description,
title=name,
)
Expand Down Expand Up @@ -481,11 +531,12 @@ def create_attachment_from_io(
raise TypeError(f"attachment {attachment} must be open in binary mode, rather than text mode")

file_type = FileType(*file_type)
urlsafe_name = urllib.parse.quote_plus(name)
filename = f"{urlsafe_name}{file_type.extension}"

s3_path = put_multipart_upload(
self._clients.auth_header, attachment, filename, file_type.mimetype, self._clients.upload
s3_path = upload_multipart_io(
self._clients.auth_header,
attachment,
name,
file_type,
self._clients.upload,
)
request = attachments_api.CreateAttachmentRequest(
description=description or "",
Expand Down Expand Up @@ -520,7 +571,7 @@ def get_unit(self, unit_symbol: str) -> Unit | None:
NOTE: This currently requires that units are formatted as laid out in
the latest UCUM standards (see https://ucum.org/ucum)

Returns
Returns:
-------
Rendered Unit metadata if the symbol is valid and supported by Nominal, or None
if no such unit symbol matches.
Expand Down Expand Up @@ -556,7 +607,7 @@ def set_channel_units(self, rids_to_types: Mapping[str, str | None]) -> Sequence
rids_to_types: Mapping of channel RIDs -> unit symbols (e.g. 'm/s').
NOTE: Providing `None` as the unit symbol clears any existing units for the channels.

Returns
Returns:
-------
A sequence of metadata for all updated channels
Raises:
Expand Down Expand Up @@ -604,12 +655,7 @@ def create_video_from_mcap_io(
raise TypeError(f"dataset {mcap} must be open in binary mode, rather than text mode")

file_type = FileType(*file_type)
urlsafe_name = urllib.parse.quote_plus(name)
filename = f"{urlsafe_name}{file_type.extension}"

s3_path = put_multipart_upload(
self._clients.auth_header, mcap, filename, file_type.mimetype, self._clients.upload
)
s3_path = upload_multipart_io(self._clients.auth_header, mcap, name, file_type, self._clients.upload)
request = ingest_api.IngestMcapRequest(
channel_config=[
ingest_api.McapChannelConfig(
Expand Down
Loading