feat: add data-review/batch-initiate endpoint #165

Merged
merged 8 commits on Jan 7, 2025
Changes from 2 commits
3 changes: 3 additions & 0 deletions nominal/core/_clientsbunch.py
@@ -19,6 +19,7 @@
scout_compute_api,
scout_compute_representation_api,
scout_dataexport_api,
scout_datareview_api,
scout_datasource,
scout_datasource_connection,
scout_video,
@@ -54,6 +55,7 @@ class ClientsBunch:
storage_writer: storage_writer_api.NominalChannelWriterService
template: scout.TemplateService
notebook: scout.NotebookService
datareview: scout_datareview_api.DataReviewService

@classmethod
def from_config(cls, cfg: ServiceConfiguration, agent: str, token: str) -> Self:
@@ -82,6 +84,7 @@ def from_config(cls, cfg: ServiceConfiguration, agent: str, token: str) -> Self:
storage_writer=client_factory(storage_writer_api.NominalChannelWriterService),
template=client_factory(scout.TemplateService),
notebook=client_factory(scout.NotebookService),
datareview=client_factory(scout_datareview_api.DataReviewService),
)


8 changes: 8 additions & 0 deletions nominal/core/client.py
@@ -38,6 +38,7 @@
from nominal.core.channel import Channel
from nominal.core.checklist import Checklist, ChecklistBuilder
from nominal.core.connection import Connection
from nominal.core.data_review import DataReview, DataReviewBatchBuilder
from nominal.core.dataset import Dataset, _get_dataset, _get_datasets
from nominal.core.filetype import FileType, FileTypes
from nominal.core.log import Log, LogSet, _get_log_set
@@ -818,6 +819,13 @@ def search_assets(
"""
return list(self._iter_search_assets(search_text, label, property))

def create_data_review_batch_builder(self) -> DataReviewBatchBuilder:
return DataReviewBatchBuilder([], [], self._clients)

def get_data_review(self, rid: str) -> DataReview:
response = self._clients.datareview.get(self._clients.auth_header, rid)
return DataReview._from_conjure(self._clients, response)


def _create_search_runs_query(
start: datetime | IntegralNanosecondsUTC | None = None,
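
For reviewers, a minimal usage sketch of the two methods added to the client above. It assumes `get_default_client()` from nominal/nominal.py returns the client object these methods live on; the RID and commit strings are placeholders.

```python
from nominal.nominal import get_default_client

client = get_default_client()

# Build and initiate a batch of data reviews directly on the client.
builder = client.create_data_review_batch_builder()
builder.add_request("run_rid", "checklist_rid", "commit")
reviews = builder.initiate()

# Later, look a review back up by its RID.
review = client.get_data_review(reviews[0].rid)
print(review.completed)
```
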
134 changes: 134 additions & 0 deletions nominal/core/data_review.py
@@ -0,0 +1,134 @@
from __future__ import annotations

from dataclasses import dataclass, field
from datetime import datetime, timedelta
from time import sleep
from typing import Protocol, Sequence

from typing_extensions import Self

from nominal._api.scout_service_api import scout_checks_api, scout_datareview_api, scout_integrations_api
from nominal.core import checklist
from nominal.core._clientsbunch import HasAuthHeader
from nominal.core._utils import HasRid
from nominal.ts import IntegralNanosecondsUTC, _SecondsNanos


@dataclass(frozen=True)
class DataReview(HasRid):
rid: str
run_rid: str
checklist_rid: str
checklist_commit: str
completed: bool

_clients: _Clients = field(repr=False)

class _Clients(HasAuthHeader, Protocol):
@property
def datareview(self) -> scout_datareview_api.DataReviewService: ...

@classmethod
def _from_conjure(cls, clients: _Clients, data_review: scout_datareview_api.DataReview) -> Self:
executing_states = [
check.automatic_check.state._pending_execution or check.automatic_check.state._executing
for check in data_review.checklist.checks
if check.automatic_check
]
completed = not any(executing_states)
return cls(
rid=data_review.rid,
run_rid=data_review.run_rid,
checklist_rid=data_review.checklist.checklist.rid,
checklist_commit=data_review.checklist.checklist.commit,
completed=completed,
_clients=clients,
)

def get_violations(self) -> Sequence[CheckViolation]:
"""Retrieves the list of check violations for the data review."""
response = self._clients.datareview.get_check_alerts_for_data_review(self._clients.auth_header, self.rid)
return [CheckViolation._from_conjure(alert) for alert in response]


@dataclass(frozen=True)
class CheckViolation:
rid: str
check_rid: str
name: str
start: IntegralNanosecondsUTC
end: IntegralNanosecondsUTC | None
priority: checklist.Priority | None

@classmethod
def _from_conjure(cls, check_alert: scout_datareview_api.CheckAlert) -> CheckViolation:
return cls(
rid=check_alert.rid,
check_rid=check_alert.check_rid,
name=check_alert.name,
start=_SecondsNanos.from_api(check_alert.start).to_nanoseconds(),
end=_SecondsNanos.from_api(check_alert.end).to_nanoseconds() if check_alert.end is not None else None,
priority=checklist._conjure_priority_to_priority(check_alert.priority)
if check_alert.priority is not scout_checks_api.Priority.UNKNOWN
else None,
)


@dataclass(frozen=True)
class DataReviewBatchBuilder:
integration_rids: list[str]
_requests: list[scout_datareview_api.CreateDataReviewRequest]
_clients: DataReview._Clients = field(repr=False)

def add_integration(self, integration_rid: str) -> DataReviewBatchBuilder:
self.integration_rids.append(integration_rid)
return self

def add_request(self, run_rid: str, checklist_rid: str, commit: str) -> DataReviewBatchBuilder:
self._requests.append(scout_datareview_api.CreateDataReviewRequest(checklist_rid, run_rid, commit))
return self

def initiate(
self, wait_for_completion: bool = True, wait_timeout: timedelta = timedelta(minutes=1)
) -> Sequence[DataReview]:
"""Initiates a batch data review process.

Args:
wait_for_completion (bool): If True, waits for the data review process to complete before returning.
Default is True.
wait_timeout (timedelta): The maximum time to wait for the data review process to complete.
Default is 1 minute.

Raises:
TimeoutError: If the data review process does not complete before the wait_timeout.
"""
request = scout_datareview_api.BatchInitiateDataReviewRequest(
notification_configurations=[
scout_integrations_api.NotificationConfiguration(c) for c in self.integration_rids
],
requests=self._requests,
)
response = self._clients.datareview.batch_initiate(self._clients.auth_header, request)

if not wait_for_completion:
return [
DataReview._from_conjure(self._clients, self._clients.datareview.get(self._clients.auth_header, rid))
for rid in response.rids
]

started = datetime.now()
completed_review_rids = []
completed_reviews = []
while datetime.now() - started <= timedelta(seconds=wait_timeout.total_seconds()):
sleep(2)
for rid in response.rids:
if rid not in completed_review_rids:
review_response = self._clients.datareview.get(self._clients.auth_header, rid)
review = DataReview._from_conjure(self._clients, review_response)
if review.completed:
completed_review_rids.append(rid)
completed_reviews.append(review)
if len(completed_reviews) == len(response.rids):
return completed_reviews

raise TimeoutError(f"Data review initiation did not complete before wait_timeout. Review rids: {response.rids}")
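
A minimal sketch of the non-blocking path through `initiate`, assuming a `builder` that already has requests added: with `wait_for_completion=False` the reviews come back immediately, and `completed` (derived from the automatic check states in `_from_conjure`) lets the caller poll on its own schedule, here via the module-level `get_data_review` added in nominal/nominal.py below.

```python
import time

from nominal.nominal import get_data_review

# `builder` is assumed to be a DataReviewBatchBuilder with requests already added.
reviews = builder.initiate(wait_for_completion=False)
pending = {review.rid for review in reviews}

while pending:
    time.sleep(5)
    for rid in list(pending):
        review = get_data_review(rid)  # re-fetch; `completed` mirrors the automatic check states
        if review.completed:
            pending.discard(rid)
            for violation in review.get_violations():
                print(violation.name, violation.priority)
```
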
27 changes: 27 additions & 0 deletions nominal/nominal.py
@@ -25,6 +25,7 @@
Workbook,
poll_until_ingestion_completed,
)
from nominal.core.data_review import DataReview, DataReviewBatchBuilder

if TYPE_CHECKING:
import pandas as pd
@@ -603,3 +604,29 @@ def create_log_set(
"""
conn = get_default_client()
return conn.create_log_set(name, logs, timestamp_type, description)


def create_data_review_batch_builder() -> DataReviewBatchBuilder:
"""Create a batch of data reviews to be initiated together.

Example:
-------
```python
builder = nm.create_data_review_batch_builder()
builder.add_integration("integration_rid")
builder.add_request("run_rid_1", "checklist_rid_1", "commit_1")
builder.add_request("run_rid_2", "checklist_rid_2", "commit_2")
reviews = builder.initiate()

for review in reviews:
print(review.get_violations())
```
"""
conn = get_default_client()
return conn.create_data_review_batch_builder()


def get_data_review(rid: str) -> DataReview:
"""Retrieve a data review from the Nominal platform by its RID."""
conn = get_default_client()
return conn.get_data_review(rid)
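
And a short sketch for `get_data_review`, following the `nm.` convention in the docstring example above (assuming the helper is re-exported at the package level); the RID is a placeholder, and `start`/`end` on each violation are the integral UTC nanosecond values built in data_review.py.

```python
import nominal as nm

review = nm.get_data_review("data_review_rid")  # placeholder RID
print(review.run_rid, review.checklist_rid, review.completed)

for violation in review.get_violations():
    # `end` and `priority` may be None; `start`/`end` are IntegralNanosecondsUTC
    print(violation.name, violation.priority, violation.start, violation.end)
```
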