
Commit 8b8d66a

Scope crawled pipelines in PipelineCrawler (#3513)
## Changes

Scope crawled pipelines in PipelinesCrawler using the new include_pipeline_ids argument.

### Linked issues

Introduces #3493

### Tests

- [x] added unit tests
1 parent edae305 commit 8b8d66a
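For reference, a minimal usage sketch of the new `include_pipeline_ids` argument, not part of this commit. It assumes a configured `WorkspaceClient` and a `databricks-labs-lsql` `StatementExecutionBackend` as the `SqlBackend`; the warehouse ID, schema name, and pipeline ID below are placeholders.

```python
# Illustrative sketch: scoping the pipelines crawl to an explicit list of
# pipeline IDs via the new include_pipeline_ids argument.
from databricks.sdk import WorkspaceClient
from databricks.labs.lsql.backends import StatementExecutionBackend  # assumed SqlBackend implementation

from databricks.labs.ucx.assessment.pipelines import PipelinesCrawler

ws = WorkspaceClient()
sql_backend = StatementExecutionBackend(ws, "<warehouse-id>")  # placeholder warehouse ID

# Only the listed pipelines are fetched and assessed; passing None (the default)
# falls back to crawling every pipeline returned by ws.pipelines.list_pipelines().
crawler = PipelinesCrawler(ws, sql_backend, "ucx", include_pipeline_ids=["<pipeline-id>"])
for info in crawler.snapshot():
    print(info.pipeline_id, info.failures)
```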

2 files changed: +39 −15 lines

src/databricks/labs/ucx/assessment/pipelines.py

Lines changed: 22 additions & 12 deletions
@@ -29,31 +29,41 @@ class PipelineInfo:
 
 
 class PipelinesCrawler(CrawlerBase[PipelineInfo], CheckClusterMixin):
-    def __init__(self, ws: WorkspaceClient, sql_backend: SqlBackend, schema):
+    def __init__(
+        self, ws: WorkspaceClient, sql_backend: SqlBackend, schema, include_pipeline_ids: list[str] | None = None
+    ):
         super().__init__(sql_backend, "hive_metastore", schema, "pipelines", PipelineInfo)
         self._ws = ws
+        self._include_pipeline_ids = include_pipeline_ids
 
     def _crawl(self) -> Iterable[PipelineInfo]:
-        all_pipelines = list(self._ws.pipelines.list_pipelines())
-        for pipeline in all_pipelines:
+
+        pipeline_ids = []
+        if self._include_pipeline_ids is not None:
+            pipeline_ids = self._include_pipeline_ids
+        else:
+            pipeline_ids = [p.pipeline_id for p in self._ws.pipelines.list_pipelines() if p.pipeline_id]
+
+        for pipeline_id in pipeline_ids:
+            try:
+                pipeline = self._ws.pipelines.get(pipeline_id)
+                assert pipeline.pipeline_id is not None
+                assert pipeline.spec is not None
+            except NotFound:
+                logger.warning(f"Pipeline not found: {pipeline_id}")
+                continue
+
             creator_name = pipeline.creator_user_name or None
             if not creator_name:
                 logger.warning(
                     f"Pipeline {pipeline.name} have Unknown creator, it means that the original creator "
                     f"has been deleted and should be re-created"
                 )
-            try:
-                assert pipeline.pipeline_id is not None
-                pipeline_response = self._ws.pipelines.get(pipeline.pipeline_id)
-            except NotFound:
-                logger.warning(f"Pipeline disappeared, cannot assess: {pipeline.name} (id={pipeline.pipeline_id})")
-                continue
-            assert pipeline_response.spec is not None
-            pipeline_config = pipeline_response.spec.configuration
+            pipeline_config = pipeline.spec.configuration
             failures = []
             if pipeline_config:
                 failures.extend(self._check_spark_conf(pipeline_config, "pipeline"))
-            clusters = pipeline_response.spec.clusters
+            clusters = pipeline.spec.clusters
             if clusters:
                 self._pipeline_clusters(clusters, failures)
             failures_as_json = json.dumps(failures)

tests/unit/assessment/test_pipelines.py

Lines changed: 17 additions & 3 deletions
@@ -53,6 +53,15 @@ def test_pipeline_list_with_no_config():
     assert len(crawler) == 0
 
 
+def test_include_pipeline_ids():
+    ws = mock_workspace_client(pipeline_ids=['empty-spec', 'spec-with-spn'])
+    crawler = PipelinesCrawler(ws, MockBackend(), "ucx", include_pipeline_ids=['empty-spec'])
+    result_set = list(crawler.snapshot())
+
+    assert len(result_set) == 1
+    assert result_set[0].pipeline_id == 'empty-spec'
+
+
 def test_pipeline_disappears_during_crawl(ws, mock_backend, caplog) -> None:
     """Check that crawling doesn't fail if a pipeline is deleted after we list the pipelines but before we assess it."""
     ws.pipelines.list_pipelines.return_value = (
@@ -63,7 +72,7 @@ def test_pipeline_disappears_during_crawl(ws, mock_backend, caplog) -> None:
     def mock_get(pipeline_id: str) -> GetPipelineResponse:
         if pipeline_id == "2":
             raise ResourceDoesNotExist("Simulated disappearance")
-        return GetPipelineResponse(pipeline_id=pipeline_id, spec=PipelineSpec(id=pipeline_id))
+        return GetPipelineResponse(pipeline_id=pipeline_id, spec=PipelineSpec(id=pipeline_id), name="will_remain")
 
     ws.pipelines.get = mock_get
 
@@ -73,7 +82,7 @@ def mock_get(pipeline_id: str) -> GetPipelineResponse:
     assert results == [
         PipelineInfo(pipeline_id="1", pipeline_name="will_remain", creator_name=None, success=1, failures="[]")
     ]
-    assert "Pipeline disappeared, cannot assess: will_disappear (id=2)" in caplog.messages
+    assert "Pipeline not found: 2" in caplog.messages
 
 
 def test_pipeline_crawler_creator():
@@ -83,7 +92,12 @@ def test_pipeline_crawler_creator():
         PipelineStateInfo(pipeline_id="2", creator_user_name=""),
         PipelineStateInfo(pipeline_id="3", creator_user_name="bob"),
     )
-    ws.pipelines.get = create_autospec(GetPipelineResponse)  # pylint: disable=mock-no-usage
+    ws.pipelines.get = create_autospec(GetPipelineResponse)
+    ws.pipelines.get.side_effect = [
+        GetPipelineResponse(pipeline_id="1", spec=PipelineSpec(id="1"), creator_user_name=None),
+        GetPipelineResponse(pipeline_id="2", spec=PipelineSpec(id="2"), creator_user_name=""),
+        GetPipelineResponse(pipeline_id="3", spec=PipelineSpec(id="3"), creator_user_name="bob"),
+    ]
     result = PipelinesCrawler(ws, MockBackend(), "ucx").snapshot(force_refresh=True)
 
     expected_creators = [None, None, "bob"]
