diff --git a/nominal/__init__.py b/nominal/__init__.py index 4a484d86..aefd9d43 100644 --- a/nominal/__init__.py +++ b/nominal/__init__.py @@ -1,4 +1,4 @@ -from .core import Attachment, Dataset, NominalClient, Run +from .core import Attachment, Dataset, NominalClient, Run, Video from .nominal import ( create_run, create_run_csv, @@ -7,12 +7,14 @@ get_dataset, get_default_client, get_run, + get_video, search_runs, set_base_url, upload_attachment, upload_csv, upload_pandas, upload_polars, + upload_video, ) __all__ = [ @@ -29,8 +31,11 @@ "upload_attachment", "get_attachment", "download_attachment", + "upload_video", + "get_video", "Dataset", "Run", "Attachment", "NominalClient", + "Video", ] diff --git a/nominal/_utils.py b/nominal/_utils.py index 9866f650..bef4c10c 100644 --- a/nominal/_utils.py +++ b/nominal/_utils.py @@ -7,7 +7,7 @@ from dataclasses import dataclass from datetime import datetime, timezone from pathlib import Path -from typing import BinaryIO, Iterable, Iterator, Literal, NamedTuple, TypeVar, Union +from typing import BinaryIO, Iterable, Iterator, Literal, NamedTuple, Type, TypeVar, Union import dateutil.parser from typing_extensions import TypeAlias # typing.TypeAlias in 3.10+ @@ -71,21 +71,29 @@ def _timestamp_type_to_conjure_ingest_api( raise ValueError(f"invalid timestamp type: {ts_type}") -def _flexible_time_to_conjure_scout_run_api( +def _flexible_time_to_conjure_scout_run_api(timestamp: datetime | IntegralNanosecondsUTC) -> scout_run_api.UtcTimestamp: + seconds, nanos = _flexible_time_to_seconds_nanos(timestamp) + return scout_run_api.UtcTimestamp(seconds_since_epoch=seconds, offset_nanoseconds=nanos) + + +def _flexible_time_to_conjure_ingest_api( + timestamp: datetime | IntegralNanosecondsUTC, +) -> ingest_api.UtcTimestamp: + seconds, nanos = _flexible_time_to_seconds_nanos(timestamp) + return ingest_api.UtcTimestamp(seconds_since_epoch=seconds, offset_nanoseconds=nanos) + + +def _flexible_time_to_seconds_nanos( timestamp: datetime | 
IntegralNanosecondsUTC, -) -> scout_run_api.UtcTimestamp: +) -> tuple[int, int]: if isinstance(timestamp, datetime): - seconds, nanos = _datetime_to_seconds_nanos(timestamp) - return scout_run_api.UtcTimestamp(seconds_since_epoch=seconds, offset_nanoseconds=nanos) + return _datetime_to_seconds_nanos(timestamp) elif isinstance(timestamp, IntegralNanosecondsUTC): - seconds, nanos = divmod(timestamp, 1_000_000_000) - return scout_run_api.UtcTimestamp(seconds_since_epoch=seconds, offset_nanoseconds=nanos) + return divmod(timestamp, 1_000_000_000) raise TypeError(f"expected {datetime} or {IntegralNanosecondsUTC}, got {type(timestamp)}") -def _conjure_time_to_integral_nanoseconds( - ts: scout_run_api.UtcTimestamp, -) -> IntegralNanosecondsUTC: +def _conjure_time_to_integral_nanoseconds(ts: scout_run_api.UtcTimestamp) -> IntegralNanosecondsUTC: return ts.seconds_since_epoch * 1_000_000_000 + (ts.offset_nanoseconds or 0) @@ -97,9 +105,7 @@ def _datetime_to_seconds_nanos(dt: datetime) -> tuple[int, int]: def _datetime_to_integral_nanoseconds(dt: datetime) -> IntegralNanosecondsUTC: - dt = dt.astimezone(timezone.utc) - seconds = int(dt.timestamp()) - nanos = dt.microsecond * 1000 + seconds, nanos = _datetime_to_seconds_nanos(dt) return seconds * 1_000_000_000 + nanos @@ -169,6 +175,7 @@ class FileTypes: CSV_GZ: FileType = FileType(".csv.gz", "text/csv") # https://issues.apache.org/jira/browse/PARQUET-1889 PARQUET: FileType = FileType(".parquet", "application/vnd.apache.parquet") + MP4: FileType = FileType(".mp4", "video/mp4") BINARY: FileType = FileType("", "application/octet-stream") diff --git a/nominal/core.py b/nominal/core.py index 57aae19a..d36414e3 100644 --- a/nominal/core.py +++ b/nominal/core.py @@ -16,7 +16,16 @@ from nominal import _config -from ._api.combined import attachments_api, ingest_api, scout, scout_catalog, scout_run_api, upload_api +from ._api.combined import ( + attachments_api, + ingest_api, + scout, + scout_catalog, + scout_run_api, + scout_video, + 
scout_video_api, + upload_api, +) from ._multipart import put_multipart_upload from ._utils import ( CustomTimestampFormat, @@ -25,6 +34,7 @@ IntegralNanosecondsUTC, TimestampColumnType, _conjure_time_to_integral_nanoseconds, + _flexible_time_to_conjure_ingest_api, _flexible_time_to_conjure_scout_run_api, _timestamp_type_to_conjure_ingest_api, construct_user_agent_string, @@ -37,6 +47,7 @@ "Run", "Dataset", "Attachment", + "Video", "IntegralNanosecondsUTC", "CustomTimestampFormat", ] @@ -350,11 +361,7 @@ def write(self, path: Path, mkdir: bool = True) -> None: shutil.copyfileobj(self.get_contents(), wf) @classmethod - def _from_conjure( - cls, - client: NominalClient, - attachment: attachments_api.Attachment, - ) -> Self: + def _from_conjure(cls, client: NominalClient, attachment: attachments_api.Attachment) -> Self: return cls( rid=attachment.rid, name=attachment.title, @@ -365,6 +372,90 @@ def _from_conjure( ) +@dataclass(frozen=True) +class Video: + rid: str + name: str + description: str | None + properties: Mapping[str, str] + labels: Sequence[str] + _client: NominalClient = field(repr=False) + + def poll_until_ingestion_completed(self, interval: timedelta = timedelta(seconds=1)) -> None: + """Block until video ingestion has completed. + This method polls Nominal for ingest status after uploading a video on an interval. 
+ + Raises: + NominalIngestFailed: if the ingest failed + NominalIngestError: if the ingest status is not known + """ + + while True: + progress = self._client._video_client.get_ingest_status(self._client._auth_header, self.rid) + if progress.type == "success": + return + elif progress.type == "inProgress": # "type" strings are camelCase + pass + elif progress.type == "error": + error = progress.error + if error is not None: + error_messages = ", ".join([e.message for e in error.errors]) + error_types = ", ".join([e.error_type for e in error.errors]) + raise NominalIngestFailed(f"ingest failed for video {self.rid!r}: {error_messages} ({error_types})") + raise NominalIngestError( + f"ingest status type marked as 'error' but with no instance for video {self.rid!r}" + ) + else: + raise NominalIngestError(f"unhandled ingest status {progress.type!r} for video {self.rid!r}") + time.sleep(interval.total_seconds()) + + def update( + self, + *, + name: str | None = None, + description: str | None = None, + properties: Mapping[str, str] | None = None, + labels: Sequence[str] | None = None, + ) -> Self: + """Replace video metadata. + Updates the current instance, and returns it. + + Only the metadata passed in will be replaced, the rest will remain untouched. + + Note: This replaces the metadata rather than appending it. To append to labels or properties, merge them before + calling this method. E.g.: + + new_labels = ["new-label-a", "new-label-b"] + for old_label in video.labels: + new_labels.append(old_label) + video = video.update(labels=new_labels) + """ + # TODO(alkasm): properties SHOULD be optional here, but they're not. + # For uniformity with other methods, will always "update" with current props on the client. 
+ request = scout_video_api.UpdateVideoMetadataRequest( + description=description, + labels=None if labels is None else list(labels), + title=name, + properties=dict(self.properties if properties is None else properties), + ) + response = self._client._video_client.update_metadata(self._client._auth_header, request, self.rid) + + video = self.__class__._from_conjure(self._client, response) + update_dataclass(self, video, fields=self.__dataclass_fields__) + return self + + @classmethod + def _from_conjure(cls, client: NominalClient, video: scout_video_api.Video) -> Self: + return cls( + rid=video.rid, + name=video.title, + description=video.description, + properties=MappingProxyType(video.properties), + labels=tuple(video.labels), + _client=client, + ) + + @dataclass(frozen=True) class NominalClient: _auth_header: str = field(repr=False) @@ -373,6 +464,7 @@ class NominalClient: _ingest_client: ingest_api.IngestService = field(repr=False) _catalog_client: scout_catalog.CatalogService = field(repr=False) _attachment_client: attachments_api.AttachmentService = field(repr=False) + _video_client: scout_video.VideoService = field(repr=False) @classmethod def create(cls, base_url: str, token: str | None, trust_store_path: str | None = None) -> Self: @@ -394,6 +486,7 @@ def create(cls, base_url: str, token: str | None, trust_store_path: str | None = ingest_client = RequestsClient.create(ingest_api.IngestService, agent, cfg) catalog_client = RequestsClient.create(scout_catalog.CatalogService, agent, cfg) attachment_client = RequestsClient.create(attachments_api.AttachmentService, agent, cfg) + video_client = RequestsClient.create(scout_video.VideoService, agent, cfg) auth_header = f"Bearer {token}" return cls( _auth_header=auth_header, @@ -402,6 +495,7 @@ def create(cls, base_url: str, token: str | None, trust_store_path: str | None = _ingest_client=ingest_client, _catalog_client=catalog_client, _attachment_client=attachment_client, + _video_client=video_client, ) def 
create_run( @@ -523,7 +617,7 @@ def create_dataset_from_io( name: str, timestamp_column: str, timestamp_type: TimestampColumnType, - file_type: FileType = FileTypes.CSV, + file_type: tuple[str, str] | FileType = FileTypes.CSV, description: str | None = None, *, labels: Sequence[str] = (), @@ -569,6 +663,59 @@ def create_dataset_from_io( response = self._ingest_client.trigger_file_ingest(self._auth_header, request) return self.get_dataset(response.dataset_rid) + def create_video_from_io( + self, + video: BinaryIO, + name: str, + start: datetime | IntegralNanosecondsUTC, + description: str | None = None, + file_type: tuple[str, str] | FileType = FileTypes.MP4, + *, + labels: Sequence[str] = (), + properties: Mapping[str, str] | None = None, + ) -> Video: + """Create a video from a file-like object. + + The video must be a file-like object in binary mode, e.g. open(path, "rb") or io.BytesIO. + """ + if isinstance(video, TextIOBase): + raise TypeError(f"video {video} must be open in binary mode, rather than text mode") + + file_type = FileType(*file_type) + urlsafe_name = urllib.parse.quote_plus(name) + filename = f"{urlsafe_name}{file_type.extension}" + + s3_path = put_multipart_upload(self._auth_header, video, filename, file_type.mimetype, self._upload_client) + request = ingest_api.IngestVideoRequest( + labels=list(labels), + properties={} if properties is None else dict(properties), + sources=[ingest_api.IngestSource(s3=ingest_api.S3IngestSource(path=s3_path))], + timestamps=ingest_api.VideoTimestampManifest( + no_manifest=ingest_api.NoTimestampManifest( + starting_timestamp=_flexible_time_to_conjure_ingest_api(start) + ) + ), + description=description, + title=name, + ) + response = self._ingest_client.ingest_video(self._auth_header, request) + return self.get_video(response.video_rid) + + def get_video(self, video: Video | str) -> Video: + """Retrieve a video by video or video RID.""" + video_rid = _rid_from_instance_or_string(video) + response = 
self._video_client.get(self._auth_header, video_rid) + return Video._from_conjure(self, response) + + def _iter_get_videos(self, video_rids: Iterable[str]) -> Iterable[Video]: + request = scout_video_api.GetVideosRequest(video_rids=list(video_rids)) + for response in self._video_client.batch_get(self._auth_header, request).responses: + yield Video._from_conjure(self, response) + + def get_videos(self, videos: Iterable[Video] | Iterable[str]) -> Sequence[Video]: + """Retrieve videos by video or video RID.""" + return list(self._iter_get_videos(_rid_from_instance_or_string(v) for v in videos)) + def get_dataset(self, dataset: Dataset | str) -> Dataset: """Retrieve a dataset by dataset or dataset RID.""" dataset_rid = _rid_from_instance_or_string(dataset) @@ -604,7 +751,7 @@ def create_attachment_from_io( self, attachment: BinaryIO, name: str, - file_type: FileType = FileTypes.BINARY, + file_type: tuple[str, str] | FileType = FileTypes.BINARY, description: str | None = None, *, properties: Mapping[str, str] | None = None, @@ -669,10 +816,10 @@ def _get_dataset( return datasets[0] -def _rid_from_instance_or_string(value: Attachment | Run | Dataset | str) -> str: +def _rid_from_instance_or_string(value: Attachment | Run | Dataset | Video | str) -> str: if isinstance(value, str): return value - elif isinstance(value, (Attachment, Run, Dataset)): + elif isinstance(value, (Attachment, Run, Dataset, Video)): return value.rid elif hasattr(value, "rid"): return value.rid diff --git a/nominal/nominal.py b/nominal/nominal.py index c06c6f4c..16f09ee5 100644 --- a/nominal/nominal.py +++ b/nominal/nominal.py @@ -17,7 +17,7 @@ _parse_timestamp, reader_writer, ) -from .core import Attachment, Dataset, NominalClient, Run +from .core import Attachment, Dataset, NominalClient, Run, Video if TYPE_CHECKING: import pandas as pd @@ -289,6 +289,23 @@ def download_attachment(rid: str, file: Path | str) -> None: attachment.write(Path(file)) +def upload_video( + file: Path | str, name: str, 
start: datetime | str | IntegralNanosecondsUTC, description: str | None = None +) -> Video: + """Upload a video to Nominal from a file.""" + conn = get_default_client() + path = Path(file) + file_type = FileType.from_path(path) + with open(path, "rb") as f: + return conn.create_video_from_io(f, name, _parse_timestamp(start), description, file_type) + + +def get_video(rid: str) -> Video: + """Retrieve a video from the Nominal platform by its RID.""" + conn = get_default_client() + return conn.get_video(rid) + + +def _get_start_end_timestamp_csv_file( file: Path | str, timestamp_column: str, timestamp_type: TimestampColumnType ) -> tuple[IntegralNanosecondsUTC, IntegralNanosecondsUTC]: diff --git a/tests/e2e/conftest.py b/tests/e2e/conftest.py index d2cf08f8..eb5bc15c 100644 --- a/tests/e2e/conftest.py +++ b/tests/e2e/conftest.py @@ -1,3 +1,4 @@ +from pathlib import Path from typing import Iterator from unittest import mock @@ -46,7 +47,7 @@ def csv_data(): """ @pytest.fixture(scope="session") def csv_data2(): return b"""\ timestamp,temperature,humidity @@ -61,3 +62,14 @@ def csv_data2(): 2024-09-05T18:18:00Z,38,32 2024-09-05T18:19:00Z,39,31 """ + + +@pytest.fixture(scope="session") +def mp4_data(): + """From chromium tests: https://github.com/chromium/chromium/blob/main/media/test/data/bear-1280x720.mp4 + + To download: + curl https://raw.githubusercontent.com/chromium/chromium/main/media/test/data/bear-1280x720.mp4 -o data/bear-1280x720.mp4 + """ + path = Path(__file__).parent / "data/bear-1280x720.mp4" + return path.read_bytes() diff --git a/tests/e2e/data/bear-1280x720.mp4 b/tests/e2e/data/bear-1280x720.mp4 new file mode 100644 index 00000000..b424a0f7 Binary files /dev/null and b/tests/e2e/data/bear-1280x720.mp4 differ diff --git a/tests/e2e/test_toplevel.py b/tests/e2e/test_toplevel.py index 81390e35..0e4cf5f7 100644 --- a/tests/e2e/test_toplevel.py +++ b/tests/e2e/test_toplevel.py @@ -153,7 +153,7 @@ def
test_upload_attachment(csv_data): at_title = f"attachment-{uuid4()}" - at_desc = f"top-level test to add a attachment to a run {uuid4()}" + at_desc = f"top-level test to upload an attachment {uuid4()}" with mock.patch("builtins.open", mock.mock_open(read_data=csv_data)): at = nm.upload_attachment("fake_path.csv", at_title, at_desc) @@ -167,7 +167,7 @@ def test_upload_attachment(csv_data): def test_get_attachment(csv_data): at_title = f"attachment-{uuid4()}" - at_desc = f"top-level test to add a attachment to a run {uuid4()}" + at_desc = f"top-level test to get an attachment {uuid4()}" with mock.patch("builtins.open", mock.mock_open(read_data=csv_data)): at = nm.upload_attachment("fake_path.csv", at_title, at_desc) @@ -182,7 +182,7 @@ def test_get_attachment(csv_data): def test_download_attachment(csv_data): at_title = f"attachment-{uuid4()}" - at_desc = f"top-level test to add a attachment to a run {uuid4()}" + at_desc = f"top-level test to download an attachment {uuid4()}" with mock.patch("builtins.open", mock.mock_open(read_data=csv_data)): at = nm.upload_attachment("fake_path.csv", at_title, at_desc) @@ -191,3 +191,35 @@ def test_download_attachment(csv_data): with mock.patch("builtins.open", return_value=w): nm.download_attachment(at.rid, "fake_path.csv") assert r.read() == csv_data + + +def test_upload_video(mp4_data): + title = f"video-{uuid4()}" + desc = f"top-level test to ingest a video {uuid4()}" + start, _ = _create_random_start_end() + + with mock.patch("builtins.open", mock.mock_open(read_data=mp4_data)): + v = nm.upload_video("fake_path.mp4", title, start, desc) + v.poll_until_ingestion_completed(interval=timedelta(seconds=0.1)) + + assert v.rid != "" + assert v.name == title + assert v.description == desc + assert len(v.properties) == 0 + assert len(v.labels) == 0 + + +def test_get_video(mp4_data): + title = f"video-{uuid4()}" + desc = f"top-level test to get a video {uuid4()}" + start, _ = _create_random_start_end() + + with 
mock.patch("builtins.open", mock.mock_open(read_data=mp4_data)): + v = nm.upload_video("fake_path.mp4", title, start, desc) + v2 = nm.get_video(v.rid) + + assert v2.rid == v.rid != "" + assert v2.name == v.name == title + assert v2.description == v.description == desc + assert v2.properties == v.properties == {} + assert v2.labels == v.labels == ()