Skip to content

Commit e0c7b12

Browse files
authored
Ensure that pipeline assessment doesn't fail if a pipeline is deleted… (#3034)
## Changes This PR updates the pipelines (DLT) crawler so that crawling doesn't fail if a pipeline is deleted while we are iterating over them all. Instead of failing, a warning is now logged and the crawling continues. ### Functionality - modified existing workflows: `assessment`, `migration-progress-experimental` ### Tests - added unit tests - existing integration tests
1 parent 8837bb4 commit e0c7b12

File tree

2 files changed

+42
-18
lines changed

2 files changed

+42
-18
lines changed

src/databricks/labs/ucx/assessment/pipelines.py

Lines changed: 16 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
from databricks.labs.lsql.backends import SqlBackend
77
from databricks.sdk import WorkspaceClient
8+
from databricks.sdk.errors import NotFound
89

910
from databricks.labs.ucx.assessment.clusters import CheckClusterMixin
1011
from databricks.labs.ucx.framework.crawlers import CrawlerBase
@@ -31,37 +32,35 @@ def __init__(self, ws: WorkspaceClient, sbe: SqlBackend, schema):
3132

3233
def _crawl(self) -> Iterable[PipelineInfo]:
3334
all_pipelines = list(self._ws.pipelines.list_pipelines())
34-
return list(self._assess_pipelines(all_pipelines))
35-
36-
def _assess_pipelines(self, all_pipelines) -> Iterable[PipelineInfo]:
3735
for pipeline in all_pipelines:
3836
creator_name = pipeline.creator_user_name or None
3937
if not creator_name:
4038
logger.warning(
4139
f"Pipeline {pipeline.name} have Unknown creator, it means that the original creator "
4240
f"has been deleted and should be re-created"
4341
)
44-
pipeline_info = PipelineInfo(
45-
pipeline_id=pipeline.pipeline_id,
46-
pipeline_name=pipeline.name,
47-
creator_name=creator_name,
48-
success=1,
49-
failures="[]",
50-
)
51-
52-
failures = []
53-
pipeline_response = self._ws.pipelines.get(pipeline.pipeline_id)
42+
try:
43+
assert pipeline.pipeline_id is not None
44+
pipeline_response = self._ws.pipelines.get(pipeline.pipeline_id)
45+
except NotFound:
46+
logger.warning(f"Pipeline disappeared, cannot assess: {pipeline.name} (id={pipeline.pipeline_id})")
47+
continue
5448
assert pipeline_response.spec is not None
5549
pipeline_config = pipeline_response.spec.configuration
50+
failures = []
5651
if pipeline_config:
5752
failures.extend(self._check_spark_conf(pipeline_config, "pipeline"))
5853
clusters = pipeline_response.spec.clusters
5954
if clusters:
6055
self._pipeline_clusters(clusters, failures)
61-
pipeline_info.failures = json.dumps(failures)
62-
if len(failures) > 0:
63-
pipeline_info.success = 0
64-
yield pipeline_info
56+
failures_as_json = json.dumps(failures)
57+
yield PipelineInfo(
58+
pipeline_id=pipeline.pipeline_id,
59+
pipeline_name=pipeline.name,
60+
creator_name=creator_name,
61+
success=int(not failures),
62+
failures=failures_as_json,
63+
)
6564

6665
def _pipeline_clusters(self, clusters, failures):
6766
for cluster in clusters:

tests/unit/assessment/test_pipelines.py

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
1+
import logging
12
from unittest.mock import create_autospec
23

34
from databricks.labs.lsql.backends import MockBackend
4-
from databricks.sdk.service.pipelines import GetPipelineResponse, PipelineStateInfo
5+
from databricks.sdk.service.pipelines import GetPipelineResponse, PipelineStateInfo, PipelineSpec
6+
from databricks.sdk.errors import ResourceDoesNotExist
57

68
from databricks.labs.ucx.assessment.azure import AzureServicePrincipalCrawler
79
from databricks.labs.ucx.assessment.pipelines import PipelineOwnership, PipelineInfo, PipelinesCrawler
@@ -47,6 +49,29 @@ def test_pipeline_list_with_no_config():
4749
assert len(crawler) == 0
4850

4951

52+
def test_pipeline_disappears_during_crawl(ws, mock_backend, caplog) -> None:
53+
"""Check that crawling doesn't fail if a pipeline is deleted after we list the pipelines but before we assess it."""
54+
ws.pipelines.list_pipelines.return_value = (
55+
PipelineStateInfo(pipeline_id="1", name="will_remain"),
56+
PipelineStateInfo(pipeline_id="2", name="will_disappear"),
57+
)
58+
59+
def mock_get(pipeline_id: str) -> GetPipelineResponse:
60+
if pipeline_id == "2":
61+
raise ResourceDoesNotExist("Simulated disappearance")
62+
return GetPipelineResponse(pipeline_id=pipeline_id, spec=PipelineSpec(id=pipeline_id))
63+
64+
ws.pipelines.get = mock_get
65+
66+
with caplog.at_level(logging.WARNING):
67+
results = PipelinesCrawler(ws, mock_backend, "a_schema").snapshot()
68+
69+
assert results == [
70+
PipelineInfo(pipeline_id="1", pipeline_name="will_remain", creator_name=None, success=1, failures="[]")
71+
]
72+
assert "Pipeline disappeared, cannot assess: will_disappear (id=2)" in caplog.messages
73+
74+
5075
def test_pipeline_crawler_creator():
5176
ws = mock_workspace_client()
5277
ws.pipelines.list_pipelines.return_value = (

0 commit comments

Comments
 (0)