Skip to content

feat: adding ability to specify time column in TDMS groups #146

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Nov 25, 2024
Merged
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 109 additions & 37 deletions nominal/nominal.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import logging
from datetime import datetime
from functools import cache
from pathlib import Path
Expand Down Expand Up @@ -28,6 +29,7 @@
import pandas as pd
import polars as pl

logger = logging.getLogger(__name__)

_DEFAULT_BASE_URL = "https://api.gov.nominal.io/api"

Expand Down Expand Up @@ -74,15 +76,20 @@ def get_user() -> User:


def upload_tdms(
    file: Path | str,
    name: str | None = None,
    description: str | None = None,
    timestamp_column: str | None = None,
    timestamp_type: ts._AnyTimestampType | None = None,
    *,
    wait_until_complete: bool = True,
) -> Dataset:
    """Create a dataset in the Nominal platform from a tdms file.

    If `timestamp_column` is provided, it must be present in every group, and the length of all
    data columns must be equal to (and aligned with) `timestamp_column`. In that case
    `timestamp_type` must also be provided.

    If `timestamp_column` is None, TDMS channel properties must have both a `wf_increment` and
    `wf_start_time` property to be included in the dataset.

    If `name` is None, the dataset is created with the name of the file with a .csv suffix.

    Channels will be named as f"{group_name}.{channel_name}" with spaces replaced with underscores.

    If `wait_until_complete=True` (the default), this function waits until the dataset has completed
    ingestion before returning. If you are uploading many datasets, set `wait_until_complete=False`
    instead and call `wait_until_ingestion_complete()` later.

    Raises:
        ValueError: if `timestamp_column` is given without `timestamp_type`, or if no group in
            the file contains `timestamp_column`.
    """
    import pandas as pd
    from nptdms import TdmsChannel, TdmsFile, TdmsGroup

    def _tdms_upload_with_time_column() -> Dataset:
        # Index every group's channels by an explicit timestamp channel named
        # `timestamp_column`, outer-merging the per-group frames on that index.
        if timestamp_type is None:
            # Fail fast: upload_pandas cannot interpret the time column without a type.
            raise ValueError("timestamp_type must be provided when timestamp_column is specified")

        path = Path(file)
        with TdmsFile.open(path) as tdms_file:
            df: pd.DataFrame | None = None
            group: TdmsGroup
            for group in tdms_file.groups():
                channel: TdmsChannel

                # Pull out the timestamp channel for this group, if present.
                time_channel: TdmsChannel | None = None
                for channel in group.channels():
                    if channel.name == timestamp_column:
                        time_channel = channel
                        break
                if time_channel is None:
                    logger.info(
                        'Skipping channel group "%s" because expected timestamp_column "%s" does not exist',
                        group.name,
                        timestamp_column,
                    )
                    continue

                # Select channels for export. Scoped per group so every channel is
                # indexed by its OWN group's time channel (a file-wide dict would
                # re-index earlier groups' channels with the wrong timestamps).
                group_channels: dict[str, TdmsChannel] = {}
                for channel in group.channels():
                    # do not export the timestamp column itself
                    if channel.name == timestamp_column:
                        continue
                    # skip if the column length does not match the time channel
                    if len(channel) != len(time_channel):
                        logger.info(
                            'Skipping channel "%s" because length does not match "%s"',
                            channel.name,
                            timestamp_column,
                        )
                        continue
                    channel_name = f"{channel.group_name.replace(' ', '_')}.{channel.name.replace(' ', '_')}"
                    group_channels[channel_name] = channel

                group_df = pd.DataFrame.from_dict(
                    {
                        channel_name: pd.Series(data=channel.read_data(), index=time_channel.read_data())
                        for channel_name, channel in group_channels.items()
                    }
                )
                # NOTE: `not df` raises ValueError for a DataFrame (ambiguous truth
                # value) — test for None explicitly.
                if df is None:
                    df = group_df
                else:
                    df = pd.merge(
                        left=df, right=group_df, left_index=True, right_index=True, how="outer"
                    )

            if df is None:
                # No group contained the timestamp column; surface a clear error
                # instead of crashing on `None.index` below.
                raise ValueError(
                    f"no channel group in {path} contains timestamp_column {timestamp_column!r}"
                )

            # format for nominal upload
            time_column = timestamp_column
            df.index = df.index.set_names(time_column, level=None)
            df = df.reset_index()

            return upload_pandas(
                df=df,
                name=name if name is not None else path.with_suffix(".csv").name,
                description=description,
                timestamp_column=time_column,
                timestamp_type=timestamp_type,
                wait_until_complete=wait_until_complete,
            )

    def _tdms_upload_with_waveform_props() -> Dataset:
        # Derive each channel's absolute time track from its waveform properties
        # (`wf_increment` + `wf_start_time`) and upload as epoch-nanoseconds.
        path = Path(file)
        with TdmsFile.open(path) as tdms_file:
            channels_to_export: dict[str, TdmsChannel] = {}
            group: TdmsGroup

            # select channels for export
            for group in tdms_file.groups():
                channel: TdmsChannel
                for channel in group.channels():
                    # skip channel if it does not have the required waveform
                    # properties to construct a time track
                    if ("wf_increment" in channel.properties) and ("wf_start_time" in channel.properties):
                        channel_name = f"{channel.group_name.replace(' ', '_')}.{channel.name.replace(' ', '_')}"
                        channels_to_export[channel_name] = channel

            df = pd.DataFrame.from_dict(
                {
                    channel_name: pd.Series(
                        data=channel.read_data(), index=channel.time_track(absolute_time=True, accuracy="ns")
                    )
                    for channel_name, channel in channels_to_export.items()
                }
            )

            # format for nominal upload
            time_column = "time_ns"
            df.index = df.index.set_names(time_column, level=None)
            df = df.reset_index()
            df[time_column] = df[time_column].astype(np.int64)

            return upload_pandas(
                df=df,
                name=name if name is not None else path.with_suffix(".csv").name,
                description=description,
                timestamp_column=time_column,
                timestamp_type=ts.EPOCH_NANOSECONDS,
                wait_until_complete=wait_until_complete,
            )

    if timestamp_column:
        return _tdms_upload_with_time_column()
    else:
        return _tdms_upload_with_waveform_props()


def upload_pandas(
Expand Down
Loading