Skip to content

Commit 123c89a

Browse files
authored
Combine static code analysis results with historical job snapshots (#3074)
Fix #3059
1 parent 51f768a commit 123c89a

File tree

3 files changed

+123
-2
lines changed

3 files changed

+123
-2
lines changed

src/databricks/labs/ucx/contexts/workflow_task.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
from databricks.labs.ucx.hive_metastore.udfs import Udf
2828
from databricks.labs.ucx.installer.logs import TaskRunWarningRecorder
2929
from databricks.labs.ucx.progress.history import ProgressEncoder
30+
from databricks.labs.ucx.progress.jobs import JobsProgressEncoder
3031
from databricks.labs.ucx.progress.workflow_runs import WorkflowRunRecorder
3132

3233
# As with GlobalContext, service factories unavoidably have a lot of public methods.
@@ -199,10 +200,10 @@ def grants_progress(self) -> ProgressEncoder[Grant]:
199200

200201
@cached_property
201202
def jobs_progress(self) -> ProgressEncoder[JobInfo]:
202-
return ProgressEncoder(
203+
return JobsProgressEncoder(
203204
self.sql_backend,
204205
self.job_ownership,
205-
JobInfo,
206+
self.inventory_database,
206207
self.parent_run_id,
207208
self.workspace_id,
208209
self.config.ucx_catalog,
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
import collections
2+
from dataclasses import replace
3+
from functools import cached_property
4+
5+
from databricks.labs.lsql.backends import SqlBackend
6+
7+
from databricks.labs.ucx.assessment.jobs import JobInfo, JobOwnership
8+
from databricks.labs.ucx.progress.history import ProgressEncoder
9+
from databricks.labs.ucx.progress.install import Historical
10+
from databricks.labs.ucx.source_code.jobs import JobProblem
11+
12+
13+
class JobsProgressEncoder(ProgressEncoder[JobInfo]):
    """Historical encoder for jobs that enriches each snapshot record with linting problems.

    On top of the base :class:`ProgressEncoder` behaviour, this encoder looks up the
    static-code-analysis findings stored in the ``workflow_problems`` inventory table
    and appends them to the ``failures`` column of every job's historical record.
    """

    def __init__(
        self,
        sql_backend: SqlBackend,
        ownership: JobOwnership,
        inventory_database: str,
        run_id: int,
        workspace_id: int,
        catalog: str,
        schema: str = "multiworkspace",
        table: str = "historical",
    ) -> None:
        """Initialize the encoder.

        Args:
            sql_backend: Backend used both for reading problems and writing history.
            ownership: Resolves the owner recorded for each job.
            inventory_database: Hive metastore database holding ``workflow_problems``.
            run_id: Identifier of the workflow run producing these records.
            workspace_id: Workspace the records belong to.
            catalog: UCX catalog to write historical records into.
            schema: Target schema for historical records.
            table: Target table for historical records.
        """
        super().__init__(
            sql_backend,
            ownership,
            JobInfo,
            run_id,
            workspace_id,
            catalog,
            schema,
            table,
        )
        self._inventory_database = inventory_database

    @cached_property
    def _job_problems(self) -> dict[int, list[str]]:
        """Group linting problems by job id, formatted as human-readable failure strings.

        Cached so the inventory table is queried at most once per encoder instance.
        """
        problems_by_job: dict[int, list[str]] = collections.defaultdict(list)
        cursor = self._sql_backend.fetch(
            'SELECT * FROM workflow_problems',
            catalog='hive_metastore',
            schema=self._inventory_database,
        )
        for raw in cursor:
            problem = JobProblem(**raw.asDict())
            rendered = f'{problem.code}: {problem.task_key} task: {problem.path}: {problem.message}'
            problems_by_job[problem.job_id].append(rendered)
        return problems_by_job

    def _encode_record_as_historical(self, record: JobInfo) -> Historical:
        """Encode *record* as a historical row, appending any known linting failures."""
        base = super()._encode_record_as_historical(record)
        extra = self._job_problems.get(int(record.job_id), [])
        # Historical is a (frozen) dataclass, so produce an updated copy via replace().
        return replace(base, failures=base.failures + extra)

tests/unit/progress/test_jobs.py

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
from unittest.mock import create_autospec
2+
3+
from databricks.labs.lsql import Row
4+
from databricks.labs.lsql.backends import MockBackend
5+
6+
from databricks.labs.ucx.assessment.jobs import JobOwnership, JobInfo
7+
from databricks.labs.ucx.progress.jobs import JobsProgressEncoder
8+
from databricks.labs.ucx import __version__
9+
10+
11+
def test_jobs_progress_encoder() -> None:
    """JobsProgressEncoder merges workflow-problem failures into historical job rows."""
    # Columns shared by every workflow_problems row in this fixture.
    shared = {
        'message': 'some failure',
        'job_name': 'job_name',
        'start_line': 1,
        'start_col': 2,
        'end_line': 3,
        'end_col': 4,
    }
    problem_rows = [
        Row(job_id=1, code="cannot-autofix-table-reference", task_key="a", path="/some/path", **shared),
        Row(job_id=1, code="catalog-api-in-shared-clusters", task_key="b", path="/some/other", **shared),
        Row(job_id=2, code="catalog-api-in-shared-clusters", task_key="c", path="/x", **shared),
    ]
    backend = MockBackend(rows={"workflow_problems": problem_rows})
    ownership = create_autospec(JobOwnership)
    ownership.owner_of.return_value = "some_owner"
    encoder = JobsProgressEncoder(
        backend,
        ownership,
        "inventory",
        2,
        3,
        "ucx",
    )

    snapshot = [
        JobInfo(
            job_id='1',
            success=0,
            failures='["some failure from config"]',
        )
    ]
    encoder.append_inventory_snapshot(snapshot)

    # Only job 1's problems must be appended, after the failures already on the record.
    written = backend.rows_written_for('`ucx`.`multiworkspace`.`historical`', 'append')
    expected = Row(
        workspace_id=3,
        job_run_id=2,
        object_type='JobInfo',
        object_id=['1'],
        data={'job_id': '1', 'success': '0'},
        failures=[
            'some failure from config',
            'cannot-autofix-table-reference: a task: /some/path: some failure',
            'catalog-api-in-shared-clusters: b task: /some/other: some failure',
        ],
        owner='some_owner',
        ucx_version=__version__,
    )
    assert written == [expected]

0 commit comments

Comments
 (0)