From 06720fc76b5eb929c1f38e60e85de572cc7acd62 Mon Sep 17 00:00:00 2001 From: Max Najork Date: Fri, 13 Dec 2024 17:04:45 -0800 Subject: [PATCH] feat: add channel name delimiter to file ingest --- nominal/cli/dataset.py | 8 ++++++++ nominal/core/client.py | 43 +++++++++++++++++++++++++----------------- nominal/nominal.py | 16 +++++++++++++++- 3 files changed, 49 insertions(+), 18 deletions(-) diff --git a/nominal/cli/dataset.py b/nominal/cli/dataset.py index 2cd105ba..aee4204d 100644 --- a/nominal/cli/dataset.py +++ b/nominal/cli/dataset.py @@ -43,6 +43,12 @@ def dataset_cmd() -> None: help="interpretation the primary timestamp column", ) @click.option("-d", "--desc") +@click.option( + "--channel-name-delimiter", + default=".", + show_default=True, + help="the character used to delimit the hierarchy in the channel name", +) @click.option("--wait/--no-wait", default=True, help="wait until the upload is complete") @client_options @global_options @@ -52,6 +58,7 @@ def upload_csv( timestamp_column: str, timestamp_type: _LiteralAbsolute, desc: str | None, + channel_name_delimiter: str | None, wait: bool, client: NominalClient, ) -> None: @@ -64,6 +71,7 @@ def upload_csv( timestamp_column=timestamp_column, timestamp_type=timestamp_type, description=desc, + prefix_tree_delimiter=channel_name_delimiter, ) # block until ingestion completed, if requested diff --git a/nominal/core/client.py b/nominal/core/client.py index 12457421..377c7b74 100644 --- a/nominal/core/client.py +++ b/nominal/core/client.py @@ -180,6 +180,7 @@ def create_csv_dataset( *, labels: Sequence[str] = (), properties: Mapping[str, str] | None = None, + prefix_tree_delimiter: str | None = None, ) -> Dataset: """Create a dataset from a CSV file. @@ -188,7 +189,14 @@ def create_csv_dataset( See `create_dataset_from_io` for more details. """ return self.create_tabular_dataset( - path, name, timestamp_column, timestamp_type, description, labels=labels, properties=properties + path, + name, + timestamp_column, + timestamp_type, + description, + labels=labels, + properties=properties, + prefix_tree_delimiter=prefix_tree_delimiter, ) def create_tabular_dataset( @@ -201,6 +209,7 @@ def create_tabular_dataset( *, labels: Sequence[str] = (), properties: Mapping[str, str] | None = None, + prefix_tree_delimiter: str | None = None, ) -> Dataset: """Create a dataset from a table-like file (CSV, parquet, etc.). @@ -223,6 +232,7 @@ def create_tabular_dataset( description=description, labels=labels, properties=properties, + prefix_tree_delimiter=prefix_tree_delimiter, ) def create_mcap_dataset( @@ -293,6 +303,7 @@ def create_dataset_from_io( *, labels: Sequence[str] = (), properties: Mapping[str, str] | None = None, + prefix_tree_delimiter: str | None = None, ) -> Dataset: """Create a dataset from a file-like object. The dataset must be a file-like object in binary mode, e.g. open(path, "rb") or io.BytesIO. @@ -310,25 +321,23 @@ def create_dataset_from_io( file_type = FileType(*file_type) s3_path = upload_multipart_io(self._clients.auth_header, dataset, name, file_type, self._clients.upload) - request = ingest_api.TriggerFileIngest( - destination=ingest_api.IngestDestination( - new_dataset=ingest_api.NewDatasetIngestDestination( - labels=list(labels), - properties={} if properties is None else dict(properties), - channel_config=None, # TODO(alkasm): support offsets - dataset_description=description, - dataset_name=name, - ) - ), + request = ingest_api.TriggerIngest( + labels=list(labels), + properties={} if properties is None else dict(properties), source=ingest_api.IngestSource(s3=ingest_api.S3IngestSource(path=s3_path)), - source_metadata=ingest_api.IngestSourceMetadata( - timestamp_metadata=ingest_api.TimestampMetadata( - series_name=timestamp_column, - timestamp_type=_to_typed_timestamp_type(timestamp_type)._to_conjure_ingest_api(), - ), + channel_config=( + None + if prefix_tree_delimiter is None + else ingest_api.ChannelConfig(prefix_tree_delimiter=prefix_tree_delimiter) + ), + dataset_description=description, + dataset_name=name, + timestamp_metadata=ingest_api.TimestampMetadata( + series_name=timestamp_column, + timestamp_type=_to_typed_timestamp_type(timestamp_type)._to_conjure_ingest_api(), ), ) - response = self._clients.ingest.trigger_file_ingest(self._clients.auth_header, request) + response = self._clients.ingest.trigger_ingest(self._clients.auth_header, request) return self.get_dataset(response.dataset_rid) def create_video( diff --git a/nominal/nominal.py b/nominal/nominal.py index 8eb51460..4491a588 100644 --- a/nominal/nominal.py +++ b/nominal/nominal.py @@ -125,6 +125,7 @@ def upload_pandas( timestamp_column: str, timestamp_type: ts._AnyTimestampType, description: str | None = None, + channel_name_delimiter: str | None = None, *, wait_until_complete: bool = True, ) -> Dataset: @@ -153,6 +154,7 @@ def write_and_close(df: pd.DataFrame, w: BinaryIO) -> None: timestamp_type=timestamp_type, file_type=FileTypes.CSV, description=description, + prefix_tree_delimiter=channel_name_delimiter, ) t.join() if wait_until_complete: @@ -166,6 +168,7 @@ def upload_polars( timestamp_column: str, timestamp_type: ts._AnyTimestampType, description: str | None = None, + channel_name_delimiter: str | None = None, *, wait_until_complete: bool = True, ) -> Dataset: @@ -192,6 +195,7 @@ def write_and_close(df: pl.DataFrame, w: BinaryIO) -> None: timestamp_type=timestamp_type, file_type=FileTypes.CSV, description=description, + prefix_tree_delimiter=channel_name_delimiter, ) t.join() if wait_until_complete: @@ -205,6 +209,7 @@ def upload_csv( timestamp_column: str, timestamp_type: ts._AnyTimestampType, description: str | None = None, + channel_name_delimiter: str | None = None, *, wait_until_complete: bool = True, ) -> Dataset: @@ -218,7 +223,14 @@ def upload_csv( """ conn = get_default_client() return _upload_csv( - conn, file, name, timestamp_column, timestamp_type, description, wait_until_complete=wait_until_complete + conn, + file, + name, + timestamp_column, + timestamp_type, + description, + channel_name_delimiter, + wait_until_complete=wait_until_complete, ) @@ -229,6 +241,7 @@ def _upload_csv( timestamp_column: str, timestamp_type: ts._AnyTimestampType, description: str | None = None, + channel_name_delimiter: str | None = None, *, wait_until_complete: bool = True, ) -> Dataset: @@ -238,6 +251,7 @@ def _upload_csv( timestamp_column=timestamp_column, timestamp_type=timestamp_type, description=description, + prefix_tree_delimiter=channel_name_delimiter, ) if wait_until_complete: dataset.poll_until_ingestion_completed()