Skip to content

feat: add channel name delimiter to file ingest #169

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Dec 16, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions nominal/cli/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ def dataset_cmd() -> None:
help="interpretation of the primary timestamp column",
)
@click.option("-d", "--desc")
@click.option(
"--channel-name-delimiter",
default=".",
show_default=True,
help="the character used to delimit the hierarchy in the channel name",
)
@click.option("--wait/--no-wait", default=True, help="wait until the upload is complete")
@client_options
@global_options
Expand All @@ -52,6 +58,7 @@ def upload_csv(
timestamp_column: str,
timestamp_type: _LiteralAbsolute,
desc: str | None,
channel_name_delimiter: str | None,
wait: bool,
client: NominalClient,
) -> None:
Expand All @@ -64,6 +71,7 @@ def upload_csv(
timestamp_column=timestamp_column,
timestamp_type=timestamp_type,
description=desc,
prefix_tree_delimiter=channel_name_delimiter,
)

# block until ingestion completed, if requested
Expand Down
43 changes: 26 additions & 17 deletions nominal/core/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ def create_csv_dataset(
*,
labels: Sequence[str] = (),
properties: Mapping[str, str] | None = None,
prefix_tree_delimiter: str | None = None,
) -> Dataset:
"""Create a dataset from a CSV file.

Expand All @@ -188,7 +189,14 @@ def create_csv_dataset(
See `create_dataset_from_io` for more details.
"""
return self.create_tabular_dataset(
path, name, timestamp_column, timestamp_type, description, labels=labels, properties=properties
path,
name,
timestamp_column,
timestamp_type,
description,
labels=labels,
properties=properties,
prefix_tree_delimiter=prefix_tree_delimiter,
)

def create_tabular_dataset(
Expand All @@ -201,6 +209,7 @@ def create_tabular_dataset(
*,
labels: Sequence[str] = (),
properties: Mapping[str, str] | None = None,
prefix_tree_delimiter: str | None = None,
) -> Dataset:
"""Create a dataset from a table-like file (CSV, parquet, etc.).

Expand All @@ -223,6 +232,7 @@ def create_tabular_dataset(
description=description,
labels=labels,
properties=properties,
prefix_tree_delimiter=prefix_tree_delimiter,
)

def create_mcap_dataset(
Expand Down Expand Up @@ -293,6 +303,7 @@ def create_dataset_from_io(
*,
labels: Sequence[str] = (),
properties: Mapping[str, str] | None = None,
prefix_tree_delimiter: str | None = None,
) -> Dataset:
"""Create a dataset from a file-like object.
The dataset must be a file-like object in binary mode, e.g. open(path, "rb") or io.BytesIO.
Expand All @@ -310,25 +321,23 @@ def create_dataset_from_io(

file_type = FileType(*file_type)
s3_path = upload_multipart_io(self._clients.auth_header, dataset, name, file_type, self._clients.upload)
request = ingest_api.TriggerFileIngest(
destination=ingest_api.IngestDestination(
new_dataset=ingest_api.NewDatasetIngestDestination(
labels=list(labels),
properties={} if properties is None else dict(properties),
channel_config=None, # TODO(alkasm): support offsets
dataset_description=description,
dataset_name=name,
)
),
request = ingest_api.TriggerIngest(
labels=list(labels),
properties={} if properties is None else dict(properties),
source=ingest_api.IngestSource(s3=ingest_api.S3IngestSource(path=s3_path)),
source_metadata=ingest_api.IngestSourceMetadata(
timestamp_metadata=ingest_api.TimestampMetadata(
series_name=timestamp_column,
timestamp_type=_to_typed_timestamp_type(timestamp_type)._to_conjure_ingest_api(),
),
channel_config=(
None
if prefix_tree_delimiter is None
else ingest_api.ChannelConfig(prefix_tree_delimiter=prefix_tree_delimiter)
),
dataset_description=description,
dataset_name=name,
timestamp_metadata=ingest_api.TimestampMetadata(
series_name=timestamp_column,
timestamp_type=_to_typed_timestamp_type(timestamp_type)._to_conjure_ingest_api(),
),
)
response = self._clients.ingest.trigger_file_ingest(self._clients.auth_header, request)
response = self._clients.ingest.trigger_ingest(self._clients.auth_header, request)
return self.get_dataset(response.dataset_rid)

def create_video(
Expand Down
16 changes: 15 additions & 1 deletion nominal/nominal.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ def upload_pandas(
timestamp_column: str,
timestamp_type: ts._AnyTimestampType,
description: str | None = None,
channel_name_delimiter: str | None = None,
*,
wait_until_complete: bool = True,
) -> Dataset:
Expand Down Expand Up @@ -153,6 +154,7 @@ def write_and_close(df: pd.DataFrame, w: BinaryIO) -> None:
timestamp_type=timestamp_type,
file_type=FileTypes.CSV,
description=description,
prefix_tree_delimiter=channel_name_delimiter,
)
t.join()
if wait_until_complete:
Expand All @@ -166,6 +168,7 @@ def upload_polars(
timestamp_column: str,
timestamp_type: ts._AnyTimestampType,
description: str | None = None,
channel_name_delimiter: str | None = None,
*,
wait_until_complete: bool = True,
) -> Dataset:
Expand All @@ -192,6 +195,7 @@ def write_and_close(df: pl.DataFrame, w: BinaryIO) -> None:
timestamp_type=timestamp_type,
file_type=FileTypes.CSV,
description=description,
prefix_tree_delimiter=channel_name_delimiter,
)
t.join()
if wait_until_complete:
Expand All @@ -205,6 +209,7 @@ def upload_csv(
timestamp_column: str,
timestamp_type: ts._AnyTimestampType,
description: str | None = None,
channel_name_delimiter: str | None = None,
*,
wait_until_complete: bool = True,
) -> Dataset:
Expand All @@ -218,7 +223,14 @@ def upload_csv(
"""
conn = get_default_client()
return _upload_csv(
conn, file, name, timestamp_column, timestamp_type, description, wait_until_complete=wait_until_complete
conn,
file,
name,
timestamp_column,
timestamp_type,
description,
channel_name_delimiter,
wait_until_complete=wait_until_complete,
)


Expand All @@ -229,6 +241,7 @@ def _upload_csv(
timestamp_column: str,
timestamp_type: ts._AnyTimestampType,
description: str | None = None,
channel_name_delimiter: str | None = None,
*,
wait_until_complete: bool = True,
) -> Dataset:
Expand All @@ -238,6 +251,7 @@ def _upload_csv(
timestamp_column=timestamp_column,
timestamp_type=timestamp_type,
description=description,
prefix_tree_delimiter=channel_name_delimiter,
)
if wait_until_complete:
dataset.poll_until_ingestion_completed()
Expand Down
Loading