Skip to content

Commit f329875

Browse files
authored
Improve error message inference from failed workflow runs (#753)
This PR adds heuristics to determine the actual remote error type based on message content. This reduces integration test flakiness for tests that call `run_workflow`.
1 parent b616354 commit f329875

File tree

3 files changed

+160
-17
lines changed

3 files changed

+160
-17
lines changed

src/databricks/labs/ucx/install.py

Lines changed: 68 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import json
33
import logging
44
import os
5+
import re
56
import sys
67
import time
78
import webbrowser
@@ -12,15 +13,33 @@
1213
import yaml
1314
from databricks.labs.blueprint.entrypoint import get_logger
1415
from databricks.labs.blueprint.installer import InstallState
15-
from databricks.labs.blueprint.parallel import Threads
16+
from databricks.labs.blueprint.parallel import ManyError, Threads
1617
from databricks.labs.blueprint.tui import Prompts
1718
from databricks.labs.blueprint.wheels import ProductInfo, Wheels, find_project_root
1819
from databricks.sdk import WorkspaceClient
1920
from databricks.sdk.errors import (
21+
Aborted,
22+
AlreadyExists,
23+
BadRequest,
24+
Cancelled,
25+
DatabricksError,
26+
DataLoss,
27+
DeadlineExceeded,
28+
InternalError,
2029
InvalidParameterValue,
2130
NotFound,
31+
NotImplemented,
2232
OperationFailed,
2333
PermissionDenied,
34+
RequestLimitExceeded,
35+
ResourceAlreadyExists,
36+
ResourceConflict,
37+
ResourceDoesNotExist,
38+
ResourceExhausted,
39+
TemporarilyUnavailable,
40+
TooManyRequests,
41+
Unauthenticated,
42+
Unknown,
2443
)
2544
from databricks.sdk.service import compute, jobs
2645
from databricks.sdk.service.jobs import RunLifeCycleState, RunResultState
@@ -248,16 +267,18 @@ def run_workflow(self, step: str):
248267
job_run_waiter = self._ws.jobs.run_now(job_id)
249268
try:
250269
job_run_waiter.result()
251-
except OperationFailed:
270+
except OperationFailed as err:
252271
# currently we don't have any good message from API, so we have to work around it.
253272
job_run = self._ws.jobs.get_run(job_run_waiter.run_id)
254-
messages = []
273+
errors: list[DatabricksError] = []
274+
timeouts: list[DeadlineExceeded] = []
255275
assert job_run.tasks is not None
256276
for run_task in job_run.tasks:
257277
if not run_task.state:
258278
continue
259279
if run_task.state.result_state == jobs.RunResultState.TIMEDOUT:
260-
messages.append(f"{run_task.task_key}: The run was stopped after reaching the timeout")
280+
msg = f"{run_task.task_key}: The run was stopped after reaching the timeout"
281+
timeouts.append(DeadlineExceeded(msg))
261282
continue
262283
if run_task.state.result_state != jobs.RunResultState.FAILED:
263284
continue
@@ -267,13 +288,51 @@ def run_workflow(self, step: str):
267288
if run_output and run_output.error_trace:
268289
sys.stderr.write(run_output.error_trace)
269290
if run_output and run_output.error:
270-
messages.append(f"{run_task.task_key}: {run_output.error}")
271-
else:
272-
messages.append(f"{run_task.task_key}: output unavailable")
291+
errors.append(self._infer_task_exception(f"{run_task.task_key}: {run_output.error}"))
273292
assert job_run.state is not None
274293
assert job_run.state.state_message is not None
275-
msg = f'{job_run.state.state_message.rstrip(".")}: {", ".join(messages)}'
276-
raise OperationFailed(msg) from None
294+
if len(errors) == 1:
295+
raise errors[0] from err
296+
all_errors = errors + timeouts
297+
if len(all_errors) == 0:
298+
raise Unknown(job_run.state.state_message) from err
299+
raise ManyError(all_errors) from err
300+
301+
@staticmethod
302+
def _infer_task_exception(haystack: str) -> DatabricksError:
303+
needles = [
304+
BadRequest,
305+
Unauthenticated,
306+
PermissionDenied,
307+
NotFound,
308+
ResourceConflict,
309+
TooManyRequests,
310+
Cancelled,
311+
InternalError,
312+
NotImplemented,
313+
TemporarilyUnavailable,
314+
DeadlineExceeded,
315+
InvalidParameterValue,
316+
ResourceDoesNotExist,
317+
Aborted,
318+
AlreadyExists,
319+
ResourceAlreadyExists,
320+
ResourceExhausted,
321+
RequestLimitExceeded,
322+
Unknown,
323+
DataLoss,
324+
]
325+
constructors: dict[re.Pattern, type[DatabricksError]] = {
326+
re.compile(r".*\[TABLE_OR_VIEW_NOT_FOUND] (.*)"): NotFound,
327+
re.compile(r".*\[SCHEMA_NOT_FOUND] (.*)"): NotFound,
328+
}
329+
for klass in needles:
330+
constructors[re.compile(f".*{klass.__name__}: (.*)")] = klass
331+
for pattern, klass in constructors.items():
332+
match = pattern.match(haystack)
333+
if match:
334+
return klass(match.group(1))
335+
return Unknown(haystack)
277336

278337
def _create_dashboards(self):
279338
logger.info("Creating dashboards...")

tests/integration/test_installation.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
import pytest
88
from databricks.labs.blueprint.parallel import Threads
9-
from databricks.sdk.errors import InvalidParameterValue, NotFound, OperationFailed
9+
from databricks.sdk.errors import InvalidParameterValue, NotFound
1010
from databricks.sdk.retries import retried
1111
from databricks.sdk.service.iam import PermissionLevel
1212

@@ -67,7 +67,7 @@ def test_job_failure_propagates_correct_error_message_and_logs(ws, sql_backend,
6767

6868
sql_backend.execute(f"DROP SCHEMA {install.current_config.inventory_database} CASCADE")
6969

70-
with pytest.raises(OperationFailed) as failure:
70+
with pytest.raises(NotFound) as failure:
7171
install.run_workflow("099-destroy-schema")
7272

7373
assert "cannot be found" in str(failure.value)
@@ -76,7 +76,7 @@ def test_job_failure_propagates_correct_error_message_and_logs(ws, sql_backend,
7676
assert len(workflow_run_logs) == 1
7777

7878

79-
@retried(on=[NotFound, InvalidParameterValue, OperationFailed], timeout=timedelta(minutes=10))
79+
@retried(on=[NotFound, InvalidParameterValue], timeout=timedelta(minutes=6))
8080
def test_running_real_assessment_job(
8181
ws, new_installation, make_ucx_group, make_cluster_policy, make_cluster_policy_permissions
8282
):
@@ -130,7 +130,7 @@ def test_running_real_migrate_groups_job(
130130
assert found[f"{install.current_config.renamed_group_prefix}{ws_group_a.display_name}"] == PermissionLevel.CAN_USE
131131

132132

133-
@retried(on=[NotFound, InvalidParameterValue, OperationFailed], timeout=timedelta(minutes=5))
133+
@retried(on=[NotFound, InvalidParameterValue], timeout=timedelta(minutes=5))
134134
def test_running_real_remove_backup_groups_job(ws, sql_backend, new_installation, make_ucx_group):
135135
ws_group_a, acc_group_a = make_ucx_group()
136136

@@ -149,12 +149,12 @@ def test_running_real_remove_backup_groups_job(ws, sql_backend, new_installation
149149
ws.groups.get(ws_group_a.id)
150150

151151

152-
@retried(on=[NotFound, InvalidParameterValue, OperationFailed], timeout=timedelta(minutes=10))
152+
@retried(on=[NotFound, InvalidParameterValue], timeout=timedelta(minutes=10))
153153
def test_repair_run_workflow_job(ws, mocker, new_installation, sql_backend):
154154
install = new_installation()
155155
mocker.patch("webbrowser.open")
156156
sql_backend.execute(f"DROP SCHEMA {install.current_config.inventory_database} CASCADE")
157-
with pytest.raises(OperationFailed):
157+
with pytest.raises(NotFound):
158158
install.run_workflow("099-destroy-schema")
159159

160160
sql_backend.execute(f"CREATE SCHEMA IF NOT EXISTS {install.current_config.inventory_database}")

tests/unit/test_install.py

Lines changed: 86 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,15 @@
66
import pytest
77
import yaml
88
from databricks.labs.blueprint.installer import InstallState
9+
from databricks.labs.blueprint.parallel import ManyError
910
from databricks.labs.blueprint.tui import MockPrompts
1011
from databricks.labs.blueprint.wheels import Wheels, find_project_root
1112
from databricks.sdk.errors import (
1213
InvalidParameterValue,
1314
NotFound,
1415
OperationFailed,
1516
PermissionDenied,
17+
Unknown,
1618
)
1719
from databricks.sdk.service import iam, jobs, sql
1820
from databricks.sdk.service.compute import (
@@ -248,10 +250,92 @@ def result():
248250
ws.jobs.get_run_output.return_value = jobs.RunOutput(error="does not compute", error_trace="# goes to stderr")
249251
installer = WorkspaceInstaller(ws)
250252
installer._state.jobs = {"foo": "111"}
251-
with pytest.raises(OperationFailed) as failure:
253+
with pytest.raises(Unknown) as failure:
252254
installer.run_workflow("foo")
253255

254-
assert "Stuff happens: stuff: does not compute" == str(failure.value)
256+
assert "stuff: does not compute" == str(failure.value)
257+
258+
259+
def test_run_workflow_creates_failure_from_mapping(ws, mocker):
260+
def run_now(job_id):
261+
assert 111 == job_id
262+
263+
def result():
264+
raise OperationFailed(...)
265+
266+
waiter = mocker.Mock()
267+
waiter.result = result
268+
waiter.run_id = "qux"
269+
return waiter
270+
271+
ws.jobs.run_now = run_now
272+
ws.jobs.get_run.return_value = jobs.Run(
273+
state=jobs.RunState(state_message="Stuff happens."),
274+
tasks=[
275+
jobs.RunTask(
276+
task_key="stuff",
277+
state=jobs.RunState(result_state=jobs.RunResultState.FAILED),
278+
run_id=123,
279+
)
280+
],
281+
)
282+
ws.jobs.get_run_output.return_value = jobs.RunOutput(
283+
error="something: PermissionDenied: does not compute", error_trace="# goes to stderr"
284+
)
285+
installer = WorkspaceInstaller(ws)
286+
installer._state.jobs = {"foo": "111"}
287+
with pytest.raises(PermissionDenied) as failure:
288+
installer.run_workflow("foo")
289+
290+
assert str(failure.value) == "does not compute"
291+
292+
293+
def test_run_workflow_creates_failure_many_error(ws, mocker):
294+
def run_now(job_id):
295+
assert 111 == job_id
296+
297+
def result():
298+
raise OperationFailed(...)
299+
300+
waiter = mocker.Mock()
301+
waiter.result = result
302+
waiter.run_id = "qux"
303+
return waiter
304+
305+
ws.jobs.run_now = run_now
306+
ws.jobs.get_run.return_value = jobs.Run(
307+
state=jobs.RunState(state_message="Stuff happens."),
308+
tasks=[
309+
jobs.RunTask(
310+
task_key="stuff",
311+
state=jobs.RunState(result_state=jobs.RunResultState.FAILED),
312+
run_id=123,
313+
),
314+
jobs.RunTask(
315+
task_key="things",
316+
state=jobs.RunState(result_state=jobs.RunResultState.TIMEDOUT),
317+
run_id=124,
318+
),
319+
jobs.RunTask(
320+
task_key="some",
321+
state=jobs.RunState(result_state=jobs.RunResultState.FAILED),
322+
run_id=125,
323+
),
324+
],
325+
)
326+
ws.jobs.get_run_output.return_value = jobs.RunOutput(
327+
error="something: DataLoss: does not compute", error_trace="# goes to stderr"
328+
)
329+
installer = WorkspaceInstaller(ws)
330+
installer._state.jobs = {"foo": "111"}
331+
with pytest.raises(ManyError) as failure:
332+
installer.run_workflow("foo")
333+
334+
assert str(failure.value) == (
335+
"Detected 3 failures: "
336+
"DataLoss: does not compute, "
337+
"DeadlineExceeded: things: The run was stopped after reaching the timeout"
338+
)
255339

256340

257341
def test_save_config(ws):

0 commit comments

Comments
 (0)