Skip to content

Use IngestProgressV2 for polling for ingestion status #16

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged 3 commits on Sep 5, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 24 additions & 12 deletions nominal/sdk.py
Original file line number | Diff line number | Diff line change
@@ -159,22 +159,34 @@ class Dataset:
labels: Sequence[str]
_client: NominalClient = field(repr=False)

def poll_until_ingestion_completed(self, interval: timedelta = timedelta(seconds=2)) -> None:
def poll_until_ingestion_completed(self, interval: timedelta = timedelta(seconds=1)) -> None:
"""Block until dataset ingestion has completed.
This method polls Nominal for ingest status after uploading a dataset on an interval.

Raises:
NominalIngestError: if the ingest status is not known
NominalIngestFailed: if the ingest failed
NominalIngestError: if the ingest status is not known
"""

while True:
dataset = _get_dataset(self._client._auth_header, self._client._catalog_client, self.rid)
if dataset.ingest_status == scout_catalog.IngestStatus.COMPLETED:
progress = self._client._catalog_client.get_ingest_progress_v2(self._client._auth_header, self.rid)
if progress.ingest_status.type == "success":
return
elif dataset.ingest_status == scout_catalog.IngestStatus.FAILED:
raise NominalIngestFailed(f"ingest failed for dataset: {self.rid}")
elif dataset.ingest_status == scout_catalog.IngestStatus.UNKNOWN:
raise NominalIngestError(f"ingest status unknown for dataset: {self.rid}")
elif progress.ingest_status.type == "inProgress": # "type" strings are camelCase
pass
elif progress.ingest_status.type == "error":
error = progress.ingest_status.error
if error is not None:
raise NominalIngestFailed(
f"ingest failed for dataset {self.rid!r}: {error.message} ({error.error_type})"
)
raise NominalIngestError(
f"ingest status type marked as 'error' but with no instance for dataset {self.rid!r}"
)
else:
raise NominalIngestError(
f"unhandled ingest status {progress.ingest_status.type!r} for dataset {self.rid!r}"
)
time.sleep(interval.total_seconds())

def update(
@@ -226,7 +238,7 @@ def add_to_dataset_from_io(
)

if isinstance(dataset, TextIOBase):
raise TypeError(f"dataset {dataset} must be open in binary mode, rather than text mode")
raise TypeError(f"dataset {dataset!r} must be open in binary mode, rather than text mode")

self.poll_until_ingestion_completed()
urlsafe_name = urllib.parse.quote_plus(self.name)
@@ -564,9 +576,9 @@ def _get_dataset(
) -> scout_catalog.EnrichedDataset:
datasets = list(_get_datasets(auth_header, client, [dataset_rid]))
if not datasets:
raise ValueError(f"dataset '{dataset_rid}' not found")
raise ValueError(f"dataset {dataset_rid!r} not found")
if len(datasets) > 1:
raise ValueError(f"expected exactly one dataset, got: {len(datasets)}")
raise ValueError(f"expected exactly one dataset, got {len(datasets)}")
return datasets[0]


@@ -577,4 +589,4 @@ def _rid_from_instance_or_string(value: Attachment | Run | Dataset | str) -> str
return value.rid
elif hasattr(value, "rid"):
return value.rid
raise TypeError("{value} is not a string nor has the attribute 'rid'")
raise TypeError("{value!r} is not a string nor has the attribute 'rid'")