From c35b5b434f19df1059236381d7ae563509fe8740 Mon Sep 17 00:00:00 2001 From: Mario Buikhuizen Date: Fri, 15 Nov 2024 17:25:27 +0100 Subject: [PATCH 1/9] feat: add streaming checklists --- nominal/core/_clientsbunch.py | 3 ++ nominal/core/client.py | 11 +++++ nominal/core/streaming_checklist.py | 64 +++++++++++++++++++++++++++++ nominal/nominal.py | 13 ++++++ 4 files changed, 91 insertions(+) create mode 100644 nominal/core/streaming_checklist.py diff --git a/nominal/core/_clientsbunch.py b/nominal/core/_clientsbunch.py index d6efeb1c..3b7f9482 100644 --- a/nominal/core/_clientsbunch.py +++ b/nominal/core/_clientsbunch.py @@ -15,6 +15,7 @@ scout, scout_assets, scout_catalog, + scout_checklistexecution_api, scout_checks_api, scout_compute_api, scout_compute_representation_api, @@ -54,6 +55,7 @@ class ClientsBunch: storage_writer: storage_writer_api.NominalChannelWriterService template: scout.TemplateService notebook: scout.NotebookService + checklist_execution: scout_checklistexecution_api.ChecklistExecutionService @classmethod def from_config(cls, cfg: ServiceConfiguration, agent: str, token: str) -> Self: @@ -82,6 +84,7 @@ def from_config(cls, cfg: ServiceConfiguration, agent: str, token: str) -> Self: storage_writer=client_factory(storage_writer_api.NominalChannelWriterService), template=client_factory(scout.TemplateService), notebook=client_factory(scout.NotebookService), + checklist_execution=client_factory(scout_checklistexecution_api.ChecklistExecutionService), ) diff --git a/nominal/core/client.py b/nominal/core/client.py index a3bd8471..a3197161 100644 --- a/nominal/core/client.py +++ b/nominal/core/client.py @@ -57,6 +57,7 @@ ) from .asset import Asset +from .streaming_checklist import StreamingChecklist @dataclass(frozen=True) @@ -666,6 +667,16 @@ def get_asset(self, rid: str) -> Asset: raise ValueError(f"multiple assets found with RID {rid!r}: {response!r}") return Asset._from_conjure(self._clients, response[rid]) + def get_streaming_checklist(self, rid: 
str) -> StreamingChecklist: + """Retrieve a Streaming Checklist by its RID.""" + response = self._clients.checklist_execution.get_streaming_checklist(self._clients.auth_header, rid) + return StreamingChecklist._from_conjure(self._clients, response) + + def list_streaming_checklists(self) -> Sequence[str]: + """List all Streaming Checklists.""" + response = self._clients.checklist_execution.list_streaming_checklist(self._clients.auth_header) + return response + def _create_search_runs_query( start: datetime | IntegralNanosecondsUTC | None = None, diff --git a/nominal/core/streaming_checklist.py b/nominal/core/streaming_checklist.py new file mode 100644 index 00000000..8370ddd6 --- /dev/null +++ b/nominal/core/streaming_checklist.py @@ -0,0 +1,64 @@ +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Protocol, Sequence + +from typing_extensions import Self + +from nominal._api.combined import scout_checklistexecution_api, scout_integrations_api, scout_run_api +from nominal.core._clientsbunch import HasAuthHeader +from nominal.core._utils import HasRid, rid_from_instance_or_string +from nominal.core.asset import Asset + + +@dataclass(frozen=True) +class StreamingChecklist(HasRid): + rid: str + + _clients: "_Clients" = field(repr=False) + + class _Clients(HasAuthHeader, Protocol): + @property + def checklist_execution(self) -> scout_checklistexecution_api.ChecklistExecutionService: ... + + def execute( + self, assets: Sequence[Asset | str], *, notification_configurations: Sequence[str] | None = None + ) -> None: + """Execute the checklist for the given assets. + `assets` can be `Asset` instances, or asset RIDs. + `notification_configurations` are Integration RIDs. 
+ """ + self._clients.checklist_execution.execute_streaming_checklist( + self._clients.auth_header, + scout_checklistexecution_api.ExecuteChecklistForAssetsRequest( + assets=[rid_from_instance_or_string(asset) for asset in assets], + checklist=self.rid, + notification_configurations=[ + scout_integrations_api.NotificationConfiguration(c) for c in notification_configurations or [] + ], + stream_delay=scout_run_api.Duration(seconds=0, nanos=0), + ), + ) + + def stop(self) -> None: + """Stop the checklist.""" + self._clients.checklist_execution.stop_streaming_checklist(self._clients.auth_header, self.rid) + + def stop_for_assets(self, assets: Sequence[Asset | str]) -> None: + """Stop the checklist for the given assets.""" + self._clients.checklist_execution.stop_streaming_checklist_for_assets( + self._clients.auth_header, + scout_checklistexecution_api.StopStreamingChecklistForAssetsRequest( + assets=[rid_from_instance_or_string(asset) for asset in assets], + checklist=self.rid, + ), + ) + + @classmethod + def _from_conjure( + cls, clients: _Clients, streaming_checklist: scout_checklistexecution_api.StreamingChecklistInfo + ) -> Self: + return cls( + rid=streaming_checklist.checklist_rid, + _clients=clients, + ) diff --git a/nominal/nominal.py b/nominal/nominal.py index 6e531ca6..81384056 100644 --- a/nominal/nominal.py +++ b/nominal/nominal.py @@ -22,6 +22,7 @@ Workbook, poll_until_ingestion_completed, ) +from nominal.core.streaming_checklist import StreamingChecklist if TYPE_CHECKING: import pandas as pd @@ -416,6 +417,18 @@ def get_asset(rid: str) -> Asset: return conn.get_asset(rid) +def get_streaming_checklist(rid: str) -> StreamingChecklist: + """Retrieve a Streaming Checklist by its RID.""" + conn = get_default_client() + return conn.get_streaming_checklist(rid) + + +def list_streaming_checklists() -> Sequence[str]: + """List all Streaming Checklists.""" + conn = get_default_client() + return conn.list_streaming_checklists() + + def 
wait_until_ingestions_complete(datasets: list[Dataset]) -> None: """Wait until all datasets have completed ingestion. From 6ecf3b119e67c143f57802f53686a7e99967ec27 Mon Sep 17 00:00:00 2001 From: Mario Buikhuizen Date: Thu, 21 Nov 2024 16:13:53 +0100 Subject: [PATCH 2/9] support newly generated conjure types --- nominal/core/streaming_checklist.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/nominal/core/streaming_checklist.py b/nominal/core/streaming_checklist.py index 8370ddd6..f68b4f9a 100644 --- a/nominal/core/streaming_checklist.py +++ b/nominal/core/streaming_checklist.py @@ -36,7 +36,8 @@ def execute( notification_configurations=[ scout_integrations_api.NotificationConfiguration(c) for c in notification_configurations or [] ], - stream_delay=scout_run_api.Duration(seconds=0, nanos=0), + evaluation_delay=scout_run_api.Duration(seconds=0, nanos=0), + recovery_delay=scout_run_api.Duration(seconds=0, nanos=0), ), ) From d94868bbee036f77f5e4cf1078a4da5ec0c5b433 Mon Sep 17 00:00:00 2001 From: Mario Buikhuizen Date: Fri, 22 Nov 2024 14:33:03 +0100 Subject: [PATCH 3/9] move execute to checklist and remove get_streaming_checklist --- nominal/core/checklist.py | 56 ++++++++++++++++++++++++- nominal/core/client.py | 6 --- nominal/core/streaming_checklist.py | 65 ----------------------------- nominal/nominal.py | 7 ---- 4 files changed, 54 insertions(+), 80 deletions(-) delete mode 100644 nominal/core/streaming_checklist.py diff --git a/nominal/core/checklist.py b/nominal/core/checklist.py index 4bdbe893..136d812b 100644 --- a/nominal/core/checklist.py +++ b/nominal/core/checklist.py @@ -1,6 +1,7 @@ from __future__ import annotations from dataclasses import dataclass, field +from datetime import timedelta from typing import Literal, Mapping, Protocol, Sequence from typing_extensions import Self @@ -8,13 +9,16 @@ from nominal._api.combined import ( api, scout_api, + scout_checklistexecution_api, scout_checks_api, scout_compute_api, 
scout_compute_representation_api, + scout_integrations_api, scout_run_api, ) from nominal.core._clientsbunch import HasAuthHeader -from nominal.core._utils import HasRid +from nominal.core._utils import HasRid, rid_from_instance_or_string +from nominal.core.asset import Asset # TODO(ritwikdixit): add support for more fields i.e. lineage @@ -112,6 +116,8 @@ class _Clients(HasAuthHeader, Protocol): def checklist(self) -> scout_checks_api.ChecklistService: ... @property def compute_representation(self) -> scout_compute_representation_api.ComputeRepresentationService: ... + @property + def checklist_execution(self) -> scout_checklistexecution_api.ChecklistExecutionService: ... @classmethod def _from_conjure(cls, clients: _Clients, checklist: scout_checks_api.VersionedChecklist) -> Self: @@ -133,7 +139,6 @@ def _from_conjure(cls, clients: _Clients, checklist: scout_checks_api.VersionedC ) } - # # TODO(ritwikdixit): remove the need for these extraneous network requests variable_names_to_expressions = clients.compute_representation.batch_compute_to_expression( clients.auth_header, variable_name_to_graph_map ) @@ -170,6 +175,49 @@ def _from_conjure(cls, clients: _Clients, checklist: scout_checks_api.VersionedC _clients=clients, ) + def execute( + self, + assets: Sequence[Asset | str], + notification_configurations: Sequence[str], + *, + evaluation_delay: timedelta = timedelta(), + recovery_delay: timedelta = timedelta(seconds=15), + ) -> None: + """Execute the checklist for the given assets. + - `assets`: Can be `Asset` instances, or Asset RIDs. + - `notification_configurations`: Integration RIDs, checklist violations will be sent to the specified + integrations. + - `evaluation_delay`: Delays the evaluation of the streaming checklist. This is useful for when data is delayed. + - `recovery_delay`: Specifies the minimum amount of time that must pass before a check can recover from a + failure. Minimum value is 15 seconds. 
+ """ + self._clients.checklist_execution.execute_streaming_checklist( + self._clients.auth_header, + scout_checklistexecution_api.ExecuteChecklistForAssetsRequest( + assets=[rid_from_instance_or_string(asset) for asset in assets], + checklist=self.rid, + notification_configurations=[ + scout_integrations_api.NotificationConfiguration(c) for c in notification_configurations or [] + ], + evaluation_delay=_to_api_duration(evaluation_delay), + recovery_delay=_to_api_duration(recovery_delay), + ), + ) + + def stop(self) -> None: + """Stop the checklist.""" + self._clients.checklist_execution.stop_streaming_checklist(self._clients.auth_header, self.rid) + + def stop_for_assets(self, assets: Sequence[Asset | str]) -> None: + """Stop the checklist for the given assets.""" + self._clients.checklist_execution.stop_streaming_checklist_for_assets( + self._clients.auth_header, + scout_checklistexecution_api.StopStreamingChecklistForAssetsRequest( + assets=[rid_from_instance_or_string(asset) for asset in assets], + checklist=self.rid, + ), + ) + Priority = Literal[0, 1, 2, 3, 4] @@ -467,3 +515,7 @@ def _timestamp( self, timestamp: scout_checks_api.TimestampLocator ) -> scout_compute_representation_api.ComputeRepresentationVariableValue | None: return None + + +def _to_api_duration(duration: timedelta) -> scout_run_api.Duration: + return scout_run_api.Duration(seconds=duration.seconds, nanos=duration.microseconds * 1000) diff --git a/nominal/core/client.py b/nominal/core/client.py index 018dfab3..31cf00fc 100644 --- a/nominal/core/client.py +++ b/nominal/core/client.py @@ -57,7 +57,6 @@ ) from .asset import Asset -from .streaming_checklist import StreamingChecklist @dataclass(frozen=True) @@ -730,11 +729,6 @@ def search_assets( """ return list(self._iter_search_assets(search_text, label, property)) - def get_streaming_checklist(self, rid: str) -> StreamingChecklist: - """Retrieve a Streaming Checklist by its RID.""" - response = 
self._clients.checklist_execution.get_streaming_checklist(self._clients.auth_header, rid) - return StreamingChecklist._from_conjure(self._clients, response) - def list_streaming_checklists(self) -> Sequence[str]: """List all Streaming Checklists.""" response = self._clients.checklist_execution.list_streaming_checklist(self._clients.auth_header) diff --git a/nominal/core/streaming_checklist.py b/nominal/core/streaming_checklist.py deleted file mode 100644 index f68b4f9a..00000000 --- a/nominal/core/streaming_checklist.py +++ /dev/null @@ -1,65 +0,0 @@ -from __future__ import annotations - -from dataclasses import dataclass, field -from typing import Protocol, Sequence - -from typing_extensions import Self - -from nominal._api.combined import scout_checklistexecution_api, scout_integrations_api, scout_run_api -from nominal.core._clientsbunch import HasAuthHeader -from nominal.core._utils import HasRid, rid_from_instance_or_string -from nominal.core.asset import Asset - - -@dataclass(frozen=True) -class StreamingChecklist(HasRid): - rid: str - - _clients: "_Clients" = field(repr=False) - - class _Clients(HasAuthHeader, Protocol): - @property - def checklist_execution(self) -> scout_checklistexecution_api.ChecklistExecutionService: ... - - def execute( - self, assets: Sequence[Asset | str], *, notification_configurations: Sequence[str] | None = None - ) -> None: - """Execute the checklist for the given assets. - `assets` can be `Asset` instances, or asset RIDs. - `notification_configurations` are Integration RIDs. 
- """ - self._clients.checklist_execution.execute_streaming_checklist( - self._clients.auth_header, - scout_checklistexecution_api.ExecuteChecklistForAssetsRequest( - assets=[rid_from_instance_or_string(asset) for asset in assets], - checklist=self.rid, - notification_configurations=[ - scout_integrations_api.NotificationConfiguration(c) for c in notification_configurations or [] - ], - evaluation_delay=scout_run_api.Duration(seconds=0, nanos=0), - recovery_delay=scout_run_api.Duration(seconds=0, nanos=0), - ), - ) - - def stop(self) -> None: - """Stop the checklist.""" - self._clients.checklist_execution.stop_streaming_checklist(self._clients.auth_header, self.rid) - - def stop_for_assets(self, assets: Sequence[Asset | str]) -> None: - """Stop the checklist for the given assets.""" - self._clients.checklist_execution.stop_streaming_checklist_for_assets( - self._clients.auth_header, - scout_checklistexecution_api.StopStreamingChecklistForAssetsRequest( - assets=[rid_from_instance_or_string(asset) for asset in assets], - checklist=self.rid, - ), - ) - - @classmethod - def _from_conjure( - cls, clients: _Clients, streaming_checklist: scout_checklistexecution_api.StreamingChecklistInfo - ) -> Self: - return cls( - rid=streaming_checklist.checklist_rid, - _clients=clients, - ) diff --git a/nominal/nominal.py b/nominal/nominal.py index 71869b43..3d30ad8b 100644 --- a/nominal/nominal.py +++ b/nominal/nominal.py @@ -23,7 +23,6 @@ Workbook, poll_until_ingestion_completed, ) -from nominal.core.streaming_checklist import StreamingChecklist if TYPE_CHECKING: import pandas as pd @@ -441,12 +440,6 @@ def search_assets( return list(assets) -def get_streaming_checklist(rid: str) -> StreamingChecklist: - """Retrieve a Streaming Checklist by its RID.""" - conn = get_default_client() - return conn.get_streaming_checklist(rid) - - def list_streaming_checklists() -> Sequence[str]: """List all Streaming Checklists.""" conn = get_default_client() From 
75d2720cd0ce85d39a58ea3bd67cc1f18f15cbd0 Mon Sep 17 00:00:00 2001 From: Mario Buikhuizen Date: Mon, 25 Nov 2024 11:55:25 +0100 Subject: [PATCH 4/9] add streaming back to execute and stop function names --- nominal/core/checklist.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/nominal/core/checklist.py b/nominal/core/checklist.py index b2b70733..3a53eb5c 100644 --- a/nominal/core/checklist.py +++ b/nominal/core/checklist.py @@ -175,7 +175,7 @@ def _from_conjure(cls, clients: _Clients, checklist: scout_checks_api.VersionedC _clients=clients, ) - def execute( + def execute_streaming( self, assets: Sequence[Asset | str], notification_configurations: Sequence[str], @@ -204,11 +204,11 @@ def execute( ), ) - def stop(self) -> None: + def stop_streaming(self) -> None: """Stop the checklist.""" self._clients.checklist_execution.stop_streaming_checklist(self._clients.auth_header, self.rid) - def stop_for_assets(self, assets: Sequence[Asset | str]) -> None: + def stop_streaming_for_assets(self, assets: Sequence[Asset | str]) -> None: """Stop the checklist for the given assets.""" self._clients.checklist_execution.stop_streaming_checklist_for_assets( self._clients.auth_header, From 6fb7275b2b3785fe2f6b63d760a8add247b228fc Mon Sep 17 00:00:00 2001 From: Mario Buikhuizen Date: Mon, 9 Dec 2024 15:37:01 +0100 Subject: [PATCH 5/9] 1 notification_configuration required --- nominal/core/checklist.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nominal/core/checklist.py b/nominal/core/checklist.py index 3a53eb5c..a1a074ab 100644 --- a/nominal/core/checklist.py +++ b/nominal/core/checklist.py @@ -186,7 +186,7 @@ def execute_streaming( """Execute the checklist for the given assets. - `assets`: Can be `Asset` instances, or Asset RIDs. - `notification_configurations`: Integration RIDs, checklist violations will be sent to the specified - integrations. + integrations. At least one integration must be specified. 
- `evaluation_delay`: Delays the evaluation of the streaming checklist. This is useful for when data is delayed. - `recovery_delay`: Specifies the minimum amount of time that must pass before a check can recover from a failure. Minimum value is 15 seconds. From 6df179ba323c9d1ad226f8c7738feb0ea9c9c39a Mon Sep 17 00:00:00 2001 From: Mario Buikhuizen Date: Mon, 9 Dec 2024 17:13:59 +0100 Subject: [PATCH 6/9] add reload_streaming_checklist endpoint --- nominal/core/checklist.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/nominal/core/checklist.py b/nominal/core/checklist.py index a1a074ab..fa778e25 100644 --- a/nominal/core/checklist.py +++ b/nominal/core/checklist.py @@ -218,6 +218,10 @@ def stop_streaming_for_assets(self, assets: Sequence[Asset | str]) -> None: ), ) + def reload_streaming(self) -> None: + """Reload the checklist.""" + self._clients.checklist_execution.reload_streaming_checklist(self._clients.auth_header, self.rid) + Priority = Literal[0, 1, 2, 3, 4] From 1528a3e7d3d0223de0ee708df545eb9ad7bf2d48 Mon Sep 17 00:00:00 2001 From: Mario Buikhuizen Date: Fri, 13 Dec 2024 17:39:16 +0100 Subject: [PATCH 7/9] fix wrong seconds used Co-authored-by: Alexander Reynolds --- nominal/core/checklist.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nominal/core/checklist.py b/nominal/core/checklist.py index fa778e25..7049c654 100644 --- a/nominal/core/checklist.py +++ b/nominal/core/checklist.py @@ -522,4 +522,4 @@ def _timestamp( def _to_api_duration(duration: timedelta) -> scout_run_api.Duration: - return scout_run_api.Duration(seconds=duration.seconds, nanos=duration.microseconds * 1000) + return scout_run_api.Duration(seconds=int(duration.total_seconds()), nanos=duration.microseconds * 1000) From bb39bc7c56485458ae2a91921ce12584bebcf1d1 Mon Sep 17 00:00:00 2001 From: Mario Buikhuizen Date: Fri, 13 Dec 2024 17:50:38 +0100 Subject: [PATCH 8/9] rename to integration_rids --- nominal/core/checklist.py | 8 ++++---- 1 file changed, 4 
insertions(+), 4 deletions(-) diff --git a/nominal/core/checklist.py b/nominal/core/checklist.py index 7049c654..efc15ae1 100644 --- a/nominal/core/checklist.py +++ b/nominal/core/checklist.py @@ -178,15 +178,15 @@ def _from_conjure(cls, clients: _Clients, checklist: scout_checks_api.VersionedC def execute_streaming( self, assets: Sequence[Asset | str], - notification_configurations: Sequence[str], + integration_rids: Sequence[str], *, evaluation_delay: timedelta = timedelta(), recovery_delay: timedelta = timedelta(seconds=15), ) -> None: """Execute the checklist for the given assets. - `assets`: Can be `Asset` instances, or Asset RIDs. - - `notification_configurations`: Integration RIDs, checklist violations will be sent to the specified - integrations. At least one integration must be specified. + - `integration_rids`: Checklist violations will be sent to the specified integrations. At least one integration + must be specified. See https://app.gov.nominal.io/settings/integrations for a list of available integrations. - `evaluation_delay`: Delays the evaluation of the streaming checklist. This is useful for when data is delayed. - `recovery_delay`: Specifies the minimum amount of time that must pass before a check can recover from a failure. Minimum value is 15 seconds. 
@@ -197,7 +197,7 @@ def execute_streaming( assets=[rid_from_instance_or_string(asset) for asset in assets], checklist=self.rid, notification_configurations=[ - scout_integrations_api.NotificationConfiguration(c) for c in notification_configurations or [] + scout_integrations_api.NotificationConfiguration(c) for c in integration_rids ], evaluation_delay=_to_api_duration(evaluation_delay), recovery_delay=_to_api_duration(recovery_delay), From c23c2f684753bead34e85bf9f968c9d04fc89bfc Mon Sep 17 00:00:00 2001 From: Mario Buikhuizen Date: Tue, 17 Dec 2024 20:12:25 +0100 Subject: [PATCH 9/9] support paging of new Conjure interface and add list_streaming_checklist_for_asset --- nominal/core/client.py | 33 +++++++++++++++++++++++++++++---- nominal/nominal.py | 10 +++++++--- 2 files changed, 36 insertions(+), 7 deletions(-) diff --git a/nominal/core/client.py b/nominal/core/client.py index d5855cb0..6d870690 100644 --- a/nominal/core/client.py +++ b/nominal/core/client.py @@ -21,6 +21,7 @@ ingest_api, scout_asset_api, scout_catalog, + scout_checklistexecution_api, scout_datasource_connection_api, scout_notebook_api, scout_run_api, @@ -884,10 +885,34 @@ def search_assets( """ return list(self._iter_search_assets(search_text, label, property)) - def list_streaming_checklists(self) -> Sequence[str]: - """List all Streaming Checklists.""" - response = self._clients.checklist_execution.list_streaming_checklist(self._clients.auth_header) - return response + def list_streaming_checklists(self, asset: Asset | str | None = None) -> Iterable[str]: + """List all Streaming Checklists. + + Args: + asset: if provided, only return checklists associated with the given asset. 
+ """ + next_page_token = None + + while True: + if asset is None: + response = self._clients.checklist_execution.list_streaming_checklist( + self._clients.auth_header, + scout_checklistexecution_api.ListStreamingChecklistRequest(page_token=next_page_token), + ) + yield from response.checklists + next_page_token = response.next_page_token + else: + for_asset_response = self._clients.checklist_execution.list_streaming_checklist_for_asset( + self._clients.auth_header, + scout_checklistexecution_api.ListStreamingChecklistForAssetRequest( + asset_rid=rid_from_instance_or_string(asset), page_token=next_page_token + ), + ) + yield from for_asset_response.checklists + next_page_token = for_asset_response.next_page_token + + if next_page_token is None: + break def _create_search_runs_query( diff --git a/nominal/nominal.py b/nominal/nominal.py index 076dc888..e8fdfe56 100644 --- a/nominal/nominal.py +++ b/nominal/nominal.py @@ -447,10 +447,14 @@ def search_assets( return list(assets) -def list_streaming_checklists() -> Sequence[str]: - """List all Streaming Checklists.""" +def list_streaming_checklists(asset: Asset | str | None = None) -> Iterable[str]: + """List all Streaming Checklists. + + Args: + asset: if provided, only return checklists associated with the given asset. + """ conn = get_default_client() - return conn.list_streaming_checklists() + return conn.list_streaming_checklists(asset) def wait_until_ingestions_complete(datasets: list[Dataset]) -> None: