diff --git a/nominal/core/_clientsbunch.py b/nominal/core/_clientsbunch.py
index 1839d2f8..464077af 100644
--- a/nominal/core/_clientsbunch.py
+++ b/nominal/core/_clientsbunch.py
@@ -15,6 +15,7 @@
     scout,
     scout_assets,
     scout_catalog,
+    scout_checklistexecution_api,
     scout_checks_api,
     scout_compute_api,
     scout_compute_representation_api,
@@ -54,6 +55,7 @@ class ClientsBunch:
     storage_writer: storage_writer_api.NominalChannelWriterService
     template: scout.TemplateService
     notebook: scout.NotebookService
+    checklist_execution: scout_checklistexecution_api.ChecklistExecutionService
 
     @classmethod
     def from_config(cls, cfg: ServiceConfiguration, agent: str, token: str) -> Self:
@@ -82,6 +84,7 @@ def from_config(cls, cfg: ServiceConfiguration, agent: str, token: str) -> Self:
             storage_writer=client_factory(storage_writer_api.NominalChannelWriterService),
             template=client_factory(scout.TemplateService),
             notebook=client_factory(scout.NotebookService),
+            checklist_execution=client_factory(scout_checklistexecution_api.ChecklistExecutionService),
         )
 
 
diff --git a/nominal/core/checklist.py b/nominal/core/checklist.py
index 20e08103..efc15ae1 100644
--- a/nominal/core/checklist.py
+++ b/nominal/core/checklist.py
@@ -1,6 +1,7 @@
 from __future__ import annotations
 
 from dataclasses import dataclass, field
+from datetime import timedelta
 from typing import Literal, Mapping, Protocol, Sequence
 
 from typing_extensions import Self
@@ -8,13 +9,16 @@
 from nominal._api.scout_service_api import (
     api,
     scout_api,
+    scout_checklistexecution_api,
     scout_checks_api,
     scout_compute_api,
     scout_compute_representation_api,
+    scout_integrations_api,
     scout_run_api,
 )
 from nominal.core._clientsbunch import HasAuthHeader
-from nominal.core._utils import HasRid
+from nominal.core._utils import HasRid, rid_from_instance_or_string
+from nominal.core.asset import Asset
 
 
 # TODO(ritwikdixit): add support for more fields i.e. lineage
@@ -112,6 +116,8 @@ class _Clients(HasAuthHeader, Protocol):
         def checklist(self) -> scout_checks_api.ChecklistService: ...
         @property
         def compute_representation(self) -> scout_compute_representation_api.ComputeRepresentationService: ...
+        @property
+        def checklist_execution(self) -> scout_checklistexecution_api.ChecklistExecutionService: ...
 
     @classmethod
     def _from_conjure(cls, clients: _Clients, checklist: scout_checks_api.VersionedChecklist) -> Self:
@@ -133,7 +139,6 @@ def _from_conjure(cls, clients: _Clients, checklist: scout_checks_api.VersionedC
             )
         }
 
-        # # TODO(ritwikdixit): remove the need for these extraneous network requests
         variable_names_to_expressions = clients.compute_representation.batch_compute_to_expression(
             clients.auth_header, variable_name_to_graph_map
         )
@@ -170,6 +175,57 @@ def _from_conjure(cls, clients: _Clients, checklist: scout_checks_api.VersionedC
             _clients=clients,
         )
 
+    def execute_streaming(
+        self,
+        assets: Sequence[Asset | str],
+        integration_rids: Sequence[str],
+        *,
+        evaluation_delay: timedelta = timedelta(),
+        recovery_delay: timedelta = timedelta(seconds=15),
+    ) -> None:
+        """Execute the checklist for the given assets.
+
+        Args:
+            assets: Can be `Asset` instances, or Asset RIDs.
+            integration_rids: Checklist violations will be sent to the specified integrations. At least one
+                integration must be specified. See https://app.gov.nominal.io/settings/integrations for a list
+                of available integrations.
+            evaluation_delay: Delays the evaluation of the streaming checklist. This is useful for when data is
+                delayed.
+            recovery_delay: Specifies the minimum amount of time that must pass before a check can recover from
+                a failure. Minimum value is 15 seconds.
+        """
+        self._clients.checklist_execution.execute_streaming_checklist(
+            self._clients.auth_header,
+            scout_checklistexecution_api.ExecuteChecklistForAssetsRequest(
+                assets=[rid_from_instance_or_string(asset) for asset in assets],
+                checklist=self.rid,
+                notification_configurations=[
+                    scout_integrations_api.NotificationConfiguration(c) for c in integration_rids
+                ],
+                evaluation_delay=_to_api_duration(evaluation_delay),
+                recovery_delay=_to_api_duration(recovery_delay),
+            ),
+        )
+
+    def stop_streaming(self) -> None:
+        """Stop the streaming execution of this checklist."""
+        self._clients.checklist_execution.stop_streaming_checklist(self._clients.auth_header, self.rid)
+
+    def stop_streaming_for_assets(self, assets: Sequence[Asset | str]) -> None:
+        """Stop the streaming execution of this checklist for the given assets (`Asset` instances or RIDs)."""
+        self._clients.checklist_execution.stop_streaming_checklist_for_assets(
+            self._clients.auth_header,
+            scout_checklistexecution_api.StopStreamingChecklistForAssetsRequest(
+                assets=[rid_from_instance_or_string(asset) for asset in assets],
+                checklist=self.rid,
+            ),
+        )
+
+    def reload_streaming(self) -> None:
+        """Reload the streaming execution of this checklist."""
+        self._clients.checklist_execution.reload_streaming_checklist(self._clients.auth_header, self.rid)
+
 
 Priority = Literal[0, 1, 2, 3, 4]
 
@@ -467,3 +523,14 @@ def _timestamp(
         self, timestamp: scout_checks_api.TimestampLocator
     ) -> scout_compute_representation_api.ComputeRepresentationVariableValue | None:
         return None
+
+
+def _to_api_duration(duration: timedelta) -> scout_run_api.Duration:
+    """Convert a `timedelta` into the API `Duration` (whole seconds plus a nanosecond remainder).
+
+    The split uses exact integer arithmetic, so `seconds + nanos / 1e9` equals the input for every value. For
+    negative durations the seconds are floored and `nanos` stays non-negative.
+    NOTE(review): confirm how the backend interprets negative durations.
+    """
+    whole_seconds, leftover_micros = divmod(duration // timedelta(microseconds=1), 1_000_000)
+    return scout_run_api.Duration(seconds=whole_seconds, nanos=leftover_micros * 1000)
diff --git a/nominal/core/client.py b/nominal/core/client.py
index 377c7b74..6d870690 100644
--- a/nominal/core/client.py
+++ b/nominal/core/client.py
@@ -21,6 +21,7 @@
     ingest_api,
     scout_asset_api,
     scout_catalog,
+    scout_checklistexecution_api,
     scout_datasource_connection_api,
     scout_notebook_api,
     scout_run_api,
@@ -884,6 +885,42 @@ def search_assets(
         """
         return list(self._iter_search_assets(search_text, label, property))
 
+    def list_streaming_checklists(self, asset: Asset | str | None = None) -> Iterable[str]:
+        """List all Streaming Checklists.
+
+        Pages through the results lazily: each page is requested only once the previous one has been consumed.
+
+        Args:
+            asset: if provided, only return checklists associated with the given asset.
+
+        Yields:
+            One entry per streaming checklist, as returned in the `checklists` field of each response page.
+        """
+        # Resolve the asset RID once up front instead of once per requested page.
+        asset_rid = None if asset is None else rid_from_instance_or_string(asset)
+        next_page_token = None
+
+        while True:
+            if asset_rid is None:
+                response = self._clients.checklist_execution.list_streaming_checklist(
+                    self._clients.auth_header,
+                    scout_checklistexecution_api.ListStreamingChecklistRequest(page_token=next_page_token),
+                )
+                yield from response.checklists
+                next_page_token = response.next_page_token
+            else:
+                for_asset_response = self._clients.checklist_execution.list_streaming_checklist_for_asset(
+                    self._clients.auth_header,
+                    scout_checklistexecution_api.ListStreamingChecklistForAssetRequest(
+                        asset_rid=asset_rid, page_token=next_page_token
+                    ),
+                )
+                yield from for_asset_response.checklists
+                next_page_token = for_asset_response.next_page_token
+
+            if next_page_token is None:
+                break
+
 
 def _create_search_runs_query(
     start: datetime | IntegralNanosecondsUTC | None = None,
diff --git a/nominal/nominal.py b/nominal/nominal.py
index 4491a588..e8fdfe56 100644
--- a/nominal/nominal.py
+++ b/nominal/nominal.py
@@ -447,6 +447,16 @@ def search_assets(
     return list(assets)
 
 
+def list_streaming_checklists(asset: Asset | str | None = None) -> Iterable[str]:
+    """List all Streaming Checklists.
+
+    Args:
+        asset: if provided, only return checklists associated with the given asset.
+    """
+    conn = get_default_client()
+    return conn.list_streaming_checklists(asset)
+
+
 def wait_until_ingestions_complete(datasets: list[Dataset]) -> None:
     """Wait until all datasets have completed ingestion.
 