feat: adding ability to specify time column in TDMS groups #146

Merged · 6 commits · Nov 25, 2024
Changes from 1 commit
70 changes: 52 additions & 18 deletions nominal/nominal.py
@@ -1,5 +1,6 @@
from __future__ import annotations

import logging
from datetime import datetime
from functools import cache
from pathlib import Path
@@ -28,6 +29,7 @@
import pandas as pd
import polars as pl

logger = logging.getLogger(__name__)

_DEFAULT_BASE_URL = "https://api.gov.nominal.io/api"

@@ -74,15 +76,21 @@ def get_user() -> User:


def upload_tdms(
file: Path | str, name: str | None = None, description: str | None = None, *, wait_until_complete: bool = True
file: Path | str, name: str | None = None, timestamp_column: str | None = None,
timestamp_type: ts._AnyTimestampType | None = None, description: str | None = None,
*, wait_until_complete: bool = True
) -> Dataset:
"""Create a dataset in the Nominal platform from a tdms file.

TDMS channel properties must have both a `wf_increment` and `wf_start_time` property to be included in the dataset.
If `name` is None, the dataset is created with the name of the file with a .csv suffix.

Channels will be named as f"{group_name}.{channel_name}" with spaces replaced with underscores.
If 'timestamp_column' is provided, it must be present in every group and the length of all data columns must be
equal to (and aligned with) 'timestamp_column'.

If `name` is None, the dataset is created with the name of the file with a .csv suffix.
If 'timestamp_column' is None, TDMS channel properties must have both a `wf_increment` and `wf_start_time`
property to be included in the dataset.

Channels will be named as f"{group_name}.{channel_name}" with spaces replaced with underscores.

If `wait_until_complete=True` (the default), this function waits until the dataset has completed ingestion before
returning. If you are uploading many datasets, set `wait_until_complete=False` instead and call
@@ -94,38 +102,64 @@ def upload_tdms(

path = Path(file)
with TdmsFile.open(path) as tdms_file:
# identify channels to extract
channels_to_export: dict[str, TdmsChannel] = {}
group: TdmsGroup
df = None

for group in tdms_file.groups():
time_channel = None
channel: TdmsChannel

# pull out timestamp column if expected
if timestamp_column:
for channel in group.channels():
if channel.name == timestamp_column:
time_channel = channel
if not time_channel:
logger.info("Skipping channel group \"%s\" because expected timestamp_column \"%s\" does not exist",
channel.group_name, timestamp_column)
continue

# select channels for export
for channel in group.channels():
# some channels may not have the required properties to construct a time track
if ("wf_increment" in channel.properties) and ("wf_start_time" in channel.properties):
# do not export timestamp column
if channel.name == timestamp_column:
continue

# export IFF timestamp_column provided OR waveform time properties are set
if timestamp_column or (("wf_increment" in channel.properties) and ("wf_start_time" in
channel.properties)):
# skip if unexpected column length
if timestamp_column and len(channel) != len(time_channel):
logger.info("Skipping channel \"%s\" because length does not match \"%s\"",
channel.name, timestamp_column)
continue
channel_name = f"{channel.group_name.replace(' ', '_')}.{channel.name.replace(' ', '_')}"
channels_to_export[channel_name] = channel

df = pd.DataFrame.from_dict(
{
channel_name: pd.Series(
data=channel.read_data(), index=channel.time_track(absolute_time=True, accuracy="ns")
)
for channel_name, channel in channels_to_export.items()
}
)
group_df = pd.DataFrame.from_dict(
{
channel_name: pd.Series(
data=channel.read_data(), index=time_channel.read_data() if timestamp_column else
channel.time_track(absolute_time=True, accuracy="ns")
)
for channel_name, channel in channels_to_export.items()
})
df = group_df if df is None else pd.merge(left=df, right=group_df, left_index=True, right_index=True)

# format for nominal upload
time_column = "time_ns"
time_column = timestamp_column if timestamp_column else "time_ns"
df.index = df.index.set_names(time_column, level=None)
df = df.reset_index()
df[time_column] = df[time_column].astype(np.int64)
if not timestamp_column:
df[time_column] = df[time_column].astype(np.int64)

return upload_pandas(
df=df,
name=name if name is not None else path.with_suffix(".csv").name,
description=description,
timestamp_column=time_column,
timestamp_type=ts.EPOCH_NANOSECONDS,
timestamp_type=timestamp_type if timestamp_type else ts.EPOCH_NANOSECONDS,
wait_until_complete=wait_until_complete,
)
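
For reference, a minimal usage sketch of the new `timestamp_column` parameter, assuming the module path shown in this diff; the file path, dataset name, and column name below are hypothetical:

from nominal.nominal import upload_tdms  # assumption: importing directly from the module changed in this diff

dataset = upload_tdms(
    "bench_test.tdms",              # hypothetical TDMS file with a shared time column in each group
    name="Bench Test 01",           # hypothetical dataset name; defaults to the file name with a .csv suffix
    timestamp_column="Time",        # must exist in every group and match the length of each data column
    wait_until_complete=False,      # poll ingestion separately when uploading many datasets
)

When `timestamp_column` is omitted, the previous behavior still applies: only channels with both `wf_increment` and `wf_start_time` properties are exported, indexed by their absolute waveform time track, and the index is uploaded as `time_ns` in epoch nanoseconds.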
