Skip to content

feat: add streaming checklists #126

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions nominal/core/_clientsbunch.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
scout,
scout_assets,
scout_catalog,
scout_checklistexecution_api,
scout_checks_api,
scout_compute_api,
scout_compute_representation_api,
Expand Down Expand Up @@ -54,6 +55,7 @@ class ClientsBunch:
storage_writer: storage_writer_api.NominalChannelWriterService
template: scout.TemplateService
notebook: scout.NotebookService
checklist_execution: scout_checklistexecution_api.ChecklistExecutionService

@classmethod
def from_config(cls, cfg: ServiceConfiguration, agent: str, token: str) -> Self:
Expand Down Expand Up @@ -82,6 +84,7 @@ def from_config(cls, cfg: ServiceConfiguration, agent: str, token: str) -> Self:
storage_writer=client_factory(storage_writer_api.NominalChannelWriterService),
template=client_factory(scout.TemplateService),
notebook=client_factory(scout.NotebookService),
checklist_execution=client_factory(scout_checklistexecution_api.ChecklistExecutionService),
)


Expand Down
56 changes: 54 additions & 2 deletions nominal/core/checklist.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,24 @@
from __future__ import annotations

from dataclasses import dataclass, field
from datetime import timedelta
from typing import Literal, Mapping, Protocol, Sequence

from typing_extensions import Self

from nominal._api.scout_service_api import (
api,
scout_api,
scout_checklistexecution_api,
scout_checks_api,
scout_compute_api,
scout_compute_representation_api,
scout_integrations_api,
scout_run_api,
)
from nominal.core._clientsbunch import HasAuthHeader
from nominal.core._utils import HasRid
from nominal.core._utils import HasRid, rid_from_instance_or_string
from nominal.core.asset import Asset


# TODO(ritwikdixit): add support for more fields i.e. lineage
Expand Down Expand Up @@ -112,6 +116,8 @@ class _Clients(HasAuthHeader, Protocol):
def checklist(self) -> scout_checks_api.ChecklistService: ...
@property
def compute_representation(self) -> scout_compute_representation_api.ComputeRepresentationService: ...
@property
def checklist_execution(self) -> scout_checklistexecution_api.ChecklistExecutionService: ...

@classmethod
def _from_conjure(cls, clients: _Clients, checklist: scout_checks_api.VersionedChecklist) -> Self:
Expand All @@ -133,7 +139,6 @@ def _from_conjure(cls, clients: _Clients, checklist: scout_checks_api.VersionedC
)
}

# # TODO(ritwikdixit): remove the need for these extraneous network requests
variable_names_to_expressions = clients.compute_representation.batch_compute_to_expression(
clients.auth_header, variable_name_to_graph_map
)
Expand Down Expand Up @@ -170,6 +175,49 @@ def _from_conjure(cls, clients: _Clients, checklist: scout_checks_api.VersionedC
_clients=clients,
)

def execute_streaming(
    self,
    assets: Sequence[Asset | str],
    notification_configurations: Sequence[str],
    *,
    evaluation_delay: timedelta = timedelta(),
    recovery_delay: timedelta = timedelta(seconds=15),
) -> None:
    """Start streaming execution of this checklist against the given assets.

    - `assets`: `Asset` instances or Asset RIDs; each is resolved to a RID before the request is sent.
    - `notification_configurations`: Integration RIDs; checklist violations will be sent to the specified
        integrations.
    - `evaluation_delay`: Delays the evaluation of the streaming checklist. Useful when data arrives late.
    - `recovery_delay`: Minimum amount of time that must pass before a check can recover from a failure.
        Minimum value is 15 seconds.
    """
    asset_rids = [rid_from_instance_or_string(asset) for asset in assets]
    # A falsy value (e.g. an empty sequence) yields no notification configurations.
    notifications = [
        scout_integrations_api.NotificationConfiguration(integration_rid)
        for integration_rid in notification_configurations or []
    ]
    request = scout_checklistexecution_api.ExecuteChecklistForAssetsRequest(
        assets=asset_rids,
        checklist=self.rid,
        notification_configurations=notifications,
        evaluation_delay=_to_api_duration(evaluation_delay),
        recovery_delay=_to_api_duration(recovery_delay),
    )
    self._clients.checklist_execution.execute_streaming_checklist(self._clients.auth_header, request)

def stop_streaming(self) -> None:
    """Stop streaming execution of this checklist.

    Sends a stop request for this checklist's RID to the checklist-execution service.
    """
    clients = self._clients
    clients.checklist_execution.stop_streaming_checklist(clients.auth_header, self.rid)

def stop_streaming_for_assets(self, assets: Sequence[Asset | str]) -> None:
    """Stop streaming execution of this checklist for the given assets only.

    - `assets`: `Asset` instances or Asset RIDs; each is resolved to a RID before the request is sent.
    """
    asset_rids = [rid_from_instance_or_string(asset) for asset in assets]
    request = scout_checklistexecution_api.StopStreamingChecklistForAssetsRequest(
        assets=asset_rids,
        checklist=self.rid,
    )
    self._clients.checklist_execution.stop_streaming_checklist_for_assets(self._clients.auth_header, request)


Priority = Literal[0, 1, 2, 3, 4]

Expand Down Expand Up @@ -467,3 +515,7 @@ def _timestamp(
self, timestamp: scout_checks_api.TimestampLocator
) -> scout_compute_representation_api.ComputeRepresentationVariableValue | None:
return None


def _to_api_duration(duration: timedelta) -> scout_run_api.Duration:
    """Convert a `timedelta` into the API's `Duration` (whole seconds plus nanoseconds).

    `timedelta.seconds` only holds the within-day component (0..86399), so the `days` component must be
    folded in explicitly; otherwise any duration of one day or longer is silently truncated
    (e.g. `timedelta(days=1)` would be sent as 0 seconds).

    Integer arithmetic is used instead of `total_seconds()` to avoid float rounding. Because `timedelta`
    normalizes negative values to negative `days` with non-negative `seconds`/`microseconds`, the resulting
    `seconds + nanos / 1e9` always equals the original duration.
    """
    whole_seconds = duration.days * 86_400 + duration.seconds
    return scout_run_api.Duration(seconds=whole_seconds, nanos=duration.microseconds * 1000)
5 changes: 5 additions & 0 deletions nominal/core/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -709,6 +709,11 @@ def search_assets(
"""
return list(self._iter_search_assets(search_text, label, property))

def list_streaming_checklists(self) -> Sequence[str]:
    """List all Streaming Checklists.

    Returns the checklist-execution service's response unchanged.
    NOTE(review): the returned strings are presumably checklist RIDs -- confirm against the service API.
    """
    clients = self._clients
    return clients.checklist_execution.list_streaming_checklist(clients.auth_header)


def _create_search_runs_query(
start: datetime | IntegralNanosecondsUTC | None = None,
Expand Down
6 changes: 6 additions & 0 deletions nominal/nominal.py
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,12 @@ def search_assets(
return list(assets)


def list_streaming_checklists() -> Sequence[str]:
    """List all Streaming Checklists, using the default client."""
    return get_default_client().list_streaming_checklists()


def wait_until_ingestions_complete(datasets: list[Dataset]) -> None:
"""Wait until all datasets have completed ingestion.

Expand Down
Loading