|
11 | 11 | import logging
|
12 | 12 | import os
|
13 | 13 | import re
|
| 14 | +import time |
14 | 15 | import traceback
|
15 | 16 | from datetime import date, datetime
|
16 | 17 | from typing import Callable, Optional, Union
|
|
20 | 21 | import yaml
|
21 | 22 | from ads.common import auth
|
22 | 23 | from ads.common.decorator.utils import class_or_instance_method
|
23 |
| -from ads.common.utils import camel_to_snake |
| 24 | +from ads.common.utils import camel_to_snake, get_progress_bar |
24 | 25 | from ads.config import COMPARTMENT_OCID
|
25 | 26 | from dateutil import tz
|
26 | 27 | from dateutil.parser import parse
|
|
29 | 30 | logger = logging.getLogger(__name__)
|
30 | 31 |
|
31 | 32 | LIFECYCLE_STOP_STATE = ("SUCCEEDED", "FAILED", "CANCELED", "DELETED")
|
| 33 | +WORK_REQUEST_STOP_STATE = ("SUCCEEDED", "FAILED", "CANCELED") |
| 34 | +DEFAULT_WAIT_TIME = 1200 |
| 35 | +DEFAULT_POLL_INTERVAL = 10 |
| 36 | +DEFAULT_WORKFLOW_STEPS = 2 |
32 | 37 |
|
33 | 38 |
|
34 | 39 | class MergeStrategy(Enum):
|
@@ -931,6 +936,76 @@ def get_work_request_response(
|
931 | 936 | )
|
932 | 937 | return work_request_response
|
933 | 938 |
|
| 939 | + def wait_for_progress( |
| 940 | + self, |
| 941 | + work_request_id: str, |
| 942 | + num_steps: int = DEFAULT_WORKFLOW_STEPS, |
| 943 | + max_wait_time: int = DEFAULT_WAIT_TIME, |
| 944 | + poll_interval: int = DEFAULT_POLL_INTERVAL |
| 945 | + ): |
| 946 | + """Waits for the work request progress bar to be completed. |
| 947 | +
|
| 948 | + Parameters |
| 949 | + ---------- |
| 950 | + work_request_id: str |
| 951 | + Work Request OCID. |
| 952 | + num_steps: (int, optional). Defaults to 2. |
| 953 | + Number of steps for the progress indicator. |
| 954 | + max_wait_time: int |
| 955 | + Maximum amount of time to wait in seconds (Defaults to 1200). |
| 956 | + Negative implies infinite wait time. |
| 957 | + poll_interval: int |
| 958 | + Poll interval in seconds (Defaults to 10). |
| 959 | +
|
| 960 | + Returns |
| 961 | + ------- |
| 962 | + None |
| 963 | + """ |
| 964 | + work_request_logs = [] |
| 965 | + |
| 966 | + i = 0 |
| 967 | + start_time = time.time() |
| 968 | + with get_progress_bar(num_steps) as progress: |
| 969 | + seconds_since = time.time() - start_time |
| 970 | + exceed_max_time = max_wait_time > 0 and seconds_since >= max_wait_time |
| 971 | + if exceed_max_time: |
| 972 | + logger.error( |
| 973 | + f"Max wait time ({max_wait_time} seconds) exceeded." |
| 974 | + ) |
| 975 | + while not exceed_max_time and (not work_request_logs or len(work_request_logs) < num_steps): |
| 976 | + time.sleep(poll_interval) |
| 977 | + new_work_request_logs = [] |
| 978 | + |
| 979 | + try: |
| 980 | + work_request = self.client.get_work_request(work_request_id).data |
| 981 | + work_request_logs = self.client.list_work_request_logs( |
| 982 | + work_request_id |
| 983 | + ).data |
| 984 | + except Exception as ex: |
| 985 | + logger.warn(ex) |
| 986 | + |
| 987 | + new_work_request_logs = ( |
| 988 | + work_request_logs[i:] if work_request_logs else [] |
| 989 | + ) |
| 990 | + |
| 991 | + for wr_item in new_work_request_logs: |
| 992 | + progress.update(wr_item.message) |
| 993 | + i += 1 |
| 994 | + |
| 995 | + if work_request and work_request.status in WORK_REQUEST_STOP_STATE: |
| 996 | + if work_request.status != "SUCCEEDED": |
| 997 | + if new_work_request_logs: |
| 998 | + raise Exception(new_work_request_logs[-1].message) |
| 999 | + else: |
| 1000 | + raise Exception( |
| 1001 | + "Error occurred in attempt to perform the operation. " |
| 1002 | + "Check the service logs to get more details. " |
| 1003 | + f"{work_request}" |
| 1004 | + ) |
| 1005 | + else: |
| 1006 | + break |
| 1007 | + progress.update("Done") |
| 1008 | + |
934 | 1009 |
|
935 | 1010 | class OCIModelWithNameMixin:
|
936 | 1011 | """Mixin class to operate OCI model which contains name property."""
|
|
0 commit comments