feat: add video support #42

Merged
merged 11 commits into from
Sep 17, 2024
Changes from 8 commits
7 changes: 6 additions & 1 deletion nominal/__init__.py
@@ -1,4 +1,4 @@
from .core import Attachment, Dataset, NominalClient, Run
from .core import Attachment, Dataset, NominalClient, Run, Video
from .nominal import (
create_run,
create_run_csv,
@@ -7,12 +7,14 @@
get_dataset,
get_default_client,
get_run,
get_video,
search_runs,
set_base_url,
upload_attachment,
upload_csv,
upload_pandas,
upload_polars,
upload_video,
)

__all__ = [
@@ -29,8 +31,11 @@
"upload_attachment",
"get_attachment",
"download_attachment",
"upload_video",
"get_video",
"Dataset",
"Run",
"Attachment",
"NominalClient",
"Video",
]
33 changes: 20 additions & 13 deletions nominal/_utils.py
@@ -7,7 +7,7 @@
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import BinaryIO, Iterable, Iterator, Literal, NamedTuple, TypeVar, Union
from typing import BinaryIO, Iterable, Iterator, Literal, NamedTuple, Type, TypeVar, Union

import dateutil.parser
from typing_extensions import TypeAlias # typing.TypeAlias in 3.10+
@@ -71,21 +71,29 @@ def _timestamp_type_to_conjure_ingest_api(
raise ValueError(f"invalid timestamp type: {ts_type}")


def _flexible_time_to_conjure_scout_run_api(
def _flexible_time_to_conjure_scout_run_api(timestamp: datetime | IntegralNanosecondsUTC) -> scout_run_api.UtcTimestamp:
seconds, nanos = _flexible_time_to_seconds_nanos(timestamp)
return scout_run_api.UtcTimestamp(seconds_since_epoch=seconds, offset_nanoseconds=nanos)


def _flexible_time_to_conjure_ingest_api(
timestamp: datetime | IntegralNanosecondsUTC,
) -> ingest_api.UtcTimestamp:
seconds, nanos = _flexible_time_to_seconds_nanos(timestamp)
return ingest_api.UtcTimestamp(seconds_since_epoch=seconds, offset_nanoseconds=nanos)


def _flexible_time_to_seconds_nanos(
timestamp: datetime | IntegralNanosecondsUTC,
) -> scout_run_api.UtcTimestamp:
) -> tuple[int, int]:
if isinstance(timestamp, datetime):
seconds, nanos = _datetime_to_seconds_nanos(timestamp)
return scout_run_api.UtcTimestamp(seconds_since_epoch=seconds, offset_nanoseconds=nanos)
return _datetime_to_seconds_nanos(timestamp)
elif isinstance(timestamp, IntegralNanosecondsUTC):
seconds, nanos = divmod(timestamp, 1_000_000_000)
return scout_run_api.UtcTimestamp(seconds_since_epoch=seconds, offset_nanoseconds=nanos)
return divmod(timestamp, 1_000_000_000)
raise TypeError(f"expected {datetime} or {IntegralNanosecondsUTC}, got {type(timestamp)}")


def _conjure_time_to_integral_nanoseconds(
ts: scout_run_api.UtcTimestamp,
) -> IntegralNanosecondsUTC:
def _conjure_time_to_integral_nanoseconds(ts: scout_run_api.UtcTimestamp) -> IntegralNanosecondsUTC:
return ts.seconds_since_epoch * 1_000_000_000 + (ts.offset_nanoseconds or 0)


@@ -97,9 +105,7 @@ def _datetime_to_seconds_nanos(dt: datetime) -> tuple[int, int]:


def _datetime_to_integral_nanoseconds(dt: datetime) -> IntegralNanosecondsUTC:
dt = dt.astimezone(timezone.utc)
seconds = int(dt.timestamp())
nanos = dt.microsecond * 1000
seconds, nanos = _datetime_to_seconds_nanos(dt)
return seconds * 1_000_000_000 + nanos


@@ -169,6 +175,7 @@ class FileTypes:
CSV_GZ: FileType = FileType(".csv.gz", "text/csv")
# https://issues.apache.org/jira/browse/PARQUET-1889
PARQUET: FileType = FileType(".parquet", "application/vnd.apache.parquet")
MP4: FileType = FileType(".mp4", "video/mp4")
BINARY: FileType = FileType("", "application/octet-stream")


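For orientation (not part of the diff): the refactor above funnels both timestamp flavors through a single (seconds, nanoseconds) split, which each _flexible_time_to_conjure_* wrapper then packs into its API-specific UtcTimestamp. A minimal standalone sketch of that conversion, mirroring the helpers with standard-library code only:

    from __future__ import annotations

    from datetime import datetime, timezone

    def flexible_time_to_seconds_nanos(timestamp: datetime | int) -> tuple[int, int]:
        """Illustrative stand-in for _flexible_time_to_seconds_nanos."""
        if isinstance(timestamp, datetime):
            # Same steps as _datetime_to_seconds_nanos: normalize to UTC, then split
            # into whole seconds plus the sub-second remainder in nanoseconds.
            dt = timestamp.astimezone(timezone.utc)
            return int(dt.timestamp()), dt.microsecond * 1000
        # Integral nanoseconds since the epoch: divmod by 1e9 yields (seconds, nanos).
        return divmod(timestamp, 1_000_000_000)

    # e.g. divmod(1_726_574_400_123_456_789, 1_000_000_000) == (1_726_574_400, 123_456_789)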
167 changes: 157 additions & 10 deletions nominal/core.py
@@ -16,7 +16,16 @@

from nominal import _config

from ._api.combined import attachments_api, ingest_api, scout, scout_catalog, scout_run_api, upload_api
from ._api.combined import (
attachments_api,
ingest_api,
scout,
scout_catalog,
scout_run_api,
scout_video,
scout_video_api,
upload_api,
)
from ._multipart import put_multipart_upload
from ._utils import (
CustomTimestampFormat,
@@ -25,6 +34,7 @@
IntegralNanosecondsUTC,
TimestampColumnType,
_conjure_time_to_integral_nanoseconds,
_flexible_time_to_conjure_ingest_api,
_flexible_time_to_conjure_scout_run_api,
_timestamp_type_to_conjure_ingest_api,
construct_user_agent_string,
@@ -37,6 +47,7 @@
"Run",
"Dataset",
"Attachment",
"Video",
"IntegralNanosecondsUTC",
"CustomTimestampFormat",
]
@@ -349,11 +360,7 @@ def write(self, path: Path, mkdir: bool = True) -> None:
shutil.copyfileobj(self.get_contents(), wf)

@classmethod
def _from_conjure(
cls,
client: NominalClient,
attachment: attachments_api.Attachment,
) -> Self:
def _from_conjure(cls, client: NominalClient, attachment: attachments_api.Attachment) -> Self:
return cls(
rid=attachment.rid,
name=attachment.title,
@@ -364,6 +371,90 @@ def _from_conjure(
)


@dataclass(frozen=True)
class Video:
rid: str
name: str
description: str | None
properties: Mapping[str, str]
labels: Sequence[str]
_client: NominalClient = field(repr=False)

def poll_until_ingestion_completed(self, interval: timedelta = timedelta(seconds=1)) -> None:
"""Block until video ingestion has completed.
This method polls Nominal for ingest status after uploading a video on an interval.

Raises:
NominalIngestFailed: if the ingest failed
NominalIngestError: if the ingest status is not known
"""

while True:
progress = self._client._video_client.get_ingest_status(self._client._auth_header, self.rid)
if progress.type == "success":
return
elif progress.type == "inProgress": # "type" strings are camelCase
pass
elif progress.type == "error":
error = progress.error
if error is not None:
error_messages = ", ".join([e.message for e in error.errors])
error_types = ", ".join([e.error_type for e in error.errors])
raise NominalIngestFailed(f"ingest failed for video {self.rid!r}: {error_messages} ({error_types})")
raise NominalIngestError(
f"ingest status type marked as 'error' but with no instance for video {self.rid!r}"
)
else:
raise NominalIngestError(f"unhandled ingest status {progress.type!r} for video {self.rid!r}")
time.sleep(interval.total_seconds())

def update(
self,
*,
name: str | None = None,
description: str | None = None,
properties: Mapping[str, str] | None = None,
labels: Sequence[str] | None = None,
) -> Self:
"""Replace video metadata.
Updates the current instance, and returns it.

Only the metadata passed in will be replaced, the rest will remain untouched.

Note: This replaces the metadata rather than appending it. To append to labels or properties, merge them before
calling this method. E.g.:

new_labels = ["new-label-a", "new-label-b"]
for old_label in video.labels:
new_labels.append(old_label)
video = video.update(labels=new_labels)
"""
# TODO(alkasm): properties SHOULD be optional here, but they're not.
# For uniformity with other methods, will always "update" with current props on the client.
request = scout_video_api.UpdateVideoMetadataRequest(
description=description,
labels=None if labels is None else list(labels),
title=name,
properties=dict(self.properties if properties is None else properties),
)
response = self._client._video_client.update_metadata(self._client._auth_header, request, self.rid)

video = self.__class__._from_conjure(self._client, response)
update_dataclass(self, video, fields=self.__dataclass_fields__)
return self

@classmethod
def _from_conjure(cls, client: NominalClient, video: scout_video_api.Video) -> Self:
return cls(
rid=video.rid,
name=video.title,
description=video.description,
properties=MappingProxyType(video.properties),
labels=tuple(video.labels),
_client=client,
)


@dataclass(frozen=True)
class NominalClient:
_auth_header: str = field(repr=False)
@@ -372,6 +463,7 @@ class NominalClient:
_ingest_client: ingest_api.IngestService = field(repr=False)
_catalog_client: scout_catalog.CatalogService = field(repr=False)
_attachment_client: attachments_api.AttachmentService = field(repr=False)
_video_client: scout_video.VideoService = field(repr=False)

@classmethod
def create(cls, base_url: str, token: str | None, trust_store_path: str | None = None) -> Self:
@@ -393,6 +485,7 @@ def create(cls, base_url: str, token: str | None, trust_store_path: str | None =
ingest_client = RequestsClient.create(ingest_api.IngestService, agent, cfg)
catalog_client = RequestsClient.create(scout_catalog.CatalogService, agent, cfg)
attachment_client = RequestsClient.create(attachments_api.AttachmentService, agent, cfg)
video_client = RequestsClient.create(scout_video.VideoService, agent, cfg)
auth_header = f"Bearer {token}"
return cls(
_auth_header=auth_header,
@@ -401,6 +494,7 @@ def create(cls, base_url: str, token: str | None, trust_store_path: str | None =
_ingest_client=ingest_client,
_catalog_client=catalog_client,
_attachment_client=attachment_client,
_video_client=video_client,
)

def create_run(
@@ -490,7 +584,7 @@ def create_dataset_from_io(
name: str,
timestamp_column: str,
timestamp_type: TimestampColumnType,
file_type: FileType = FileTypes.CSV,
file_type: tuple[str, str] | FileType = FileTypes.CSV,
description: str | None = None,
*,
labels: Sequence[str] = (),
@@ -536,6 +630,59 @@ def create_dataset_from_io(
response = self._ingest_client.trigger_file_ingest(self._auth_header, request)
return self.get_dataset(response.dataset_rid)

def create_video_from_io(
self,
video: BinaryIO,
name: str,
start: datetime | IntegralNanosecondsUTC,
description: str | None = None,
file_type: tuple[str, str] | FileType = FileTypes.MP4,
*,
labels: Sequence[str] = (),
properties: Mapping[str, str] | None = None,
) -> Video:
"""Create a video from a file-like object.

The video must be a file-like object in binary mode, e.g. open(path, "rb") or io.BytesIO.
"""
if isinstance(video, TextIOBase):
raise TypeError(f"video {video} must be open in binary mode, rather than text mode")

file_type = FileType(*file_type)
urlsafe_name = urllib.parse.quote_plus(name)
filename = f"{urlsafe_name}{file_type.extension}"

s3_path = put_multipart_upload(self._auth_header, video, filename, file_type.mimetype, self._upload_client)
request = ingest_api.IngestVideoRequest(
labels=list(labels),
properties={} if properties is None else dict(properties),
sources=[ingest_api.IngestSource(s3=ingest_api.S3IngestSource(path=s3_path))],
timestamps=ingest_api.VideoTimestampManifest(
no_manifest=ingest_api.NoTimestampManifest(
starting_timestamp=_flexible_time_to_conjure_ingest_api(start)
)
),
description=description,
title=name,
)
response = self._ingest_client.ingest_video(self._auth_header, request)
return self.get_video(response.video_rid)

def get_video(self, video: Video | str) -> Video:
"""Retrieve a video by video or video RID."""
video_rid = _rid_from_instance_or_string(video)
response = self._video_client.get(self._auth_header, video_rid)
return Video._from_conjure(self, response)

def _iter_get_videos(self, video_rids: Iterable[str]) -> Iterable[Video]:
request = scout_video_api.GetVideosRequest(video_rids=list(video_rids))
for response in self._video_client.batch_get(self._auth_header, request).responses:
yield Video._from_conjure(self, response)

def get_videos(self, videos: Iterable[Video] | Iterable[str]) -> Sequence[Video]:
"""Retrieve videos by video or video RID."""
return list(self._iter_get_videos(_rid_from_instance_or_string(v) for v in videos))

def get_dataset(self, dataset: Dataset | str) -> Dataset:
"""Retrieve a dataset by dataset or dataset RID."""
dataset_rid = _rid_from_instance_or_string(dataset)
@@ -571,7 +718,7 @@ def create_attachment_from_io(
self,
attachment: BinaryIO,
name: str,
file_type: FileType = FileTypes.BINARY,
file_type: tuple[str, str] | FileType = FileTypes.BINARY,
description: str | None = None,
*,
properties: Mapping[str, str] | None = None,
@@ -636,10 +783,10 @@ def _get_dataset(
return datasets[0]


def _rid_from_instance_or_string(value: Attachment | Run | Dataset | str) -> str:
def _rid_from_instance_or_string(value: Attachment | Run | Dataset | Video | str) -> str:
if isinstance(value, str):
return value
elif isinstance(value, (Attachment, Run, Dataset)):
elif isinstance(value, (Attachment, Run, Dataset, Video)):
return value.rid
elif hasattr(value, "rid"):
return value.rid
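End to end, the docstrings above spell out the intended flow: open the file in binary mode, upload and trigger ingest with a starting timestamp, poll until ingestion completes, and replace (not append) metadata on update. Below is a usage sketch against the new NominalClient surface added in this diff; the base URL, token, file path, and label are placeholders, not values from this PR:

    from datetime import datetime, timezone

    from nominal.core import NominalClient

    # Placeholder URL/token for illustration only.
    client = NominalClient.create("https://api.example.nominal.io", token="MY_TOKEN")

    with open("chase_plane.mp4", "rb") as f:  # must be opened in binary mode, not text mode
        video = client.create_video_from_io(
            f,
            name="Chase plane camera",
            start=datetime(2024, 9, 17, 12, 0, tzinfo=timezone.utc),
            description="example upload",
        )

    # Block until the platform finishes ingesting the upload.
    video.poll_until_ingestion_completed()

    # update() replaces metadata, so merge with the existing labels before calling it.
    video = video.update(labels=[*video.labels, "example-label"])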
19 changes: 18 additions & 1 deletion nominal/nominal.py
@@ -17,7 +17,7 @@
_parse_timestamp,
reader_writer,
)
from .core import Attachment, Dataset, NominalClient, Run
from .core import Attachment, Dataset, NominalClient, Run, Video

if TYPE_CHECKING:
import pandas as pd
@@ -292,6 +292,23 @@ def download_attachment(rid: str, file: Path | str) -> None:
attachment.write(Path(file))


def upload_video(
file: Path | str, name: str, start: datetime | str | IntegralNanosecondsUTC, description: str | None = None
) -> Video:
"""Upload a video to Nominal from a file."""
conn = get_default_client()
path = Path(file)
file_type = FileType.from_path(path)
with open(file, "rb") as f:
return conn.create_video_from_io(f, name, _parse_timestamp(start), description, file_type)


def get_video(rid: str) -> Video:
"""Retrieve a video from the Nominal platform by its RID."""
conn = get_default_client()
return conn.get_video(rid)


def _get_start_end_timestamp_csv_file(
file: Path | str, timestamp_column: str, timestamp_type: TimestampColumnType
) -> tuple[IntegralNanosecondsUTC, IntegralNanosecondsUTC]:
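For scripts that rely on the default client, the two module-level helpers added here wrap the same flow. A short sketch, assuming the default client is already configured and that the ISO-8601 start string is accepted by _parse_timestamp; the path and timestamp are placeholders:

    import nominal as nm

    # Assumes a default client has already been configured for this environment.
    video = nm.upload_video("chase_plane.mp4", "Chase plane camera", start="2024-09-17T12:00:00Z")

    # The returned Video exposes the same polling/update API as the core class.
    video.poll_until_ingestion_completed()

    # Fetch the same video later by its RID.
    same_video = nm.get_video(video.rid)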