
Commit 9eb66d4

Operator Error Handling (#550)
2 parents 8ea56e8 + 2b3fe8b commit 9eb66d4


57 files changed (+3956, -2665 lines changed)

.github/workflows/run-operators-unit-tests.yml

Lines changed: 2 additions & 2 deletions

@@ -26,7 +26,7 @@ jobs:
   test:
     name: python ${{ matrix.python-version }}
     runs-on: ubuntu-latest
-    timeout-minutes: 60
+    timeout-minutes: 180

     strategy:
       fail-fast: false
@@ -49,7 +49,7 @@ jobs:
         name: "Test config setup"

       - name: "Run Operators Tests"
-        timeout-minutes: 60
+        timeout-minutes: 180
         shell: bash
         run: |
           set -x # print commands that are executed

ads/opctl/operator/cli.py

Lines changed: 7 additions & 1 deletion

@@ -9,12 +9,14 @@
 import click
 import fsspec
 import yaml
+import logging
 from ads.opctl.operator.common.utils import default_signer
 from ads.common.auth import AuthType
 from ads.common.object_storage_details import ObjectStorageDetails
 from ads.opctl.constants import BACKEND_NAME, RUNTIME_TYPE
 from ads.opctl.decorator.common import click_options, with_auth, with_click_unknown_args
 from ads.opctl.utils import suppress_traceback
+from ads.opctl import logger

 from .__init__ import __operators__
 from .cmd import run as cmd_run
@@ -311,10 +313,14 @@ def publish_conda(debug: bool, **kwargs: Dict[str, Any]) -> None:
 @click.pass_context
 @with_click_unknown_args
 @with_auth
-def run(ctx: click.core.Context, debug: bool, **kwargs: Dict[str, Any]) -> None:
+def run(ctx: click.core.Context, debug: bool = False, **kwargs: Dict[str, Any]) -> None:
     """
     Runs the operator with the given specification on the targeted backend.
     """
+    if debug:
+        logger.setLevel(logging.DEBUG)
+    else:
+        logger.setLevel(logging.CRITICAL)
     operator_spec = {}
     backend = kwargs.pop("backend")
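What the new debug handling buys: operator runs stay quiet unless --debug is passed, in which case the full DEBUG stream comes through. A minimal standalone sketch of the same pattern, with an illustrative logger name rather than the real ads.opctl module:

import logging

logger = logging.getLogger("opctl")  # stand-in for `from ads.opctl import logger`
logging.basicConfig()

def run(debug: bool = False) -> None:
    # --debug surfaces DEBUG-level diagnostics; otherwise only CRITICAL
    # records pass through, keeping normal CLI output clean.
    logger.setLevel(logging.DEBUG if debug else logging.CRITICAL)
    logger.debug("resolved operator backend")   # visible only with debug=True
    logger.critical("unrecoverable error")      # always visible

run(debug=True)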

ads/opctl/operator/cmd.py

Lines changed: 2 additions & 2 deletions

@@ -42,7 +42,7 @@
 from .common.errors import (
     OperatorCondaNotFoundError,
     OperatorImageNotFoundError,
-    OperatorSchemaYamlError,
+    InvalidParameterError,
 )
 from .common.operator_loader import _operator_info_list

@@ -415,7 +415,7 @@ def verify(
             run_name="verify",
         )
         operator_module.get("verify")(config, **kwargs)
-    except OperatorSchemaYamlError as ex:
+    except InvalidParameterError as ex:
         logger.debug(ex)
         raise ValueError(
             f"The operator's specification is not valid for the `{operator_info.type}` operator. "

ads/opctl/operator/common/errors.py

Lines changed: 2 additions & 1 deletion

@@ -7,8 +7,9 @@
 from ads.opctl.operator import __operators__


-class OperatorSchemaYamlError(Exception):
+class InvalidParameterError(Exception):
     """Exception raised when there is an issue with the schema."""
+
     def __init__(self, error: str):
         super().__init__(
             "Invalid operator specification. Check the YAML structure and ensure it "

ads/opctl/operator/common/operator_config.py

Lines changed: 2 additions & 2 deletions

@@ -13,7 +13,7 @@
 from ads.common.serializer import DataClassSerializable

 from ads.opctl.operator.common.utils import OperatorValidator
-from ads.opctl.operator.common.errors import OperatorSchemaYamlError
+from ads.opctl.operator.common.errors import InvalidParameterError


 @dataclass(repr=True)
@@ -65,7 +65,7 @@ def _validate_dict(cls, obj_dict: Dict) -> bool:
         result = validator.validate(obj_dict)

         if not result:
-            raise OperatorSchemaYamlError(json.dumps(validator.errors, indent=2))
+            raise InvalidParameterError(json.dumps(validator.errors, indent=2))
         return True

     @classmethod
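_validate_dict turns the validator's per-field error map into the InvalidParameterError message. A minimal sketch of the same idea using a Cerberus Validator directly, since the diff shows OperatorValidator exposing the same validate()/errors calls; the schema below is invented for illustration:

import json
from cerberus import Validator  # stand-in for OperatorValidator

class InvalidParameterError(Exception):
    pass

# Hypothetical schema; real operator schemas ship as YAML alongside each operator.
schema = {
    "type": {"type": "string", "required": True},
    "version": {"type": "string", "required": True},
}

def validate_dict(obj_dict: dict) -> bool:
    validator = Validator(schema)
    if not validator.validate(obj_dict):
        # Pretty-print the per-field error map into the exception message.
        raise InvalidParameterError(json.dumps(validator.errors, indent=2))
    return True

validate_dict({"type": "anomaly"})  # raises with {"version": ["required field"]}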

ads/opctl/operator/common/utils.py

Lines changed: 0 additions & 20 deletions

@@ -172,23 +172,3 @@ def default_signer(**kwargs):
     from ads.common.auth import default_signer

     return default_signer(**kwargs)
-
-
-def human_time_friendly(seconds):
-    TIME_DURATION_UNITS = (
-        ("week", 60 * 60 * 24 * 7),
-        ("day", 60 * 60 * 24),
-        ("hour", 60 * 60),
-        ("min", 60),
-    )
-    if seconds == 0:
-        return "inf"
-    accumulator = []
-    for unit, div in TIME_DURATION_UNITS:
-        amount, seconds = divmod(float(seconds), div)
-        if amount > 0:
-            accumulator.append(
-                "{} {}{}".format(int(amount), unit, "" if amount == 1 else "s")
-            )
-    accumulator.append("{} secs".format(round(seconds, 2)))
-    return ", ".join(accumulator)

ads/opctl/operator/lowcode/anomaly/__main__.py

Lines changed: 4 additions & 2 deletions

@@ -15,15 +15,17 @@
 from ads.opctl.operator.common.const import ENV_OPERATOR_ARGS
 from ads.opctl.operator.common.utils import _parse_input_args

-from .model.anomaly_dataset import AnomalyDatasets
+from .model.anomaly_dataset import AnomalyDatasets, AnomalyData
 from .operator_config import AnomalyOperatorConfig


 def operate(operator_config: AnomalyOperatorConfig) -> None:
     """Runs the anomaly detection operator."""
     from .model.factory import AnomalyOperatorModelFactory

-    datasets = AnomalyDatasets(operator_config)
+    datasets = AnomalyDatasets(operator_config.spec)
+    datasets2 = AnomalyData(operator_config.spec)
+    print(f"d1: {datasets.data}\n\n d2: {datasets2.data}")
     AnomalyOperatorModelFactory.get_model(operator_config, datasets).generate_report()
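operate() wires the pieces together: build the datasets from the spec, let a factory choose the model implementation, then render a report. A minimal sketch of that dispatch shape; the registry and model class here are invented, not the real .model.factory contents:

class DummyAnomalyModel:
    def __init__(self, config, datasets):
        self.config, self.datasets = config, datasets

    def generate_report(self) -> None:
        print(f"report generated by {type(self).__name__}")

class AnomalyOperatorModelFactory:
    _registry = {"dummy": DummyAnomalyModel}  # hypothetical model registry

    @classmethod
    def get_model(cls, config, datasets):
        model_cls = cls._registry.get(config["model"])
        if model_cls is None:
            raise ValueError(f"Unsupported model: {config['model']!r}")
        return model_cls(config, datasets)

AnomalyOperatorModelFactory.get_model({"model": "dummy"}, datasets=None).generate_report()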

ads/opctl/operator/lowcode/anomaly/const.py

Lines changed: 4 additions & 0 deletions

@@ -5,6 +5,7 @@
 # Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/

 from ads.common.extended_enum import ExtendedEnumMeta
+from ads.opctl.operator.lowcode.common.const import DataColumns


 class SupportedModels(str, metaclass=ExtendedEnumMeta):
@@ -44,6 +45,7 @@ class TODSSubModels(str, metaclass=ExtendedEnumMeta):
     TODSSubModels.KNN: "KNNSKI",
 }

+
 class SupportedMetrics(str, metaclass=ExtendedEnumMeta):
     UNSUPERVISED_UNIFY95 = "unsupervised_unify95"
     UNSUPERVISED_UNIFY95_LOG_LOSS = "unsupervised_unify95_log_loss"
@@ -75,9 +77,11 @@ class SupportedMetrics(str, metaclass=ExtendedEnumMeta):
     MEDIAN_MCC = "Median MCC"
     ELAPSED_TIME = "Elapsed Time"

+
 class OutputColumns(str, metaclass=ExtendedEnumMeta):
     ANOMALY_COL = "anomaly"
     SCORE_COL = "score"
+    Series = DataColumns.Series


 TODS_DEFAULT_MODEL = "ocsvm"

ads/opctl/operator/lowcode/anomaly/model/anomaly_dataset.py

Lines changed: 87 additions & 71 deletions

@@ -4,73 +4,91 @@
 # Copyright (c) 2023 Oracle and/or its affiliates.
 # Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/

-from ..operator_config import AnomalyOperatorConfig
-from .. import utils
-from ads.opctl.operator.common.utils import default_signer
+from ..operator_config import AnomalyOperatorSpec
+from ads.opctl.operator.lowcode.common.utils import (
+    default_signer,
+    load_data,
+    merge_category_columns,
+)
+from ads.opctl.operator.lowcode.common.data import AbstractData
+from ads.opctl.operator.lowcode.common.data import AbstractData
+from ads.opctl.operator.lowcode.anomaly.utils import get_frequency_of_datetime
 from ads.opctl import logger
 import pandas as pd
 from ads.opctl.operator.lowcode.anomaly.const import OutputColumns


+class AnomalyData(AbstractData):
+    def __init__(self, spec: AnomalyOperatorSpec):
+        super().__init__(spec=spec, name="input_data")
+
+
+class TestData(AbstractData):
+    def __init__(self, spec: AnomalyOperatorSpec):
+        super().__init__(spec=spec, name="test_data")
+
+
 class AnomalyDatasets:
-    def __init__(self, config: AnomalyOperatorConfig):
+    def __init__(self, spec: AnomalyOperatorSpec):
         """Instantiates the DataIO instance.

         Properties
         ----------
-        config: AnomalyOperatorConfig
-            The anomaly operator configuration.
+        spec: AnomalyOperatorSpec
+            The anomaly operator spec.
         """
-        self.original_user_data = None
-        self.data = None
-        self.test_data = None
-        self.target_columns = None
-        self.full_data_dict = None
-        self._load_data(config.spec)
-
-    def _load_data(self, spec):
-        """Loads anomaly input data."""
-
-        self.data = utils._load_data(
-            filename=spec.input_data.url,
-            format=spec.input_data.format,
-            storage_options=default_signer(),
-            columns=spec.input_data.columns,
-        )
-        self.original_user_data = self.data.copy()
-        date_col = spec.datetime_column.name
-        self.data[date_col] = pd.to_datetime(self.data[date_col])
-        try:
-            spec.freq = utils.get_frequency_of_datetime(self.data, spec)
-        except TypeError as e:
-            logger.warn(
-                f"Error determining frequency: {e.args}. Setting Frequency to None"
-            )
-            logger.debug(f"Full traceback: {e}")
-            spec.freq = None
-
-        if spec.target_category_columns is None:
-            if spec.target_column is None:
-                target_col = [
-                    col
-                    for col in self.data.columns
-                    if col not in [spec.datetime_column.name]
-                ]
-                spec.target_column = target_col[0]
-            self.full_data_dict = {spec.target_column: self.data}
-        else:
-            # Merge target category columns
-
-            self.data["__Series__"] = utils._merge_category_columns(self.data, spec.target_category_columns)
-            unique_categories = self.data["__Series__"].unique()
-            self.full_data_dict = dict()
-
-            for cat in unique_categories:
-                data_by_cat = (
-                    self.data[self.data["__Series__"] == cat].drop(spec.target_category_columns + ["__Series__"],
-                                                                   axis=1)
-                )
-                self.full_data_dict[cat] = data_by_cat
+        self._data = AnomalyData(spec)
+        self.data = self._data.get_data_long()
+        # self.test_data = None
+        # self.target_columns = None
+        self.full_data_dict = self._data.get_dict_by_series()
+        # self._load_data(spec)
+
+    # def _load_data(self, spec):
+    #     """Loads anomaly input data."""
+    #     try:
+    #         self.data = load_data(
+    #             filename=spec.input_data.url,
+    #             format=spec.input_data.format,
+    #             columns=spec.input_data.columns,
+    #         )
+    #     except InvalidParameterError as e:
+    #         e.args = e.args + ("Invalid Parameter: input_data",)
+    #         raise e
+    #     date_col = spec.datetime_column.name
+    #     self.data[date_col] = pd.to_datetime(self.data[date_col])
+    #     try:
+    #         spec.freq = get_frequency_of_datetime(self.data, spec)
+    #     except TypeError as e:
+    #         logger.warn(
+    #             f"Error determining frequency: {e.args}. Setting Frequency to None"
+    #         )
+    #         logger.debug(f"Full traceback: {e}")
+    #         spec.freq = None

+    #     if spec.target_category_columns is None:
+    #         if spec.target_column is None:
+    #             target_col = [
+    #                 col
+    #                 for col in self.data.columns
+    #                 if col not in [spec.datetime_column.name]
+    #             ]
+    #             spec.target_column = target_col[0]
+    #         self.full_data_dict = {spec.target_column: self.data}
+    #     else:
+    #         # Merge target category columns

+    #         self.data[OutputColumns.Series] = merge_category_columns(
+    #             self.data, spec.target_category_columns
+    #         )
+    #         unique_categories = self.data[OutputColumns.Series].unique()
+    #         self.full_data_dict = dict()

+    #         for cat in unique_categories:
+    #             data_by_cat = self.data[self.data[OutputColumns.Series] == cat].drop(
+    #                 spec.target_category_columns + [OutputColumns.Series], axis=1
+    #             )
+    #             self.full_data_dict[cat] = data_by_cat


 class AnomalyOutput:
@@ -93,11 +111,7 @@ def get_inliers_by_cat(self, category: str, data: pd.DataFrame):
         inlier_indices = anomaly.index[anomaly[OutputColumns.ANOMALY_COL] == 0]
         inliers = data.iloc[inlier_indices]
         if scores is not None and not scores.empty:
-            inliers = pd.merge(
-                inliers,
-                scores,
-                on=self.date_column,
-                how='inner')
+            inliers = pd.merge(inliers, scores, on=self.date_column, how="inner")
         return inliers

     def get_outliers_by_cat(self, category: str, data: pd.DataFrame):
@@ -106,11 +120,7 @@ def get_outliers_by_cat(self, category: str, data: pd.DataFrame):
         outliers_indices = anomaly.index[anomaly[OutputColumns.ANOMALY_COL] == 1]
         outliers = data.iloc[outliers_indices]
         if scores is not None and not scores.empty:
-            outliers = pd.merge(
-                outliers,
-                scores,
-                on=self.date_column,
-                how='inner')
+            outliers = pd.merge(outliers, scores, on=self.date_column, how="inner")
         return outliers

     def get_inliers(self, data):
@@ -120,9 +130,12 @@ def get_inliers(self, data):
         inliers = pd.concat(
             [
                 inliers,
-                self.get_inliers_by_cat(
-                    category, data[data['__Series__'] == category].reset_index(drop=True).drop('__Series__', axis=1)
-                )
+                self.get_inliers_by_cat(
+                    category,
+                    data[data[OutputColumns.Series] == category]
+                    .reset_index(drop=True)
+                    .drop(OutputColumns.Series, axis=1),
+                ),
             ],
             axis=0,
             ignore_index=True,
@@ -137,8 +150,11 @@ def get_outliers(self, data):
             [
                 outliers,
                 self.get_outliers_by_cat(
-                    category, data[data['__Series__'] == category].reset_index(drop=True).drop('__Series__', axis=1)
-                )
+                    category,
+                    data[data[OutputColumns.Series] == category]
+                    .reset_index(drop=True)
+                    .drop(OutputColumns.Series, axis=1),
+                ),
             ],
             axis=0,
             ignore_index=True,
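The shape of this refactor: AnomalyData and TestData become thin subclasses of a shared AbstractData that owns loading and per-series splitting, which is what lets AnomalyDatasets shrink to a few delegating lines. A minimal sketch of that pattern; AbstractData's real implementation lives in ads.opctl.operator.lowcode.common.data, and everything below (column names, loading, method bodies) is an invented stand-in:

import pandas as pd

class AbstractData:
    SERIES_COL = "Series"  # hypothetical; mirrors DataColumns.Series

    def __init__(self, spec, name: str):
        self.spec, self.name = spec, name
        self.data = self._load(spec)

    def _load(self, spec) -> pd.DataFrame:
        # The real class reads spec.<name>.url via load_data(); faked here.
        return pd.DataFrame(
            {
                "ds": pd.date_range("2023-01-01", periods=4),
                self.SERIES_COL: ["A", "A", "B", "B"],
                "y": [1.0, 9.0, 2.0, 8.0],
            }
        )

    def get_data_long(self) -> pd.DataFrame:
        return self.data

    def get_dict_by_series(self) -> dict:
        # One frame per series, with the series column dropped.
        return {
            cat: df.drop(columns=self.SERIES_COL).reset_index(drop=True)
            for cat, df in self.data.groupby(self.SERIES_COL)
        }

class AnomalyData(AbstractData):
    def __init__(self, spec):
        super().__init__(spec=spec, name="input_data")

print(list(AnomalyData(spec=None).get_dict_by_series()))  # ['A', 'B']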
