From 4985dd7572299ed1e30eb4f3fef44ce79c95c85b Mon Sep 17 00:00:00 2001 From: Mario Buikhuizen Date: Wed, 11 Dec 2024 22:02:25 +0100 Subject: [PATCH 1/5] feat: add data-review/batch-initiate endpoint --- nominal/core/_clientsbunch.py | 3 + nominal/core/client.py | 12 +++ nominal/core/data_review.py | 134 ++++++++++++++++++++++++++++++++++ nominal/nominal.py | 29 ++++++++ 4 files changed, 178 insertions(+) create mode 100644 nominal/core/data_review.py diff --git a/nominal/core/_clientsbunch.py b/nominal/core/_clientsbunch.py index 1839d2f8..b992c621 100644 --- a/nominal/core/_clientsbunch.py +++ b/nominal/core/_clientsbunch.py @@ -19,6 +19,7 @@ scout_compute_api, scout_compute_representation_api, scout_dataexport_api, + scout_datareview_api, scout_datasource, scout_datasource_connection, scout_video, @@ -54,6 +55,7 @@ class ClientsBunch: storage_writer: storage_writer_api.NominalChannelWriterService template: scout.TemplateService notebook: scout.NotebookService + datareview: scout_datareview_api.DataReviewService @classmethod def from_config(cls, cfg: ServiceConfiguration, agent: str, token: str) -> Self: @@ -82,6 +84,7 @@ def from_config(cls, cfg: ServiceConfiguration, agent: str, token: str) -> Self: storage_writer=client_factory(storage_writer_api.NominalChannelWriterService), template=client_factory(scout.TemplateService), notebook=client_factory(scout.NotebookService), + datareview=client_factory(scout_datareview_api.DataReviewService), ) diff --git a/nominal/core/client.py b/nominal/core/client.py index 4e6b15c1..39c3007b 100644 --- a/nominal/core/client.py +++ b/nominal/core/client.py @@ -38,6 +38,7 @@ from nominal.core.channel import Channel from nominal.core.checklist import Checklist, ChecklistBuilder from nominal.core.connection import Connection +from nominal.core.data_review import DataReview, DataReviewBatchBuilder from nominal.core.dataset import Dataset, _get_dataset, _get_datasets from nominal.core.filetype import FileType, FileTypes from nominal.core.log import Log, LogSet, _get_log_set @@ -818,6 +819,17 @@ def search_assets( """ return list(self._iter_search_assets(search_text, label, property)) + def create_data_review_batch_builder( + self, notification_configurations: Sequence[str] | None = None + ) -> DataReviewBatchBuilder: + return DataReviewBatchBuilder( + list(notification_configurations) if notification_configurations else [], [], self._clients + ) + + def get_data_review(self, rid: str) -> DataReview: + response = self._clients.datareview.get(self._clients.auth_header, rid) + return DataReview._from_conjure(self._clients, response) + def _create_search_runs_query( start: datetime | IntegralNanosecondsUTC | None = None, diff --git a/nominal/core/data_review.py b/nominal/core/data_review.py new file mode 100644 index 00000000..390c2150 --- /dev/null +++ b/nominal/core/data_review.py @@ -0,0 +1,134 @@ +from __future__ import annotations + +from dataclasses import dataclass, field +from datetime import datetime, timedelta +from time import sleep +from typing import Protocol, Sequence + +from typing_extensions import Self + +from nominal._api.scout_service_api import scout_checks_api, scout_datareview_api, scout_integrations_api +from nominal.core import checklist +from nominal.core._clientsbunch import HasAuthHeader +from nominal.core._utils import HasRid +from nominal.ts import IntegralNanosecondsUTC, _SecondsNanos + + +@dataclass(frozen=True) +class DataReview(HasRid): + rid: str + run_rid: str + checklist_rid: str + checklist_commit: str + completed: bool + + _clients: _Clients = field(repr=False) + + class _Clients(HasAuthHeader, Protocol): + @property + def datareview(self) -> scout_datareview_api.DataReviewService: ... + + @classmethod + def _from_conjure(cls, clients: _Clients, data_review: scout_datareview_api.DataReview) -> Self: + executing_states = [ + check.automatic_check.state._pending_execution or check.automatic_check.state._executing + for check in data_review.checklist.checks + if check.automatic_check + ] + completed = not any(executing_states) + return cls( + rid=data_review.rid, + run_rid=data_review.run_rid, + checklist_rid=data_review.checklist.checklist.rid, + checklist_commit=data_review.checklist.checklist.commit, + completed=completed, + _clients=clients, + ) + + def get_violations(self) -> Sequence[CheckViolation]: + """Retrieves the list of check violations for the data review.""" + response = self._clients.datareview.get_check_alerts_for_data_review(self._clients.auth_header, self.rid) + return [CheckViolation._from_conjure(alert) for alert in response] + + +@dataclass(frozen=True) +class CheckViolation: + rid: str + check_rid: str + name: str + start: IntegralNanosecondsUTC + end: IntegralNanosecondsUTC | None + priority: checklist.Priority | None + + @classmethod + def _from_conjure(cls, check_alert: scout_datareview_api.CheckAlert) -> CheckViolation: + return cls( + rid=check_alert.rid, + check_rid=check_alert.check_rid, + name=check_alert.name, + start=_SecondsNanos.from_api(check_alert.start).to_nanoseconds(), + end=_SecondsNanos.from_api(check_alert.end).to_nanoseconds() if check_alert.end is not None else None, + priority=checklist._conjure_priority_to_priority(check_alert.priority) + if check_alert.priority is not scout_checks_api.Priority.UNKNOWN + else None, + ) + + +@dataclass(frozen=True) +class DataReviewBatchBuilder: + notification_configurations: list[str] + _requests: list[scout_datareview_api.CreateDataReviewRequest] + _clients: DataReview._Clients = field(repr=False) + + def add_notification_configuration(self, notification_configuration: str) -> DataReviewBatchBuilder: + self.notification_configurations.append(notification_configuration) + return self + + def add_request(self, run_rid: str, checklist_rid: str, commit: str) -> DataReviewBatchBuilder: + self._requests.append(scout_datareview_api.CreateDataReviewRequest(checklist_rid, run_rid, commit)) + return self + + def initiate( + self, wait_for_completion: bool = True, wait_timeout: timedelta = timedelta(minutes=1) + ) -> Sequence[DataReview]: + """Initiates a batch data review process. + + Args: + wait_for_completion (bool): If True, waits for the data review process to complete before returning. + Default is True. + wait_timeout (timedelta): The maximum time to wait for the data review process to complete. + Default is 1 minute. + + Raises: + TimeoutError: If the data review process does not complete before the wait_timeout. + """ + request = scout_datareview_api.BatchInitiateDataReviewRequest( + notification_configurations=[ + scout_integrations_api.NotificationConfiguration(c) for c in self.notification_configurations + ], + requests=self._requests, + ) + response = self._clients.datareview.batch_initiate(self._clients.auth_header, request) + + if not wait_for_completion: + return [ + DataReview._from_conjure(self._clients, self._clients.datareview.get(self._clients.auth_header, rid)) + for rid in response.rids + ] + + started = datetime.now() + completed_review_rids = [] + completed_reviews = [] + while datetime.now() - started <= timedelta(seconds=wait_timeout.total_seconds()): + sleep(2) + for rid in response.rids: + if rid not in completed_review_rids: + review_response = self._clients.datareview.get(self._clients.auth_header, rid) + review = DataReview._from_conjure(self._clients, review_response) + if review.completed: + completed_review_rids.append(rid) + completed_reviews.append(review) + if len(completed_reviews) == len(response.rids): + return completed_reviews + + raise TimeoutError(f"Data review initiation did not complete before wait_timeout. Review rids: {response.rids}") diff --git a/nominal/nominal.py b/nominal/nominal.py index 8eb51460..5b3783de 100644 --- a/nominal/nominal.py +++ b/nominal/nominal.py @@ -25,6 +25,7 @@ Workbook, poll_until_ingestion_completed, ) +from nominal.core.data_review import DataReview, DataReviewBatchBuilder if TYPE_CHECKING: import pandas as pd @@ -603,3 +604,31 @@ def create_log_set( """ conn = get_default_client() return conn.create_log_set(name, logs, timestamp_type, description) + + +def create_data_review_batch_builder( + notification_configurations: Sequence[str] | None = None, +) -> DataReviewBatchBuilder: + """Create a batch of data reviews to be initiated together. + + Example: + ------- + ```python + builder = nm.create_data_review_batch_builder() + builder.add_notification_configuration("notification_config_1") + builder.add_request("run_rid_1", "checklist_rid_1", "commit_1") + builder.add_request("run_rid_2", "checklist_rid_2", "commit_2") + reviews = builder.initiate() + + for review in reviews: + print(review.get_violations()) + ``` + """ + conn = get_default_client() + return conn.create_data_review_batch_builder(notification_configurations) + + +def get_data_review(rid: str) -> DataReview: + """Retrieve a data review from the Nominal platform by its RID.""" + conn = get_default_client() + return conn.get_data_review(rid) From af0efd78dc539a1d23c09b5a8036ec148bb1beb7 Mon Sep 17 00:00:00 2001 From: Mario Buikhuizen Date: Fri, 13 Dec 2024 22:04:07 +0100 Subject: [PATCH 2/5] rename to integrations and remove argument from toplevel call --- nominal/core/client.py | 8 ++------ nominal/core/data_review.py | 8 ++++---- nominal/nominal.py | 8 +++----- 3 files changed, 9 insertions(+), 15 deletions(-) diff --git a/nominal/core/client.py b/nominal/core/client.py index 39c3007b..04ff40d8 100644 --- a/nominal/core/client.py +++ b/nominal/core/client.py @@ -819,12 +819,8 @@ def search_assets( """ return list(self._iter_search_assets(search_text, label, property)) - def create_data_review_batch_builder( - self, notification_configurations: Sequence[str] | None = None - ) -> DataReviewBatchBuilder: - return DataReviewBatchBuilder( - list(notification_configurations) if notification_configurations else [], [], self._clients - ) + def create_data_review_batch_builder(self) -> DataReviewBatchBuilder: + return DataReviewBatchBuilder([], [], self._clients) def get_data_review(self, rid: str) -> DataReview: response = self._clients.datareview.get(self._clients.auth_header, rid) diff --git a/nominal/core/data_review.py b/nominal/core/data_review.py index 390c2150..7ec0f99e 100644 --- a/nominal/core/data_review.py +++ b/nominal/core/data_review.py @@ -76,12 +76,12 @@ def _from_conjure(cls, check_alert: scout_datareview_api.CheckAlert) -> CheckVio @dataclass(frozen=True) class DataReviewBatchBuilder: - notification_configurations: list[str] + integration_rids: list[str] _requests: list[scout_datareview_api.CreateDataReviewRequest] _clients: DataReview._Clients = field(repr=False) - def add_notification_configuration(self, notification_configuration: str) -> DataReviewBatchBuilder: - self.notification_configurations.append(notification_configuration) + def add_integration(self, integration_rid: str) -> DataReviewBatchBuilder: + self.integration_rids.append(integration_rid) return self def add_request(self, run_rid: str, checklist_rid: str, commit: str) -> DataReviewBatchBuilder: @@ -104,7 +104,7 @@ def initiate( """ request = scout_datareview_api.BatchInitiateDataReviewRequest( notification_configurations=[ - scout_integrations_api.NotificationConfiguration(c) for c in self.notification_configurations + scout_integrations_api.NotificationConfiguration(c) for c in self.integration_rids ], requests=self._requests, ) diff --git a/nominal/nominal.py b/nominal/nominal.py index 5b3783de..1cba2bd2 100644 --- a/nominal/nominal.py +++ b/nominal/nominal.py @@ -606,16 +606,14 @@ def create_log_set( return conn.create_log_set(name, logs, timestamp_type, description) -def create_data_review_batch_builder( - notification_configurations: Sequence[str] | None = None, -) -> DataReviewBatchBuilder: +def create_data_review_batch_builder() -> DataReviewBatchBuilder: """Create a batch of data reviews to be initiated together. Example: ------- ```python builder = nm.create_data_review_batch_builder() - builder.add_notification_configuration("notification_config_1") + builder.add_integrationn("integration_rid") builder.add_request("run_rid_1", "checklist_rid_1", "commit_1") builder.add_request("run_rid_2", "checklist_rid_2", "commit_2") reviews = builder.initiate() @@ -625,7 +623,7 @@ def create_data_review_batch_builder( ``` """ conn = get_default_client() - return conn.create_data_review_batch_builder(notification_configurations) + return conn.create_data_review_batch_builder() def get_data_review(rid: str) -> DataReview: From b2c02578f53cfb26d4cf7b9027582eaea7541c3e Mon Sep 17 00:00:00 2001 From: Mario Buikhuizen Date: Tue, 17 Dec 2024 21:03:18 +0100 Subject: [PATCH 3/5] don't recreate timedelta Co-authored-by: Alexander Reynolds --- nominal/core/data_review.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nominal/core/data_review.py b/nominal/core/data_review.py index 7ec0f99e..5d779ee2 100644 --- a/nominal/core/data_review.py +++ b/nominal/core/data_review.py @@ -119,7 +119,7 @@ def initiate( started = datetime.now() completed_review_rids = [] completed_reviews = [] - while datetime.now() - started <= timedelta(seconds=wait_timeout.total_seconds()): + while datetime.now() - started <= wait_timeout: sleep(2) for rid in response.rids: if rid not in completed_review_rids: From 83c15fb999ed96024b52c1ee2d6553470a52bd71 Mon Sep 17 00:00:00 2001 From: Mario Buikhuizen Date: Wed, 18 Dec 2024 16:13:10 +0100 Subject: [PATCH 4/5] address review issues --- nominal/core/client.py | 6 +-- nominal/core/data_review.py | 73 ++++++++++++++++++------------------- nominal/nominal.py | 10 ++--- 3 files changed, 44 insertions(+), 45 deletions(-) diff --git a/nominal/core/client.py b/nominal/core/client.py index 04ff40d8..484e957d 100644 --- a/nominal/core/client.py +++ b/nominal/core/client.py @@ -38,7 +38,7 @@ from nominal.core.channel import Channel from nominal.core.checklist import Checklist, ChecklistBuilder from nominal.core.connection import Connection -from nominal.core.data_review import DataReview, DataReviewBatchBuilder +from nominal.core.data_review import DataReview, DataReviewBuilder from nominal.core.dataset import Dataset, _get_dataset, _get_datasets from nominal.core.filetype import FileType, FileTypes from nominal.core.log import Log, LogSet, _get_log_set @@ -819,8 +819,8 @@ def search_assets( """ return list(self._iter_search_assets(search_text, label, property)) - def create_data_review_batch_builder(self) -> DataReviewBatchBuilder: - return DataReviewBatchBuilder([], [], self._clients) + def data_review_builder(self) -> DataReviewBuilder: + return DataReviewBuilder([], [], self._clients) def get_data_review(self, rid: str) -> DataReview: response = self._clients.datareview.get(self._clients.auth_header, rid) diff --git a/nominal/core/data_review.py b/nominal/core/data_review.py index 5d779ee2..cd08d718 100644 --- a/nominal/core/data_review.py +++ b/nominal/core/data_review.py @@ -1,7 +1,7 @@ from __future__ import annotations from dataclasses import dataclass, field -from datetime import datetime, timedelta +from datetime import timedelta from time import sleep from typing import Protocol, Sequence @@ -50,6 +50,20 @@ def get_violations(self) -> Sequence[CheckViolation]: response = self._clients.datareview.get_check_alerts_for_data_review(self._clients.auth_header, self.rid) return [CheckViolation._from_conjure(alert) for alert in response] + def reload(self) -> DataReview: + """Reloads the data review from the server.""" + return DataReview._from_conjure( + self._clients, self._clients.datareview.get(self._clients.auth_header, self.rid) + ) + + def poll_for_completion(self, interval: timedelta = timedelta(seconds=2)) -> DataReview: + """Polls the data review until it is completed.""" + review = self + while not review.completed: + sleep(interval.total_seconds()) + review = review.reload() + return review + @dataclass(frozen=True) class CheckViolation: @@ -75,60 +89,45 @@ def _from_conjure(cls, check_alert: scout_datareview_api.CheckAlert) -> CheckVio @dataclass(frozen=True) -class DataReviewBatchBuilder: - integration_rids: list[str] +class DataReviewBuilder: + _integration_rids: list[str] _requests: list[scout_datareview_api.CreateDataReviewRequest] _clients: DataReview._Clients = field(repr=False) - def add_integration(self, integration_rid: str) -> DataReviewBatchBuilder: - self.integration_rids.append(integration_rid) + def add_integration(self, integration_rid: str) -> DataReviewBuilder: + self._integration_rids.append(integration_rid) return self - def add_request(self, run_rid: str, checklist_rid: str, commit: str) -> DataReviewBatchBuilder: + def add_request(self, run_rid: str, checklist_rid: str, commit: str) -> DataReviewBuilder: self._requests.append(scout_datareview_api.CreateDataReviewRequest(checklist_rid, run_rid, commit)) return self - def initiate( - self, wait_for_completion: bool = True, wait_timeout: timedelta = timedelta(minutes=1) - ) -> Sequence[DataReview]: + def initiate(self, wait_for_completion: bool = True) -> Sequence[DataReview]: """Initiates a batch data review process. Args: wait_for_completion (bool): If True, waits for the data review process to complete before returning. Default is True. - wait_timeout (timedelta): The maximum time to wait for the data review process to complete. - Default is 1 minute. - - Raises: - TimeoutError: If the data review process does not complete before the wait_timeout. """ request = scout_datareview_api.BatchInitiateDataReviewRequest( notification_configurations=[ - scout_integrations_api.NotificationConfiguration(c) for c in self.integration_rids + scout_integrations_api.NotificationConfiguration(c) for c in self._integration_rids ], requests=self._requests, ) response = self._clients.datareview.batch_initiate(self._clients.auth_header, request) - if not wait_for_completion: - return [ - DataReview._from_conjure(self._clients, self._clients.datareview.get(self._clients.auth_header, rid)) - for rid in response.rids - ] - - started = datetime.now() - completed_review_rids = [] - completed_reviews = [] - while datetime.now() - started <= wait_timeout: - sleep(2) - for rid in response.rids: - if rid not in completed_review_rids: - review_response = self._clients.datareview.get(self._clients.auth_header, rid) - review = DataReview._from_conjure(self._clients, review_response) - if review.completed: - completed_review_rids.append(rid) - completed_reviews.append(review) - if len(completed_reviews) == len(response.rids): - return completed_reviews - - raise TimeoutError(f"Data review initiation did not complete before wait_timeout. Review rids: {response.rids}") + data_reviews = [ + DataReview._from_conjure(self._clients, self._clients.datareview.get(self._clients.auth_header, rid)) + for rid in response.rids + ] + if wait_for_completion: + return poll_until_completed(data_reviews) + else: + return data_reviews + + +def poll_until_completed( + data_reviews: Sequence[DataReview], interval: timedelta = timedelta(seconds=2) +) -> Sequence[DataReview]: + return [review.poll_for_completion(interval) for review in data_reviews] diff --git a/nominal/nominal.py b/nominal/nominal.py index 1cba2bd2..db9d2f17 100644 --- a/nominal/nominal.py +++ b/nominal/nominal.py @@ -25,7 +25,7 @@ Workbook, poll_until_ingestion_completed, ) -from nominal.core.data_review import DataReview, DataReviewBatchBuilder +from nominal.core.data_review import DataReview, DataReviewBuilder if TYPE_CHECKING: import pandas as pd @@ -606,14 +606,14 @@ def create_log_set( return conn.create_log_set(name, logs, timestamp_type, description) -def create_data_review_batch_builder() -> DataReviewBatchBuilder: +def data_review_builder() -> DataReviewBuilder: """Create a batch of data reviews to be initiated together. Example: ------- ```python - builder = nm.create_data_review_batch_builder() - builder.add_integrationn("integration_rid") + builder = nm.data_review_builder() + builder.add_integration("integration_rid") builder.add_request("run_rid_1", "checklist_rid_1", "commit_1") builder.add_request("run_rid_2", "checklist_rid_2", "commit_2") reviews = builder.initiate() @@ -623,7 +623,7 @@ def create_data_review_batch_builder() -> DataReviewBatchBuilder: ``` """ conn = get_default_client() - return conn.create_data_review_batch_builder() + return conn.data_review_builder() def get_data_review(rid: str) -> DataReview: From 19a0ceceeb2c419abe5e2c546b3e7581fbbe126d Mon Sep 17 00:00:00 2001 From: Mario Buikhuizen Date: Mon, 6 Jan 2025 12:15:54 +0100 Subject: [PATCH 5/5] add URL to DataReview class --- nominal/core/data_review.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/nominal/core/data_review.py b/nominal/core/data_review.py index cd08d718..cf952331 100644 --- a/nominal/core/data_review.py +++ b/nominal/core/data_review.py @@ -7,7 +7,7 @@ from typing_extensions import Self -from nominal._api.scout_service_api import scout_checks_api, scout_datareview_api, scout_integrations_api +from nominal._api.scout_service_api import scout, scout_checks_api, scout_datareview_api, scout_integrations_api from nominal.core import checklist from nominal.core._clientsbunch import HasAuthHeader from nominal.core._utils import HasRid @@ -27,6 +27,8 @@ class DataReview(HasRid): class _Clients(HasAuthHeader, Protocol): @property def datareview(self) -> scout_datareview_api.DataReviewService: ... + @property + def run(self) -> scout.RunService: ... @classmethod def _from_conjure(cls, clients: _Clients, data_review: scout_datareview_api.DataReview) -> Self: @@ -64,6 +66,13 @@ def poll_for_completion(self, interval: timedelta = timedelta(seconds=2)) -> Dat review = review.reload() return review + @property + def nominal_url(self) -> str: + """Returns a link to the page for this Data Review in the Nominal app""" + # TODO (drake): move logic into _from_conjure() factory function to accomodate different URL schemes + run = self._clients.run.get_run(self._clients.auth_header, self.run_rid) + return f"https://app.gov.nominal.io/runs/{run.run_number}/?tab=checklists&openChecklistDetails={self.rid}&openCheckExecutionErrorReview=" + @dataclass(frozen=True) class CheckViolation: