From 08964ba500f00cfa0c797de6f6fa66e9d1bfc769 Mon Sep 17 00:00:00 2001 From: Eric Vergnaud Date: Wed, 16 Oct 2024 10:54:19 +0200 Subject: [PATCH 01/21] build migration steps for workflow task --- .../labs/ucx/sequencing/__init__.py | 0 .../labs/ucx/sequencing/sequencing.py | 145 ++++++++++++++++++ tests/unit/sequencing/__init__.py | 0 tests/unit/sequencing/test_sequencing.py | 22 +++ 4 files changed, 167 insertions(+) create mode 100644 src/databricks/labs/ucx/sequencing/__init__.py create mode 100644 src/databricks/labs/ucx/sequencing/sequencing.py create mode 100644 tests/unit/sequencing/__init__.py create mode 100644 tests/unit/sequencing/test_sequencing.py diff --git a/src/databricks/labs/ucx/sequencing/__init__.py b/src/databricks/labs/ucx/sequencing/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/src/databricks/labs/ucx/sequencing/sequencing.py b/src/databricks/labs/ucx/sequencing/sequencing.py new file mode 100644 index 0000000000..f28400c3cd --- /dev/null +++ b/src/databricks/labs/ucx/sequencing/sequencing.py @@ -0,0 +1,145 @@ +from __future__ import annotations + +import itertools +from collections.abc import Iterable +from dataclasses import dataclass, field + +from databricks.sdk.service import jobs + +from databricks.labs.ucx.source_code.graph import DependencyGraph + + +@dataclass +class MigrationStep: + step_id: int + step_number: int + object_type: str + object_id: str + object_owner: str + required_step_ids: list[int] = field(default_factory=list) + + +@dataclass +class MigrationNode: + last_node_id = 0 + node_id: int + object_type: str + object_id: str + object_owner: str + required_steps: list[MigrationNode] = field(default_factory=list) + + def generate_steps(self) -> tuple[MigrationStep, Iterable[MigrationStep]]: + # traverse the nodes using a depth-first algorithm + # ultimate leaves have a step number of 1 + # use highest required step number + 1 for this step + highest_step_number = 0 + required_step_ids: list[int] = [] + all_generated_steps: list[Iterable[MigrationStep]] = [] + for required_step in self.required_steps: + step, generated_steps = required_step.generate_steps() + highest_step_number = max(highest_step_number, step.step_number) + required_step_ids.append(step.step_id) + all_generated_steps.append(generated_steps) + all_generated_steps.append([step]) + this_step = MigrationStep( + step_id=self.node_id, + step_number=highest_step_number + 1, + object_type=self.object_type, + object_id=self.object_id, + object_owner=self.object_owner, + required_step_ids=required_step_ids, + ) + return this_step, itertools.chain(*all_generated_steps) + + def find(self, object_type: str, object_id: str) -> MigrationNode | None: + if object_type == self.object_type and object_id == self.object_id: + return self + for step in self.required_steps: + found = step.find(object_type, object_id) + if found: + return found + return None + + +class MigrationSequencer: + + def __init__(self): + self._root = MigrationNode(node_id=0, object_type="ROOT", object_id="ROOT", object_owner="NONE") + + def register_workflow_task(self, task: jobs.Task, job: jobs.Job, _graph: DependencyGraph) -> MigrationNode: + task_node = self._find_node(object_type="TASK", object_id=task.task_key) + if task_node: + return task_node + job_node = self.register_workflow_job(job) + MigrationNode.last_node_id += 1 + task_node = MigrationNode( + node_id=MigrationNode.last_node_id, object_type="TASK", object_id=task.task_key, object_owner="NONE" + ) # TODO object_owner + job_node.required_steps.append(task_node) + if task.existing_cluster_id: + cluster_node = self.register_cluster(task.existing_cluster_id) + cluster_node.required_steps.append(task_node) + if job_node not in cluster_node.required_steps: + cluster_node.required_steps.append(job_node) + # TODO register dependency graph + return task_node + + def register_workflow_job(self, job: jobs.Job) -> MigrationNode: + job_node = self._find_node(object_type="JOB", object_id=str(job.job_id)) + if job_node: + return job_node + MigrationNode.last_node_id += 1 + job_node = MigrationNode( + node_id=MigrationNode.last_node_id, object_type="JOB", object_id=str(job.job_id), object_owner="NONE" + ) # TODO object_owner + top_level = True + if job.settings and job.settings.job_clusters: + for job_cluster in job.settings.job_clusters: + cluster_node = self.register_job_cluster(job_cluster) + if cluster_node: + top_level = False + cluster_node.required_steps.append(job_node) + if top_level: + self._root.required_steps.append(job_node) + return job_node + + def register_job_cluster(self, cluster: jobs.JobCluster) -> MigrationNode | None: + if cluster.new_cluster: + return None + return self.register_cluster(cluster.job_cluster_key) + + def register_cluster(self, cluster_key: str) -> MigrationNode: + cluster_node = self._find_node(object_type="CLUSTER", object_id=cluster_key) + if cluster_node: + return cluster_node + MigrationNode.last_node_id += 1 + cluster_node = MigrationNode( + node_id=MigrationNode.last_node_id, object_type="CLUSTER", object_id=cluster_key, object_owner="NONE" + ) # TODO object_owner + # TODO register warehouses and policies + self._root.required_steps.append(cluster_node) + return cluster_node + + def generate_steps(self) -> Iterable[MigrationStep]: + _root_step, generated_steps = self._root.generate_steps() + unique_steps = self._deduplicate_steps(generated_steps) + return self._sorted_steps(unique_steps) + + @staticmethod + def _sorted_steps(steps: Iterable[MigrationStep]) -> Iterable[MigrationStep]: + # sort by step number, lowest first + return sorted(steps, key=lambda step: step.step_number) + + @staticmethod + def _deduplicate_steps(steps: Iterable[MigrationStep]) -> Iterable[MigrationStep]: + best_steps: dict[int, MigrationStep] = {} + for step in steps: + existing = best_steps.get(step.step_id, None) + # keep the step with the highest step number + if existing and existing.step_number >= step.step_number: + continue + best_steps[step.step_id] = step + return best_steps.values() + + def _find_node(self, object_type: str, object_id: str) -> MigrationNode | None: + return self._root.find(object_type, object_id) diff --git a/tests/unit/sequencing/__init__.py b/tests/unit/sequencing/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tests/unit/sequencing/test_sequencing.py b/tests/unit/sequencing/test_sequencing.py new file mode 100644 index 0000000000..094767fc3e --- /dev/null +++ b/tests/unit/sequencing/test_sequencing.py @@ -0,0 +1,22 @@ +from databricks.sdk.service import jobs + +from databricks.labs.ucx.sequencing.sequencing import MigrationSequencer +from databricks.labs.ucx.source_code.base import CurrentSessionState +from databricks.labs.ucx.source_code.graph import DependencyGraph +from databricks.labs.ucx.source_code.jobs import WorkflowTask + + +def test_cluster_from_task_has_children(ws, simple_dependency_resolver, mock_path_lookup): + task = jobs.Task(task_key="test-task", existing_cluster_id="cluster-123") + settings = jobs.JobSettings(name="test-job", tasks=[task]) + job = jobs.Job(job_id=1234, settings=settings) + ws.jobs.get.return_value = job + dependency = WorkflowTask(ws, task, job) + graph = DependencyGraph(dependency, None, simple_dependency_resolver, mock_path_lookup, CurrentSessionState()) + sequencer = MigrationSequencer() + sequencer.register_workflow_task(task, job, graph) + steps = list(sequencer.generate_steps()) + step = steps[-1] + assert step.object_type == "CLUSTER" + assert step.object_id == "cluster-123" + assert step.step_number == 3 From de8ddede01f9e383e0a84ff03daeb022ec19f0e4 Mon Sep 17 00:00:00 2001 From: Eric Vergnaud Date: Wed, 16 Oct 2024 12:06:04 +0200 Subject: [PATCH 02/21] add object name --- .../labs/ucx/sequencing/sequencing.py | 29 +++++++++++++++---- 1 file changed, 24 insertions(+), 5 deletions(-) diff --git a/src/databricks/labs/ucx/sequencing/sequencing.py b/src/databricks/labs/ucx/sequencing/sequencing.py index f28400c3cd..b12fbcd0b2 100644 --- a/src/databricks/labs/ucx/sequencing/sequencing.py +++ b/src/databricks/labs/ucx/sequencing/sequencing.py @@ -15,6 +15,7 @@ class MigrationStep: step_number: int object_type: str object_id: str + object_name: str object_owner: str required_step_ids: list[int] = field(default_factory=list) @@ -25,6 +26,7 @@ class MigrationNode: node_id: int object_type: str object_id: str + object_name: str object_owner: str required_steps: list[MigrationNode] = field(default_factory=list) @@ -46,6 +48,7 @@ def generate_steps(self) -> tuple[MigrationStep, Iterable[MigrationStep]]: step_number=highest_step_number + 1, object_type=self.object_type, object_id=self.object_id, + object_name=self.object_name, object_owner=self.object_owner, required_step_ids=required_step_ids, ) @@ -64,16 +67,23 @@ def find(self, object_type: str, object_id: str) -> MigrationNode | None: class MigrationSequencer: def __init__(self): - self._root = MigrationNode(node_id=0, object_type="ROOT", object_id="ROOT", object_owner="NONE") + self._root = MigrationNode( + node_id=0, object_type="ROOT", object_id="ROOT", object_name="ROOT", object_owner="NONE" + ) def register_workflow_task(self, task: jobs.Task, job: jobs.Job, _graph: DependencyGraph) -> MigrationNode: - task_node = self._find_node(object_type="TASK", object_id=task.task_key) + task_id = f"{job.job_id}/{task.task_key}" + task_node = self._find_node(object_type="TASK", object_id=task_id) if task_node: return task_node job_node = self.register_workflow_job(job) MigrationNode.last_node_id += 1 task_node = MigrationNode( - node_id=MigrationNode.last_node_id, object_type="TASK", object_id=task.task_key, object_owner="NONE" + node_id=MigrationNode.last_node_id, + object_type="TASK", + object_id=task_id, + object_name=task.task_key, + object_owner="NONE", ) # TODO object_owner job_node.required_steps.append(task_node) if task.existing_cluster_id: @@ -89,8 +99,13 @@ def register_workflow_job(self, job: jobs.Job) -> MigrationNode: if job_node: return job_node MigrationNode.last_node_id += 1 + job_name = job.settings.name if job.settings and job.settings.name else str(job.job_id) job_node = MigrationNode( - node_id=MigrationNode.last_node_id, object_type="JOB", object_id=str(job.job_id), object_owner="NONE" + node_id=MigrationNode.last_node_id, + object_type="JOB", + object_id=str(job.job_id), + object_name=job_name, + object_owner="NONE", ) # TODO object_owner top_level = True if job.settings and job.settings.job_clusters: @@ -114,7 +129,11 @@ def register_cluster(self, cluster_key: str) -> MigrationNode: return cluster_node MigrationNode.last_node_id += 1 cluster_node = MigrationNode( - node_id=MigrationNode.last_node_id, object_type="CLUSTER", object_id=cluster_key, object_owner="NONE" + node_id=MigrationNode.last_node_id, + object_type="CLUSTER", + object_id=cluster_key, + object_name=cluster_key, + object_owner="NONE", ) # TODO object_owner # TODO register warehouses and policies self._root.required_steps.append(cluster_node) From 8aabea355d5fa0d1d4e2c5e27861e8730f611625 Mon Sep 17 00:00:00 2001 From: Eric Vergnaud Date: Wed, 16 Oct 2024 12:14:11 +0200 Subject: [PATCH 03/21] populate object owner --- src/databricks/labs/ucx/sequencing/sequencing.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/databricks/labs/ucx/sequencing/sequencing.py b/src/databricks/labs/ucx/sequencing/sequencing.py index b12fbcd0b2..f66563de80 100644 --- a/src/databricks/labs/ucx/sequencing/sequencing.py +++ b/src/databricks/labs/ucx/sequencing/sequencing.py @@ -83,8 +83,8 @@ def register_workflow_task(self, task: jobs.Task, job: jobs.Job, _graph: Depende object_type="TASK", object_id=task_id, object_name=task.task_key, - object_owner="NONE", - ) # TODO object_owner + object_owner=job_node.object_owner, # no task owner so use job one + ) job_node.required_steps.append(task_node) if task.existing_cluster_id: cluster_node = self.register_cluster(task.existing_cluster_id) @@ -105,8 +105,8 @@ def register_workflow_job(self, job: jobs.Job) -> MigrationNode: object_type="JOB", object_id=str(job.job_id), object_name=job_name, - object_owner="NONE", - ) # TODO object_owner + object_owner=job.creator_user_name or "", + ) top_level = True if job.settings and job.settings.job_clusters: for job_cluster in job.settings.job_clusters: From 4ae422781f737180a00532099741847de92c8511 Mon Sep 17 00:00:00 2001 From: Eric Vergnaud Date: Wed, 16 Oct 2024 12:48:00 +0200 Subject: [PATCH 04/21] be more defensive --- .../labs/ucx/sequencing/sequencing.py | 17 ++++++++++++----- tests/unit/sequencing/test_sequencing.py | 8 +++++++- 2 files changed, 19 insertions(+), 6 deletions(-) diff --git a/src/databricks/labs/ucx/sequencing/sequencing.py b/src/databricks/labs/ucx/sequencing/sequencing.py index f66563de80..113b704d30 100644 --- a/src/databricks/labs/ucx/sequencing/sequencing.py +++ b/src/databricks/labs/ucx/sequencing/sequencing.py @@ -4,6 +4,7 @@ from collections.abc import Iterable from dataclasses import dataclass, field +from databricks.sdk import WorkspaceClient from databricks.sdk.service import jobs from databricks.labs.ucx.source_code.graph import DependencyGraph @@ -66,7 +67,8 @@ def find(self, object_type: str, object_id: str) -> MigrationNode | None: class MigrationSequencer: - def __init__(self): + def __init__(self, ws: WorkspaceClient): + self._ws = ws self._root = MigrationNode( node_id=0, object_type="ROOT", object_id="ROOT", object_name="ROOT", object_owner="NONE" ) @@ -83,7 +85,7 @@ def register_workflow_task(self, task: jobs.Task, job: jobs.Job, _graph: Depende object_type="TASK", object_id=task_id, object_name=task.task_key, - object_owner=job_node.object_owner, # no task owner so use job one + object_owner=job_node.object_owner, # no task owner so use job one ) job_node.required_steps.append(task_node) if task.existing_cluster_id: @@ -127,14 +129,17 @@ def register_cluster(self, cluster_key: str) -> MigrationNode: cluster_node = self._find_node(object_type="CLUSTER", object_id=cluster_key) if cluster_node: return cluster_node + details = self._ws.clusters.get(cluster_key) + object_name = details.cluster_name if details and details.cluster_name else cluster_key + object_owner = details.creator_user_name if details and details.creator_user_name else "" MigrationNode.last_node_id += 1 cluster_node = MigrationNode( node_id=MigrationNode.last_node_id, object_type="CLUSTER", object_id=cluster_key, - object_name=cluster_key, - object_owner="NONE", - ) # TODO object_owner + object_name=object_name, + object_owner=object_owner, + ) # TODO register warehouses and policies self._root.required_steps.append(cluster_node) return cluster_node @@ -155,6 +160,8 @@ def _deduplicate_steps(steps: Iterable[MigrationStep]) -> Iterable[MigrationStep for step in steps: existing = best_steps.get(step.step_id, None) # keep the step with the highest step number + # TODO this possibly affects the step_number of steps that depend on this one + # but it's probably OK to not be 100% accurate initially if existing and existing.step_number >= step.step_number: continue best_steps[step.step_id] = step diff --git a/tests/unit/sequencing/test_sequencing.py b/tests/unit/sequencing/test_sequencing.py index 094767fc3e..fa7271164e 100644 --- a/tests/unit/sequencing/test_sequencing.py +++ b/tests/unit/sequencing/test_sequencing.py @@ -1,4 +1,5 @@ from databricks.sdk.service import jobs +from databricks.sdk.service.compute import ClusterDetails from databricks.labs.ucx.sequencing.sequencing import MigrationSequencer from databricks.labs.ucx.source_code.base import CurrentSessionState @@ -7,16 +8,21 @@ def test_cluster_from_task_has_children(ws, simple_dependency_resolver, mock_path_lookup): + ws.clusters.get.return_value = ClusterDetails(cluster_name="my-cluster", creator_user_name="John Doe") task = jobs.Task(task_key="test-task", existing_cluster_id="cluster-123") settings = jobs.JobSettings(name="test-job", tasks=[task]) job = jobs.Job(job_id=1234, settings=settings) ws.jobs.get.return_value = job dependency = WorkflowTask(ws, task, job) graph = DependencyGraph(dependency, None, simple_dependency_resolver, mock_path_lookup, CurrentSessionState()) - sequencer = MigrationSequencer() + sequencer = MigrationSequencer(ws) sequencer.register_workflow_task(task, job, graph) steps = list(sequencer.generate_steps()) step = steps[-1] + assert step.step_id assert step.object_type == "CLUSTER" assert step.object_id == "cluster-123" + assert step.object_name == "my-cluster" + assert step.object_owner == "John Doe" assert step.step_number == 3 + assert len(step.required_step_ids) == 2 From cad9246b151eec8df6e534c1874d145c00ca2ac0 Mon Sep 17 00:00:00 2001 From: Eric Vergnaud Date: Thu, 17 Oct 2024 10:53:39 +0200 Subject: [PATCH 05/21] move last_node_id to sequencer --- src/databricks/labs/ucx/sequencing/sequencing.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/databricks/labs/ucx/sequencing/sequencing.py b/src/databricks/labs/ucx/sequencing/sequencing.py index 113b704d30..4037a29697 100644 --- a/src/databricks/labs/ucx/sequencing/sequencing.py +++ b/src/databricks/labs/ucx/sequencing/sequencing.py @@ -23,7 +23,6 @@ class MigrationStep: @dataclass class MigrationNode: - last_node_id = 0 node_id: int object_type: str object_id: str @@ -69,6 +68,7 @@ class MigrationSequencer: def __init__(self, ws: WorkspaceClient): self._ws = ws + self._last_node_id = 0 self._root = MigrationNode( node_id=0, object_type="ROOT", object_id="ROOT", object_name="ROOT", object_owner="NONE" ) @@ -79,9 +79,9 @@ def register_workflow_task(self, task: jobs.Task, job: jobs.Job, _graph: Depende if task_node: return task_node job_node = self.register_workflow_job(job) - MigrationNode.last_node_id += 1 + self._last_node_id += 1 task_node = MigrationNode( - node_id=MigrationNode.last_node_id, + node_id=self._last_node_id, object_type="TASK", object_id=task_id, object_name=task.task_key, @@ -100,10 +100,10 @@ def register_workflow_job(self, job: jobs.Job) -> MigrationNode: job_node = self._find_node(object_type="JOB", object_id=str(job.job_id)) if job_node: return job_node - MigrationNode.last_node_id += 1 + self._last_node_id += 1 job_name = job.settings.name if job.settings and job.settings.name else str(job.job_id) job_node = MigrationNode( - node_id=MigrationNode.last_node_id, + node_id=self._last_node_id, object_type="JOB", object_id=str(job.job_id), object_name=job_name, @@ -132,9 +132,9 @@ def register_cluster(self, cluster_key: str) -> MigrationNode: details = self._ws.clusters.get(cluster_key) object_name = details.cluster_name if details and details.cluster_name else cluster_key object_owner = details.creator_user_name if details and details.creator_user_name else "" - MigrationNode.last_node_id += 1 + self._last_node_id += 1 cluster_node = MigrationNode( - node_id=MigrationNode.last_node_id, + node_id=self._last_node_id, object_type="CLUSTER", object_id=cluster_key, object_name=object_name, From 7814bee066795c277400f1a0ceed759dffd5f2f7 Mon Sep 17 00:00:00 2001 From: Eric Vergnaud Date: Thu, 17 Oct 2024 15:15:38 +0200 Subject: [PATCH 06/21] cherry-pick changes --- .../labs/ucx/sequencing/sequencing.py | 136 +++++++++--------- 1 file changed, 64 insertions(+), 72 deletions(-) diff --git a/src/databricks/labs/ucx/sequencing/sequencing.py b/src/databricks/labs/ucx/sequencing/sequencing.py index 4037a29697..063d0ca854 100644 --- a/src/databricks/labs/ucx/sequencing/sequencing.py +++ b/src/databricks/labs/ucx/sequencing/sequencing.py @@ -1,8 +1,8 @@ from __future__ import annotations -import itertools +from collections import defaultdict from collections.abc import Iterable -from dataclasses import dataclass, field +from dataclasses import dataclass from databricks.sdk import WorkspaceClient from databricks.sdk.service import jobs @@ -18,7 +18,7 @@ class MigrationStep: object_id: str object_name: str object_owner: str - required_step_ids: list[int] = field(default_factory=list) + required_step_ids: list[int] @dataclass @@ -28,40 +28,21 @@ class MigrationNode: object_id: str object_name: str object_owner: str - required_steps: list[MigrationNode] = field(default_factory=list) - - def generate_steps(self) -> tuple[MigrationStep, Iterable[MigrationStep]]: - # traverse the nodes using a depth-first algorithm - # ultimate leaves have a step number of 1 - # use highest required step number + 1 for this step - highest_step_number = 0 - required_step_ids: list[int] = [] - all_generated_steps: list[Iterable[MigrationStep]] = [] - for required_step in self.required_steps: - step, generated_steps = required_step.generate_steps() - highest_step_number = max(highest_step_number, step.step_number) - required_step_ids.append(step.step_id) - all_generated_steps.append(generated_steps) - all_generated_steps.append([step]) - this_step = MigrationStep( + + @property + def key(self) -> tuple[str, str]: + return self.object_type, self.object_id + + def as_step(self, step_number: int, required_step_ids: list[int]) -> MigrationStep: + return MigrationStep( step_id=self.node_id, - step_number=highest_step_number + 1, + step_number=step_number, object_type=self.object_type, object_id=self.object_id, object_name=self.object_name, object_owner=self.object_owner, required_step_ids=required_step_ids, ) - return this_step, itertools.chain(*all_generated_steps) - - def find(self, object_type: str, object_id: str) -> MigrationNode | None: - if object_type == self.object_type and object_id == self.object_id: - return self - for step in self.required_steps: - found = step.find(object_type, object_id) - if found: - return found - return None class MigrationSequencer: @@ -69,13 +50,13 @@ class MigrationSequencer: def __init__(self, ws: WorkspaceClient): self._ws = ws self._last_node_id = 0 - self._root = MigrationNode( - node_id=0, object_type="ROOT", object_id="ROOT", object_name="ROOT", object_owner="NONE" - ) + self._nodes: dict[tuple[str, str], MigrationNode] = {} + self._incoming: dict[tuple[str, str], set[tuple[str, str]]] = defaultdict(set) + self._outgoing: dict[tuple[str, str], set[tuple[str, str]]] = defaultdict(set) def register_workflow_task(self, task: jobs.Task, job: jobs.Job, _graph: DependencyGraph) -> MigrationNode: task_id = f"{job.job_id}/{task.task_key}" - task_node = self._find_node(object_type="TASK", object_id=task_id) + task_node = self._nodes.get(("TASK", task_id), None) if task_node: return task_node job_node = self.register_workflow_job(job) @@ -87,17 +68,22 @@ def register_workflow_task(self, task: jobs.Task, job: jobs.Job, _graph: Depende object_name=task.task_key, object_owner=job_node.object_owner, # no task owner so use job one ) - job_node.required_steps.append(task_node) + self._nodes[task_node.key] = task_node + self._incoming[job_node.key].add(task_node.key) + self._outgoing[task_node.key].add(job_node.key) if task.existing_cluster_id: cluster_node = self.register_cluster(task.existing_cluster_id) - cluster_node.required_steps.append(task_node) - if job_node not in cluster_node.required_steps: - cluster_node.required_steps.append(job_node) + if cluster_node: + self._incoming[cluster_node.key].add(task_node.key) + self._outgoing[task_node.key].add(cluster_node.key) + # also make the cluster dependent on the job + self._incoming[cluster_node.key].add(job_node.key) + self._outgoing[job_node.key].add(cluster_node.key) # TODO register dependency graph return task_node def register_workflow_job(self, job: jobs.Job) -> MigrationNode: - job_node = self._find_node(object_type="JOB", object_id=str(job.job_id)) + job_node = self._nodes.get(("JOB", str(job.job_id)), None) if job_node: return job_node self._last_node_id += 1 @@ -109,15 +95,13 @@ def register_workflow_job(self, job: jobs.Job) -> MigrationNode: object_name=job_name, object_owner=job.creator_user_name or "", ) - top_level = True + self._nodes[job_node.key] = job_node if job.settings and job.settings.job_clusters: for job_cluster in job.settings.job_clusters: cluster_node = self.register_job_cluster(job_cluster) if cluster_node: - top_level = False - cluster_node.required_steps.append(job_node) - if top_level: - self._root.required_steps.append(job_node) + self._incoming[cluster_node.key].add(job_node.key) + self._outgoing[job_node.key].add(cluster_node.key) return job_node def register_job_cluster(self, cluster: jobs.JobCluster) -> MigrationNode | None: @@ -125,47 +109,55 @@ def register_job_cluster(self, cluster: jobs.JobCluster) -> MigrationNode | None return None return self.register_cluster(cluster.job_cluster_key) - def register_cluster(self, cluster_key: str) -> MigrationNode: - cluster_node = self._find_node(object_type="CLUSTER", object_id=cluster_key) + def register_cluster(self, cluster_id: str) -> MigrationNode: + cluster_node = self._nodes.get(("CLUSTER", cluster_id), None) if cluster_node: return cluster_node - details = self._ws.clusters.get(cluster_key) - object_name = details.cluster_name if details and details.cluster_name else cluster_key - object_owner = details.creator_user_name if details and details.creator_user_name else "" + details = self._ws.clusters.get(cluster_id) + object_name = details.cluster_name if details and details.cluster_name else cluster_id self._last_node_id += 1 cluster_node = MigrationNode( node_id=self._last_node_id, object_type="CLUSTER", - object_id=cluster_key, + object_id=cluster_id, object_name=object_name, object_owner=object_owner, ) + self._nodes[cluster_node.key] = cluster_node # TODO register warehouses and policies - self._root.required_steps.append(cluster_node) return cluster_node def generate_steps(self) -> Iterable[MigrationStep]: - _root_step, generated_steps = self._root.generate_steps() - unique_steps = self._deduplicate_steps(generated_steps) - return self._sorted_steps(unique_steps) + # algo adapted from Kahn topological sort. The main differences is that + # we want the same step number for all nodes with same dependency depth + # so instead of pushing to a queue, we rebuild it once all leaf nodes are processed + # (these are transient leaf nodes i.e. they only become leaf during processing) + incoming_counts = self._populate_incoming_counts() + step_number = 1 + sorted_steps: list[MigrationStep] = [] + while len(incoming_counts) > 0: + leaf_keys = list(self._get_leaf_keys(incoming_counts)) + for leaf_key in leaf_keys: + del incoming_counts[leaf_key] + sorted_steps.append(self._nodes[leaf_key].as_step(step_number, list(self._required_step_ids(leaf_key)))) + for dependency_key in self._outgoing[leaf_key]: + incoming_counts[dependency_key] -= 1 + step_number += 1 + return sorted_steps + + def _required_step_ids(self, node_key: tuple[str, str]) -> Iterable[int]: + for leaf_key in self._incoming[node_key]: + yield self._nodes[leaf_key].node_id + + def _populate_incoming_counts(self) -> dict[tuple[str, str], int]: + result = defaultdict(int) + for node_key in self._nodes: + result[node_key] = len(self._incoming[node_key]) + return result @staticmethod - def _sorted_steps(steps: Iterable[MigrationStep]) -> Iterable[MigrationStep]: - # sort by step number, lowest first - return sorted(steps, key=lambda step: step.step_number) - - @staticmethod - def _deduplicate_steps(steps: Iterable[MigrationStep]) -> Iterable[MigrationStep]: - best_steps: dict[int, MigrationStep] = {} - for step in steps: - existing = best_steps.get(step.step_id, None) - # keep the step with the highest step number - # TODO this possibly affects the step_number of steps that depend on this one - # but it's probably OK to not be 100% accurate initially - if existing and existing.step_number >= step.step_number: + def _get_leaf_keys(incoming_counts: dict[tuple[str, str], int]) -> Iterable[tuple[str, str]]: + for node_key, incoming_count in incoming_counts.items(): + if incoming_count > 0: continue - best_steps[step.step_id] = step - return best_steps.values() - - def _find_node(self, object_type: str, object_id: str) -> MigrationNode | None: - return self._root.find(object_type, object_id) + yield node_key From 81eb87cec4664b4abeb2ef6a1c80ecf9bb8f0d15 Mon Sep 17 00:00:00 2001 From: Eric Vergnaud Date: Thu, 17 Oct 2024 18:30:27 +0200 Subject: [PATCH 07/21] use existing Ownership classes --- src/databricks/labs/ucx/sequencing/sequencing.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/databricks/labs/ucx/sequencing/sequencing.py b/src/databricks/labs/ucx/sequencing/sequencing.py index 063d0ca854..0b1e384297 100644 --- a/src/databricks/labs/ucx/sequencing/sequencing.py +++ b/src/databricks/labs/ucx/sequencing/sequencing.py @@ -7,6 +7,9 @@ from databricks.sdk import WorkspaceClient from databricks.sdk.service import jobs +from databricks.labs.ucx.assessment.clusters import ClusterOwnership, ClusterInfo +from databricks.labs.ucx.assessment.jobs import JobOwnership, JobInfo +from databricks.labs.ucx.framework.owners import AdministratorLocator from databricks.labs.ucx.source_code.graph import DependencyGraph @@ -93,7 +96,7 @@ def register_workflow_job(self, job: jobs.Job) -> MigrationNode: object_type="JOB", object_id=str(job.job_id), object_name=job_name, - object_owner=job.creator_user_name or "", + object_owner=JobOwnership(self._admin_locator).owner_of(JobInfo.from_job(job)), ) self._nodes[job_node.key] = job_node if job.settings and job.settings.job_clusters: @@ -121,7 +124,7 @@ def register_cluster(self, cluster_id: str) -> MigrationNode: object_type="CLUSTER", object_id=cluster_id, object_name=object_name, - object_owner=object_owner, + object_owner=ClusterOwnership(self._admin_locator).owner_of(ClusterInfo.from_cluster_details(details)), ) self._nodes[cluster_node.key] = cluster_node # TODO register warehouses and policies From 6cf3a8708ac33ecae17fb4522586f412fadd693c Mon Sep 17 00:00:00 2001 From: Eric Vergnaud Date: Thu, 17 Oct 2024 18:37:57 +0200 Subject: [PATCH 08/21] fix merge issues --- src/databricks/labs/ucx/sequencing/sequencing.py | 3 ++- tests/unit/sequencing/test_sequencing.py | 10 ++++++++-- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/src/databricks/labs/ucx/sequencing/sequencing.py b/src/databricks/labs/ucx/sequencing/sequencing.py index 0b1e384297..a873ee1b7e 100644 --- a/src/databricks/labs/ucx/sequencing/sequencing.py +++ b/src/databricks/labs/ucx/sequencing/sequencing.py @@ -50,8 +50,9 @@ def as_step(self, step_number: int, required_step_ids: list[int]) -> MigrationSt class MigrationSequencer: - def __init__(self, ws: WorkspaceClient): + def __init__(self, ws: WorkspaceClient, admin_locator: AdministratorLocator): self._ws = ws + self._admin_locator = admin_locator self._last_node_id = 0 self._nodes: dict[tuple[str, str], MigrationNode] = {} self._incoming: dict[tuple[str, str], set[tuple[str, str]]] = defaultdict(set) diff --git a/tests/unit/sequencing/test_sequencing.py b/tests/unit/sequencing/test_sequencing.py index fa7271164e..21d2a612d0 100644 --- a/tests/unit/sequencing/test_sequencing.py +++ b/tests/unit/sequencing/test_sequencing.py @@ -1,6 +1,9 @@ -from databricks.sdk.service import jobs +from unittest.mock import create_autospec + +from databricks.sdk.service import iam, jobs from databricks.sdk.service.compute import ClusterDetails +from databricks.labs.ucx.framework.owners import AdministratorLocator, AdministratorFinder from databricks.labs.ucx.sequencing.sequencing import MigrationSequencer from databricks.labs.ucx.source_code.base import CurrentSessionState from databricks.labs.ucx.source_code.graph import DependencyGraph @@ -15,7 +18,10 @@ def test_cluster_from_task_has_children(ws, simple_dependency_resolver, mock_pat ws.jobs.get.return_value = job dependency = WorkflowTask(ws, task, job) graph = DependencyGraph(dependency, None, simple_dependency_resolver, mock_path_lookup, CurrentSessionState()) - sequencer = MigrationSequencer(ws) + admin_finder = create_autospec(AdministratorFinder) + admin_user = iam.User(user_name="John Doe", active=True, roles=[iam.ComplexValue(value="account_admin")]) + admin_finder.find_admin_users.return_value = (admin_user,) + sequencer = MigrationSequencer(ws, AdministratorLocator(ws, finders=[lambda _ws: admin_finder])) sequencer.register_workflow_task(task, job, graph) steps = list(sequencer.generate_steps()) step = steps[-1] From ce1938c6652ffedee7b250b76f853b06231c369f Mon Sep 17 00:00:00 2001 From: Eric Vergnaud Date: Wed, 16 Oct 2024 15:35:23 +0200 Subject: [PATCH 09/21] create steps for source files --- .../labs/ucx/sequencing/sequencing.py | 45 +++++++++++++++++-- tests/unit/sequencing/test_sequencing.py | 32 ++++++++++++- 2 files changed, 72 insertions(+), 5 deletions(-) diff --git a/src/databricks/labs/ucx/sequencing/sequencing.py b/src/databricks/labs/ucx/sequencing/sequencing.py index a873ee1b7e..d9ffa6a092 100644 --- a/src/databricks/labs/ucx/sequencing/sequencing.py +++ b/src/databricks/labs/ucx/sequencing/sequencing.py @@ -2,7 +2,8 @@ from collections import defaultdict from collections.abc import Iterable -from dataclasses import dataclass +from dataclasses import dataclass, field +from pathlib import Path from databricks.sdk import WorkspaceClient from databricks.sdk.service import jobs @@ -11,6 +12,7 @@ from databricks.labs.ucx.assessment.jobs import JobOwnership, JobInfo from databricks.labs.ucx.framework.owners import AdministratorLocator from databricks.labs.ucx.source_code.graph import DependencyGraph +from databricks.labs.ucx.source_code.path_lookup import PathLookup @dataclass @@ -51,6 +53,7 @@ def as_step(self, step_number: int, required_step_ids: list[int]) -> MigrationSt class MigrationSequencer: def __init__(self, ws: WorkspaceClient, admin_locator: AdministratorLocator): + def __init__(self, ws: WorkspaceClient, path_lookup: PathLookup): self._ws = ws self._admin_locator = admin_locator self._last_node_id = 0 @@ -58,7 +61,7 @@ def __init__(self, ws: WorkspaceClient, admin_locator: AdministratorLocator): self._incoming: dict[tuple[str, str], set[tuple[str, str]]] = defaultdict(set) self._outgoing: dict[tuple[str, str], set[tuple[str, str]]] = defaultdict(set) - def register_workflow_task(self, task: jobs.Task, job: jobs.Job, _graph: DependencyGraph) -> MigrationNode: + def register_workflow_task(self, task: jobs.Task, job: jobs.Job, graph: DependencyGraph) -> MigrationNode: task_id = f"{job.job_id}/{task.task_key}" task_node = self._nodes.get(("TASK", task_id), None) if task_node: @@ -86,15 +89,49 @@ def register_workflow_task(self, task: jobs.Task, job: jobs.Job, _graph: Depende # TODO register dependency graph return task_node + def _visit_dependency(self, graph: DependencyGraph) -> bool | None: + lineage = graph.dependency.lineage[-1] + parent_node = self._find_node(lineage.object_type, lineage.object_id) + for dependency in graph.local_dependencies: + lineage = dependency.lineage[-1] + child_node = self.register_dependency(lineage.object_type, lineage.object_id) + parent_node.required_steps.append(child_node) + # TODO tables and dfsas + return None + + def register_dependency(self, object_type: str, object_id: str) -> MigrationNode: + existing = self._find_node(object_type, object_id) + if existing: + return existing + object_name: str = "" + object_owner: str = "" + if object_type in { "NOTEBOOK", "FILE" }: + path = Path(object_id) + for library_root in self._path_lookup.library_roots: + if not path.is_relative_to(library_root): + continue + object_name = path.relative_to(library_root).as_posix() + break + else: + raise ValueError(f"{object_type} not supported yet!") + MigrationNode.last_node_id += 1 + return MigrationNode( + node_id=MigrationNode.last_node_id, + object_type=object_type, + object_id=object_id, + object_name=object_name, + object_owner=object_owner, + ) + def register_workflow_job(self, job: jobs.Job) -> MigrationNode: - job_node = self._nodes.get(("JOB", str(job.job_id)), None) + job_node = self._nodes.get(("WORKFLOW", str(job.job_id)), None) if job_node: return job_node self._last_node_id += 1 job_name = job.settings.name if job.settings and job.settings.name else str(job.job_id) job_node = MigrationNode( node_id=self._last_node_id, - object_type="JOB", + object_type="WORKFLOW", object_id=str(job.job_id), object_name=job_name, object_owner=JobOwnership(self._admin_locator).owner_of(JobInfo.from_job(job)), diff --git a/tests/unit/sequencing/test_sequencing.py b/tests/unit/sequencing/test_sequencing.py index 21d2a612d0..83db9395ec 100644 --- a/tests/unit/sequencing/test_sequencing.py +++ b/tests/unit/sequencing/test_sequencing.py @@ -1,7 +1,11 @@ from unittest.mock import create_autospec from databricks.sdk.service import iam, jobs +from pathlib import Path + +from databricks.sdk.service import jobs from databricks.sdk.service.compute import ClusterDetails +from databricks.sdk.service.jobs import NotebookTask from databricks.labs.ucx.framework.owners import AdministratorLocator, AdministratorFinder from databricks.labs.ucx.sequencing.sequencing import MigrationSequencer @@ -10,7 +14,7 @@ from databricks.labs.ucx.source_code.jobs import WorkflowTask -def test_cluster_from_task_has_children(ws, simple_dependency_resolver, mock_path_lookup): +def test_sequencer_builds_cluster_and_children_from_task(ws, simple_dependency_resolver, mock_path_lookup): ws.clusters.get.return_value = ClusterDetails(cluster_name="my-cluster", creator_user_name="John Doe") task = jobs.Task(task_key="test-task", existing_cluster_id="cluster-123") settings = jobs.JobSettings(name="test-job", tasks=[task]) @@ -32,3 +36,29 @@ def test_cluster_from_task_has_children(ws, simple_dependency_resolver, mock_pat assert step.object_owner == "John Doe" assert step.step_number == 3 assert len(step.required_step_ids) == 2 + + +def test_sequencer_builds_steps_from_dependency_graph(ws, simple_dependency_resolver, mock_path_lookup): + functional = mock_path_lookup.resolve(Path("functional")) + mock_path_lookup.append_path(functional) + mock_path_lookup = mock_path_lookup.change_directory(functional) + notebook_path = Path("grand_parent_that_imports_parent_that_magic_runs_child.py") + notebook_task = NotebookTask(notebook_path=notebook_path.as_posix()) + task = jobs.Task(task_key="test-task", existing_cluster_id="cluster-123", notebook_task=notebook_task) + settings = jobs.JobSettings(name="test-job", tasks=[task]) + job = jobs.Job(job_id=1234, settings=settings) + ws.jobs.get.return_value = job + dependency = WorkflowTask(ws, task, job) + container = dependency.load(mock_path_lookup) + graph = DependencyGraph(dependency, None, simple_dependency_resolver, mock_path_lookup, CurrentSessionState()) + problems = container.build_dependency_graph(graph) + assert not problems + sequencer = MigrationSequencer(ws, mock_path_lookup) + sequencer.register_workflow_task(task, job, graph) + steps = list(sequencer.generate_steps()) + names = {step.object_name for step in steps} + assert notebook_path.as_posix() in names + notebook_path = Path("parent_that_magic_runs_child_that_uses_value_from_parent.py") + assert notebook_path.as_posix() in names + notebook_path = Path("_child_that_uses_value_from_parent.py") + assert notebook_path.as_posix() in names From f8f96661cb138371f25c57a07e5cac2ea20941b0 Mon Sep 17 00:00:00 2001 From: Eric Vergnaud Date: Thu, 17 Oct 2024 17:13:07 +0200 Subject: [PATCH 10/21] fix merge issues --- .../labs/ucx/sequencing/sequencing.py | 27 ++++++++++--------- tests/unit/sequencing/test_sequencing.py | 14 ++++++---- 2 files changed, 24 insertions(+), 17 deletions(-) diff --git a/src/databricks/labs/ucx/sequencing/sequencing.py b/src/databricks/labs/ucx/sequencing/sequencing.py index d9ffa6a092..c3295aae6c 100644 --- a/src/databricks/labs/ucx/sequencing/sequencing.py +++ b/src/databricks/labs/ucx/sequencing/sequencing.py @@ -2,7 +2,7 @@ from collections import defaultdict from collections.abc import Iterable -from dataclasses import dataclass, field +from dataclasses import dataclass from pathlib import Path from databricks.sdk import WorkspaceClient @@ -91,21 +91,20 @@ def register_workflow_task(self, task: jobs.Task, job: jobs.Job, graph: Dependen def _visit_dependency(self, graph: DependencyGraph) -> bool | None: lineage = graph.dependency.lineage[-1] - parent_node = self._find_node(lineage.object_type, lineage.object_id) + parent_node = self._nodes[(lineage.object_type, lineage.object_id)] for dependency in graph.local_dependencies: lineage = dependency.lineage[-1] - child_node = self.register_dependency(lineage.object_type, lineage.object_id) - parent_node.required_steps.append(child_node) + self.register_dependency(parent_node, lineage.object_type, lineage.object_id) # TODO tables and dfsas return None - def register_dependency(self, object_type: str, object_id: str) -> MigrationNode: - existing = self._find_node(object_type, object_id) - if existing: - return existing + def register_dependency(self, parent_node: MigrationNode, object_type: str, object_id: str) -> MigrationNode: + dependency_node = self._nodes.get((object_type, object_id), None) + if dependency_node: + return dependency_node object_name: str = "" object_owner: str = "" - if object_type in { "NOTEBOOK", "FILE" }: + if object_type in {"NOTEBOOK", "FILE"}: path = Path(object_id) for library_root in self._path_lookup.library_roots: if not path.is_relative_to(library_root): @@ -114,14 +113,18 @@ def register_dependency(self, object_type: str, object_id: str) -> MigrationNode break else: raise ValueError(f"{object_type} not supported yet!") - MigrationNode.last_node_id += 1 - return MigrationNode( - node_id=MigrationNode.last_node_id, + self._last_node_id += 1 + dependency_node = MigrationNode( + node_id=self._last_node_id, object_type=object_type, object_id=object_id, object_name=object_name, object_owner=object_owner, ) + self._nodes[dependency_node.key] = dependency_node + self._incoming[dependency_node.key].add(parent_node.key) + self._outgoing[parent_node.key].add(dependency_node.key) + return dependency_node def register_workflow_job(self, job: jobs.Job) -> MigrationNode: job_node = self._nodes.get(("WORKFLOW", str(job.job_id)), None) diff --git a/tests/unit/sequencing/test_sequencing.py b/tests/unit/sequencing/test_sequencing.py index 83db9395ec..d74b95b8bc 100644 --- a/tests/unit/sequencing/test_sequencing.py +++ b/tests/unit/sequencing/test_sequencing.py @@ -14,6 +14,13 @@ from databricks.labs.ucx.source_code.jobs import WorkflowTask +def admin_locator(ws, user_name: str): + admin_finder = create_autospec(AdministratorFinder) + admin_user = iam.User(user_name=user_name, active=True, roles=[iam.ComplexValue(value="account_admin")]) + admin_finder.find_admin_users.return_value = (admin_user,) + return AdministratorLocator(ws, finders=[lambda _ws: admin_finder]) + + def test_sequencer_builds_cluster_and_children_from_task(ws, simple_dependency_resolver, mock_path_lookup): ws.clusters.get.return_value = ClusterDetails(cluster_name="my-cluster", creator_user_name="John Doe") task = jobs.Task(task_key="test-task", existing_cluster_id="cluster-123") @@ -22,10 +29,7 @@ def test_sequencer_builds_cluster_and_children_from_task(ws, simple_dependency_r ws.jobs.get.return_value = job dependency = WorkflowTask(ws, task, job) graph = DependencyGraph(dependency, None, simple_dependency_resolver, mock_path_lookup, CurrentSessionState()) - admin_finder = create_autospec(AdministratorFinder) - admin_user = iam.User(user_name="John Doe", active=True, roles=[iam.ComplexValue(value="account_admin")]) - admin_finder.find_admin_users.return_value = (admin_user,) - sequencer = MigrationSequencer(ws, AdministratorLocator(ws, finders=[lambda _ws: admin_finder])) + sequencer = MigrationSequencer(ws, mock_path_lookup, admin_locator(ws, "John Doe")) sequencer.register_workflow_task(task, job, graph) steps = list(sequencer.generate_steps()) step = steps[-1] @@ -53,7 +57,7 @@ def test_sequencer_builds_steps_from_dependency_graph(ws, simple_dependency_reso graph = DependencyGraph(dependency, None, simple_dependency_resolver, mock_path_lookup, CurrentSessionState()) problems = container.build_dependency_graph(graph) assert not problems - sequencer = MigrationSequencer(ws, mock_path_lookup) + sequencer = MigrationSequencer(ws, mock_path_lookup, admin_locator(ws, "John Doe")) sequencer.register_workflow_task(task, job, graph) steps = list(sequencer.generate_steps()) names = {step.object_name for step in steps} From c32aff801cb5319fd459b5f65a250d92dd28d73e Mon Sep 17 00:00:00 2001 From: Eric Vergnaud Date: Thu, 17 Oct 2024 18:10:34 +0200 Subject: [PATCH 11/21] register notebooks from dependency graph --- .../labs/ucx/sequencing/sequencing.py | 6 +++++- tests/unit/sequencing/test_sequencing.py | 17 +++++++++++------ 2 files changed, 16 insertions(+), 7 deletions(-) diff --git a/src/databricks/labs/ucx/sequencing/sequencing.py b/src/databricks/labs/ucx/sequencing/sequencing.py index c3295aae6c..76a337646b 100644 --- a/src/databricks/labs/ucx/sequencing/sequencing.py +++ b/src/databricks/labs/ucx/sequencing/sequencing.py @@ -25,6 +25,10 @@ class MigrationStep: object_owner: str required_step_ids: list[int] + @property + def key(self) -> tuple[str, str]: + return self.object_type, self.object_id + @dataclass class MigrationNode: @@ -86,7 +90,7 @@ def register_workflow_task(self, task: jobs.Task, job: jobs.Job, graph: Dependen # also make the cluster dependent on the job self._incoming[cluster_node.key].add(job_node.key) self._outgoing[job_node.key].add(cluster_node.key) - # TODO register dependency graph + graph.visit(self._visit_dependency, None) return task_node def _visit_dependency(self, graph: DependencyGraph) -> bool | None: diff --git a/tests/unit/sequencing/test_sequencing.py b/tests/unit/sequencing/test_sequencing.py index d74b95b8bc..ac4472a1f4 100644 --- a/tests/unit/sequencing/test_sequencing.py +++ b/tests/unit/sequencing/test_sequencing.py @@ -60,9 +60,14 @@ def test_sequencer_builds_steps_from_dependency_graph(ws, simple_dependency_reso sequencer = MigrationSequencer(ws, mock_path_lookup, admin_locator(ws, "John Doe")) sequencer.register_workflow_task(task, job, graph) steps = list(sequencer.generate_steps()) - names = {step.object_name for step in steps} - assert notebook_path.as_posix() in names - notebook_path = Path("parent_that_magic_runs_child_that_uses_value_from_parent.py") - assert notebook_path.as_posix() in names - notebook_path = Path("_child_that_uses_value_from_parent.py") - assert notebook_path.as_posix() in names + step0 = next((step for step in steps if step.object_type == "TASK"), None) + assert step0 + step1 = next((step for step in steps if step.object_name == notebook_path.as_posix()), None) + assert step1 + assert step1.step_number > step0.step_number + step2 = next((step for step in steps if step.object_name == "parent_that_magic_runs_child_that_uses_value_from_parent.py"), None) + assert step2 + assert step2.step_number > step1.step_number + step3 = next((step for step in steps if step.object_name == "_child_that_uses_value_from_parent.py"), None) + assert step3 + assert step3.step_number > step2.step_number From 12b5bc4940998f1bd6f861be9f16fd4506ce42ed Mon Sep 17 00:00:00 2001 From: Eric Vergnaud Date: Thu, 17 Oct 2024 18:57:05 +0200 Subject: [PATCH 12/21] fix merge issues --- src/databricks/labs/ucx/sequencing/sequencing.py | 6 +++--- src/databricks/labs/ucx/source_code/jobs.py | 1 - tests/unit/sequencing/test_sequencing.py | 9 ++++++--- 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/src/databricks/labs/ucx/sequencing/sequencing.py b/src/databricks/labs/ucx/sequencing/sequencing.py index 76a337646b..612f339822 100644 --- a/src/databricks/labs/ucx/sequencing/sequencing.py +++ b/src/databricks/labs/ucx/sequencing/sequencing.py @@ -56,9 +56,9 @@ def as_step(self, step_number: int, required_step_ids: list[int]) -> MigrationSt class MigrationSequencer: - def __init__(self, ws: WorkspaceClient, admin_locator: AdministratorLocator): - def __init__(self, ws: WorkspaceClient, path_lookup: PathLookup): + def __init__(self, ws: WorkspaceClient, path_lookup: PathLookup, admin_locator: AdministratorLocator): self._ws = ws + self._path_lookup = path_lookup self._admin_locator = admin_locator self._last_node_id = 0 self._nodes: dict[tuple[str, str], MigrationNode] = {} @@ -100,7 +100,7 @@ def _visit_dependency(self, graph: DependencyGraph) -> bool | None: lineage = dependency.lineage[-1] self.register_dependency(parent_node, lineage.object_type, lineage.object_id) # TODO tables and dfsas - return None + return False def register_dependency(self, parent_node: MigrationNode, object_type: str, object_id: str) -> MigrationNode: dependency_node = self._nodes.get((object_type, object_id), None) diff --git a/src/databricks/labs/ucx/source_code/jobs.py b/src/databricks/labs/ucx/source_code/jobs.py index 18c6ff745a..982aeefca5 100644 --- a/src/databricks/labs/ucx/source_code/jobs.py +++ b/src/databricks/labs/ucx/source_code/jobs.py @@ -34,7 +34,6 @@ SourceInfo, UsedTable, LineageAtom, - read_text, ) from databricks.labs.ucx.source_code.directfs_access import ( DirectFsAccessCrawler, diff --git a/tests/unit/sequencing/test_sequencing.py b/tests/unit/sequencing/test_sequencing.py index ac4472a1f4..28d8947c1b 100644 --- a/tests/unit/sequencing/test_sequencing.py +++ b/tests/unit/sequencing/test_sequencing.py @@ -1,9 +1,9 @@ from unittest.mock import create_autospec -from databricks.sdk.service import iam, jobs from pathlib import Path -from databricks.sdk.service import jobs +from databricks.sdk.service import iam, jobs + from databricks.sdk.service.compute import ClusterDetails from databricks.sdk.service.jobs import NotebookTask @@ -65,7 +65,10 @@ def test_sequencer_builds_steps_from_dependency_graph(ws, simple_dependency_reso step1 = next((step for step in steps if step.object_name == notebook_path.as_posix()), None) assert step1 assert step1.step_number > step0.step_number - step2 = next((step for step in steps if step.object_name == "parent_that_magic_runs_child_that_uses_value_from_parent.py"), None) + step2 = next( + (step for step in steps if step.object_name == "parent_that_magic_runs_child_that_uses_value_from_parent.py"), + None, + ) assert step2 assert step2.step_number > step1.step_number step3 = next((step for step in steps if step.object_name == "_child_that_uses_value_from_parent.py"), None) From c1fc42930ad7a8f9f80e49a9c1963ba936fa7e2c Mon Sep 17 00:00:00 2001 From: Eric Vergnaud Date: Thu, 17 Oct 2024 19:36:27 +0200 Subject: [PATCH 13/21] mock WorkspaceCache for testing --- src/databricks/labs/ucx/source_code/jobs.py | 9 +++++---- tests/unit/sequencing/test_sequencing.py | 5 ++++- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/src/databricks/labs/ucx/source_code/jobs.py b/src/databricks/labs/ucx/source_code/jobs.py index 982aeefca5..59fa6b9eca 100644 --- a/src/databricks/labs/ucx/source_code/jobs.py +++ b/src/databricks/labs/ucx/source_code/jobs.py @@ -34,6 +34,7 @@ SourceInfo, UsedTable, LineageAtom, + read_text, ) from databricks.labs.ucx.source_code.directfs_access import ( DirectFsAccessCrawler, @@ -77,8 +78,8 @@ def as_message(self) -> str: class WorkflowTask(Dependency): - def __init__(self, ws: WorkspaceClient, task: jobs.Task, job: jobs.Job): - loader = WrappingLoader(WorkflowTaskContainer(ws, task, job)) + def __init__(self, ws: WorkspaceClient, task: jobs.Task, job: jobs.Job, cache: WorkspaceCache | None = None): + loader = WrappingLoader(WorkflowTaskContainer(ws, task, job, cache)) super().__init__(loader, Path(f'/jobs/{task.task_key}'), inherits_context=False) self._task = task self._job = job @@ -98,11 +99,11 @@ def lineage(self) -> list[LineageAtom]: class WorkflowTaskContainer(SourceContainer): - def __init__(self, ws: WorkspaceClient, task: jobs.Task, job: jobs.Job): + def __init__(self, ws: WorkspaceClient, task: jobs.Task, job: jobs.Job, cache: WorkspaceCache | None = None): self._task = task self._job = job self._ws = ws - self._cache = WorkspaceCache(ws) + self._cache = cache or WorkspaceCache(ws) self._named_parameters: dict[str, str] | None = {} self._parameters: list[str] | None = [] self._spark_conf: dict[str, str] | None = {} diff --git a/tests/unit/sequencing/test_sequencing.py b/tests/unit/sequencing/test_sequencing.py index 28d8947c1b..e41e92358c 100644 --- a/tests/unit/sequencing/test_sequencing.py +++ b/tests/unit/sequencing/test_sequencing.py @@ -8,6 +8,7 @@ from databricks.sdk.service.jobs import NotebookTask from databricks.labs.ucx.framework.owners import AdministratorLocator, AdministratorFinder +from databricks.labs.ucx.mixins.cached_workspace_path import WorkspaceCache from databricks.labs.ucx.sequencing.sequencing import MigrationSequencer from databricks.labs.ucx.source_code.base import CurrentSessionState from databricks.labs.ucx.source_code.graph import DependencyGraph @@ -52,7 +53,9 @@ def test_sequencer_builds_steps_from_dependency_graph(ws, simple_dependency_reso settings = jobs.JobSettings(name="test-job", tasks=[task]) job = jobs.Job(job_id=1234, settings=settings) ws.jobs.get.return_value = job - dependency = WorkflowTask(ws, task, job) + ws_cache = create_autospec(WorkspaceCache) + ws_cache.get_workspace_path.side_effect = lambda path: Path(path) + dependency = WorkflowTask(ws, task, job, ws_cache) container = dependency.load(mock_path_lookup) graph = DependencyGraph(dependency, None, simple_dependency_resolver, mock_path_lookup, CurrentSessionState()) problems = container.build_dependency_graph(graph) From c6fb61688d7b23bac3698e32ae3f384e517e3fda Mon Sep 17 00:00:00 2001 From: Eric Vergnaud Date: Fri, 18 Oct 2024 13:19:52 +0200 Subject: [PATCH 14/21] populate ownership - leave the correct implementation to issue #3003 --- src/databricks/labs/ucx/framework/owners.py | 7 +++++++ src/databricks/labs/ucx/sequencing/sequencing.py | 3 ++- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/src/databricks/labs/ucx/framework/owners.py b/src/databricks/labs/ucx/framework/owners.py index 55a1ddac98..7fcec332f2 100644 --- a/src/databricks/labs/ucx/framework/owners.py +++ b/src/databricks/labs/ucx/framework/owners.py @@ -255,3 +255,10 @@ def _maybe_direct_owner(self, record: str) -> str | None: return None except InternalError: # redash is very naughty and throws 500s instead of proper 404s return None + + +class WorkspaceObjectOwnership(Ownership[tuple[str, str]]): + + def _maybe_direct_owner(self, record: tuple[str, str]) -> str | None: + # TODO: tuple[0] = object_type, tuple[1] = object_id + return None diff --git a/src/databricks/labs/ucx/sequencing/sequencing.py b/src/databricks/labs/ucx/sequencing/sequencing.py index 612f339822..eed753d7cb 100644 --- a/src/databricks/labs/ucx/sequencing/sequencing.py +++ b/src/databricks/labs/ucx/sequencing/sequencing.py @@ -10,7 +10,7 @@ from databricks.labs.ucx.assessment.clusters import ClusterOwnership, ClusterInfo from databricks.labs.ucx.assessment.jobs import JobOwnership, JobInfo -from databricks.labs.ucx.framework.owners import AdministratorLocator +from databricks.labs.ucx.framework.owners import AdministratorLocator, WorkspaceObjectOwnership from databricks.labs.ucx.source_code.graph import DependencyGraph from databricks.labs.ucx.source_code.path_lookup import PathLookup @@ -115,6 +115,7 @@ def register_dependency(self, parent_node: MigrationNode, object_type: str, obje continue object_name = path.relative_to(library_root).as_posix() break + object_owner = WorkspaceObjectOwnership(self._admin_locator).owner_of((object_type, object_id)) else: raise ValueError(f"{object_type} not supported yet!") self._last_node_id += 1 From 73b277aba7336ee83aa469f63b73375ab6e426be Mon Sep 17 00:00:00 2001 From: Eric Vergnaud Date: Fri, 18 Oct 2024 14:17:54 +0200 Subject: [PATCH 15/21] fix incorrect step sequence --- src/databricks/labs/ucx/sequencing/sequencing.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/src/databricks/labs/ucx/sequencing/sequencing.py b/src/databricks/labs/ucx/sequencing/sequencing.py index eed753d7cb..2bca4cbe94 100644 --- a/src/databricks/labs/ucx/sequencing/sequencing.py +++ b/src/databricks/labs/ucx/sequencing/sequencing.py @@ -104,10 +104,16 @@ def _visit_dependency(self, graph: DependencyGraph) -> bool | None: def register_dependency(self, parent_node: MigrationNode, object_type: str, object_id: str) -> MigrationNode: dependency_node = self._nodes.get((object_type, object_id), None) - if dependency_node: - return dependency_node + if not dependency_node: + dependency_node = self._create_dependency_node(object_type, object_id) + if parent_node: + self._incoming[parent_node.key].add(dependency_node.key) + self._outgoing[dependency_node.key].add(parent_node.key) + return dependency_node + + def _create_dependency_node(self, object_type: str, object_id: str) -> MigrationNode: object_name: str = "" - object_owner: str = "" + _object_owner: str = "" if object_type in {"NOTEBOOK", "FILE"}: path = Path(object_id) for library_root in self._path_lookup.library_roots: @@ -127,8 +133,6 @@ def register_dependency(self, parent_node: MigrationNode, object_type: str, obje object_owner=object_owner, ) self._nodes[dependency_node.key] = dependency_node - self._incoming[dependency_node.key].add(parent_node.key) - self._outgoing[parent_node.key].add(dependency_node.key) return dependency_node def register_workflow_job(self, job: jobs.Job) -> MigrationNode: From ad1f7cfaf6b616c778db2ffc261f5fe5f48e5713 Mon Sep 17 00:00:00 2001 From: Eric Vergnaud Date: Fri, 18 Oct 2024 14:18:35 +0200 Subject: [PATCH 16/21] fix incorrect step sequence --- tests/unit/sequencing/test_sequencing.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/tests/unit/sequencing/test_sequencing.py b/tests/unit/sequencing/test_sequencing.py index e41e92358c..fde276ccd2 100644 --- a/tests/unit/sequencing/test_sequencing.py +++ b/tests/unit/sequencing/test_sequencing.py @@ -11,8 +11,9 @@ from databricks.labs.ucx.mixins.cached_workspace_path import WorkspaceCache from databricks.labs.ucx.sequencing.sequencing import MigrationSequencer from databricks.labs.ucx.source_code.base import CurrentSessionState -from databricks.labs.ucx.source_code.graph import DependencyGraph +from databricks.labs.ucx.source_code.graph import DependencyGraph, Dependency from databricks.labs.ucx.source_code.jobs import WorkflowTask +from databricks.labs.ucx.source_code.linters.files import FileLoader def admin_locator(ws, user_name: str): @@ -67,13 +68,14 @@ def test_sequencer_builds_steps_from_dependency_graph(ws, simple_dependency_reso assert step0 step1 = next((step for step in steps if step.object_name == notebook_path.as_posix()), None) assert step1 - assert step1.step_number > step0.step_number + assert step1.step_number < step0.step_number step2 = next( (step for step in steps if step.object_name == "parent_that_magic_runs_child_that_uses_value_from_parent.py"), None, ) assert step2 - assert step2.step_number > step1.step_number + assert step2.step_number < step1.step_number step3 = next((step for step in steps if step.object_name == "_child_that_uses_value_from_parent.py"), None) assert step3 - assert step3.step_number > step2.step_number + assert step3.step_number < step2.step_number + From eda74f2d1245f9500a45327c7bbac0bed035b32c Mon Sep 17 00:00:00 2001 From: Eric Vergnaud Date: Fri, 18 Oct 2024 15:03:53 +0200 Subject: [PATCH 17/21] basic support of cyclic dependencies --- .../labs/ucx/sequencing/sequencing.py | 40 ++++++++++++++----- tests/unit/sequencing/test_sequencing.py | 34 ++++++++++++++++ 2 files changed, 65 insertions(+), 9 deletions(-) diff --git a/src/databricks/labs/ucx/sequencing/sequencing.py b/src/databricks/labs/ucx/sequencing/sequencing.py index 2bca4cbe94..a87c2fa3d4 100644 --- a/src/databricks/labs/ucx/sequencing/sequencing.py +++ b/src/databricks/labs/ucx/sequencing/sequencing.py @@ -181,20 +181,31 @@ def register_cluster(self, cluster_id: str) -> MigrationNode: return cluster_node def generate_steps(self) -> Iterable[MigrationStep]: - # algo adapted from Kahn topological sort. The main differences is that - # we want the same step number for all nodes with same dependency depth - # so instead of pushing to a queue, we rebuild it once all leaf nodes are processed - # (these are transient leaf nodes i.e. they only become leaf during processing) + """ The below algo is adapted from Kahn's topological sort. + The main differences are as follows: + 1) we want the same step number for all nodes with same dependency depth + so instead of pushing 'leaf' nodes to a queue, we fetch them again once all current 'leaf' nodes are processed + (these are transient 'leaf' nodes i.e. they only become 'leaf' during processing) + 2) Kahn only supports DAGs but python code allows cyclic dependencies i.e. A -> B -> C -> A is not a DAG + so when fetching 'leaf' nodes, we relax the 0-incoming-vertex rule in order + to avoid an infinite loop. We also avoid side effects (such as negative counts). + This algo works correctly for simple cases, but is not tested on large trees. + """ incoming_counts = self._populate_incoming_counts() step_number = 1 sorted_steps: list[MigrationStep] = [] while len(incoming_counts) > 0: - leaf_keys = list(self._get_leaf_keys(incoming_counts)) + leaf_keys = self._get_leaf_keys(incoming_counts) for leaf_key in leaf_keys: del incoming_counts[leaf_key] sorted_steps.append(self._nodes[leaf_key].as_step(step_number, list(self._required_step_ids(leaf_key)))) for dependency_key in self._outgoing[leaf_key]: - incoming_counts[dependency_key] -= 1 + # prevent re-instantiation of already deleted keys + if dependency_key not in incoming_counts: + continue + # prevent negative count with cyclic dependencies + if incoming_counts[dependency_key] > 0: + incoming_counts[dependency_key] -= 1 step_number += 1 return sorted_steps @@ -208,9 +219,20 @@ def _populate_incoming_counts(self) -> dict[tuple[str, str], int]: result[node_key] = len(self._incoming[node_key]) return result - @staticmethod - def _get_leaf_keys(incoming_counts: dict[tuple[str, str], int]) -> Iterable[tuple[str, str]]: + @classmethod + def _get_leaf_keys(cls, incoming_counts: dict[tuple[str, str], int]) -> Iterable[tuple[str, str]]: + count = 0 + leaf_keys = list(cls._yield_leaf_keys(incoming_counts, count)) + # if we're not finding nodes with 0 incoming counts, it's likely caused by cyclic dependencies + # in which case it's safe to process nodes with a higher incoming count + while not leaf_keys: + count += 1 + leaf_keys = list(cls._yield_leaf_keys(incoming_counts, count)) + return leaf_keys + + @classmethod + def _yield_leaf_keys(cls, incoming_counts: dict[tuple[str, str], int], level: int) -> Iterable[tuple[str, str]]: for node_key, incoming_count in incoming_counts.items(): - if incoming_count > 0: + if incoming_count > level: continue yield node_key diff --git a/tests/unit/sequencing/test_sequencing.py b/tests/unit/sequencing/test_sequencing.py index fde276ccd2..102da61d5a 100644 --- a/tests/unit/sequencing/test_sequencing.py +++ b/tests/unit/sequencing/test_sequencing.py @@ -79,3 +79,37 @@ def test_sequencer_builds_steps_from_dependency_graph(ws, simple_dependency_reso assert step3 assert step3.step_number < step2.step_number + +class _DependencyGraph(DependencyGraph): + + def add_dependency(self, graph: DependencyGraph): + self._dependencies[graph.dependency] = graph + + +class _MigrationSequencer(MigrationSequencer): + + def visit_graph(self, graph: DependencyGraph): + graph.visit(self._visit_dependency, None) + + +def test_sequencer_supports_cyclic_dependencies(ws, simple_dependency_resolver, mock_path_lookup): + root = Dependency(FileLoader(), Path("root.py")) + root_graph = _DependencyGraph(root, None, simple_dependency_resolver, mock_path_lookup, CurrentSessionState()) + child_a = Dependency(FileLoader(), Path("a.py")) + child_graph_a = _DependencyGraph(child_a, root_graph, simple_dependency_resolver, mock_path_lookup, CurrentSessionState()) + child_b = Dependency(FileLoader(), Path("b.py")) + child_graph_b = _DependencyGraph(child_b, root_graph, simple_dependency_resolver, mock_path_lookup, CurrentSessionState()) + # root imports a and b + root_graph.add_dependency(child_graph_a) + root_graph.add_dependency(child_graph_b) + # a imports b + child_graph_a.add_dependency(child_graph_b) + # b imports a (using local import) + child_graph_b.add_dependency(child_graph_a) + sequencer = _MigrationSequencer(ws, mock_path_lookup, admin_locator(ws, "John Doe")) + sequencer.register_dependency(None, root.lineage[-1].object_type, root.lineage[-1].object_id) + sequencer.visit_graph(root_graph) + steps = list(sequencer.generate_steps()) + assert len(steps) == 3 + assert steps[2].object_id == "root.py" + From efa28393737eba284cf72b7058f4ecf4ee75c352 Mon Sep 17 00:00:00 2001 From: Eric Vergnaud Date: Fri, 18 Oct 2024 15:08:01 +0200 Subject: [PATCH 18/21] rename local --- src/databricks/labs/ucx/sequencing/sequencing.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/databricks/labs/ucx/sequencing/sequencing.py b/src/databricks/labs/ucx/sequencing/sequencing.py index a87c2fa3d4..915bae1113 100644 --- a/src/databricks/labs/ucx/sequencing/sequencing.py +++ b/src/databricks/labs/ucx/sequencing/sequencing.py @@ -221,18 +221,18 @@ def _populate_incoming_counts(self) -> dict[tuple[str, str], int]: @classmethod def _get_leaf_keys(cls, incoming_counts: dict[tuple[str, str], int]) -> Iterable[tuple[str, str]]: - count = 0 - leaf_keys = list(cls._yield_leaf_keys(incoming_counts, count)) + max_count = 0 + leaf_keys = list(cls._yield_leaf_keys(incoming_counts, max_count)) # if we're not finding nodes with 0 incoming counts, it's likely caused by cyclic dependencies # in which case it's safe to process nodes with a higher incoming count while not leaf_keys: - count += 1 - leaf_keys = list(cls._yield_leaf_keys(incoming_counts, count)) + max_count += 1 + leaf_keys = list(cls._yield_leaf_keys(incoming_counts, max_count)) return leaf_keys @classmethod - def _yield_leaf_keys(cls, incoming_counts: dict[tuple[str, str], int], level: int) -> Iterable[tuple[str, str]]: + def _yield_leaf_keys(cls, incoming_counts: dict[tuple[str, str], int], max_count: int) -> Iterable[tuple[str, str]]: for node_key, incoming_count in incoming_counts.items(): - if incoming_count > level: + if incoming_count > max_count: continue yield node_key From a8a6af3bfb5a31e01d499e641a9a114b442b875f Mon Sep 17 00:00:00 2001 From: Eric Vergnaud Date: Fri, 18 Oct 2024 15:10:36 +0200 Subject: [PATCH 19/21] formatting --- src/databricks/labs/ucx/sequencing/sequencing.py | 4 ++-- tests/unit/sequencing/test_sequencing.py | 9 ++++++--- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/src/databricks/labs/ucx/sequencing/sequencing.py b/src/databricks/labs/ucx/sequencing/sequencing.py index 915bae1113..897a1c137a 100644 --- a/src/databricks/labs/ucx/sequencing/sequencing.py +++ b/src/databricks/labs/ucx/sequencing/sequencing.py @@ -181,7 +181,7 @@ def register_cluster(self, cluster_id: str) -> MigrationNode: return cluster_node def generate_steps(self) -> Iterable[MigrationStep]: - """ The below algo is adapted from Kahn's topological sort. + """The below algo is adapted from Kahn's topological sort. The main differences are as follows: 1) we want the same step number for all nodes with same dependency depth so instead of pushing 'leaf' nodes to a queue, we fetch them again once all current 'leaf' nodes are processed @@ -190,7 +190,7 @@ def generate_steps(self) -> Iterable[MigrationStep]: so when fetching 'leaf' nodes, we relax the 0-incoming-vertex rule in order to avoid an infinite loop. We also avoid side effects (such as negative counts). This algo works correctly for simple cases, but is not tested on large trees. - """ + """ incoming_counts = self._populate_incoming_counts() step_number = 1 sorted_steps: list[MigrationStep] = [] diff --git a/tests/unit/sequencing/test_sequencing.py b/tests/unit/sequencing/test_sequencing.py index 102da61d5a..ed92378808 100644 --- a/tests/unit/sequencing/test_sequencing.py +++ b/tests/unit/sequencing/test_sequencing.py @@ -96,9 +96,13 @@ def test_sequencer_supports_cyclic_dependencies(ws, simple_dependency_resolver, root = Dependency(FileLoader(), Path("root.py")) root_graph = _DependencyGraph(root, None, simple_dependency_resolver, mock_path_lookup, CurrentSessionState()) child_a = Dependency(FileLoader(), Path("a.py")) - child_graph_a = _DependencyGraph(child_a, root_graph, simple_dependency_resolver, mock_path_lookup, CurrentSessionState()) + child_graph_a = _DependencyGraph( + child_a, root_graph, simple_dependency_resolver, mock_path_lookup, CurrentSessionState() + ) child_b = Dependency(FileLoader(), Path("b.py")) - child_graph_b = _DependencyGraph(child_b, root_graph, simple_dependency_resolver, mock_path_lookup, CurrentSessionState()) + child_graph_b = _DependencyGraph( + child_b, root_graph, simple_dependency_resolver, mock_path_lookup, CurrentSessionState() + ) # root imports a and b root_graph.add_dependency(child_graph_a) root_graph.add_dependency(child_graph_b) @@ -112,4 +116,3 @@ def test_sequencer_supports_cyclic_dependencies(ws, simple_dependency_resolver, steps = list(sequencer.generate_steps()) assert len(steps) == 3 assert steps[2].object_id == "root.py" - From cf8a9dd3a42028d08398dd91485a25424c74f933 Mon Sep 17 00:00:00 2001 From: Eric Vergnaud Date: Fri, 18 Oct 2024 15:39:38 +0200 Subject: [PATCH 20/21] formatting --- .../labs/ucx/sequencing/sequencing.py | 17 ++++++----- tests/unit/sequencing/test_sequencing.py | 30 +++++++++---------- 2 files changed, 24 insertions(+), 23 deletions(-) diff --git a/src/databricks/labs/ucx/sequencing/sequencing.py b/src/databricks/labs/ucx/sequencing/sequencing.py index 897a1c137a..025c81bc5c 100644 --- a/src/databricks/labs/ucx/sequencing/sequencing.py +++ b/src/databricks/labs/ucx/sequencing/sequencing.py @@ -199,16 +199,19 @@ def generate_steps(self) -> Iterable[MigrationStep]: for leaf_key in leaf_keys: del incoming_counts[leaf_key] sorted_steps.append(self._nodes[leaf_key].as_step(step_number, list(self._required_step_ids(leaf_key)))) - for dependency_key in self._outgoing[leaf_key]: - # prevent re-instantiation of already deleted keys - if dependency_key not in incoming_counts: - continue - # prevent negative count with cyclic dependencies - if incoming_counts[dependency_key] > 0: - incoming_counts[dependency_key] -= 1 + self._on_leaf_key_processed(leaf_key, incoming_counts) step_number += 1 return sorted_steps + def _on_leaf_key_processed(self, leaf_key: tuple[str, str], incoming_counts: dict[tuple[str, str], int]): + for dependency_key in self._outgoing[leaf_key]: + # prevent re-instantiation of already deleted keys + if dependency_key not in incoming_counts: + continue + # prevent negative count with cyclic dependencies + if incoming_counts[dependency_key] > 0: + incoming_counts[dependency_key] -= 1 + def _required_step_ids(self, node_key: tuple[str, str]) -> Iterable[int]: for leaf_key in self._incoming[node_key]: yield self._nodes[leaf_key].node_id diff --git a/tests/unit/sequencing/test_sequencing.py b/tests/unit/sequencing/test_sequencing.py index ed92378808..d1c6f776a7 100644 --- a/tests/unit/sequencing/test_sequencing.py +++ b/tests/unit/sequencing/test_sequencing.py @@ -55,7 +55,7 @@ def test_sequencer_builds_steps_from_dependency_graph(ws, simple_dependency_reso job = jobs.Job(job_id=1234, settings=settings) ws.jobs.get.return_value = job ws_cache = create_autospec(WorkspaceCache) - ws_cache.get_workspace_path.side_effect = lambda path: Path(path) + ws_cache.get_workspace_path.side_effect = Path dependency = WorkflowTask(ws, task, job, ws_cache) container = dependency.load(mock_path_lookup) graph = DependencyGraph(dependency, None, simple_dependency_resolver, mock_path_lookup, CurrentSessionState()) @@ -63,21 +63,19 @@ def test_sequencer_builds_steps_from_dependency_graph(ws, simple_dependency_reso assert not problems sequencer = MigrationSequencer(ws, mock_path_lookup, admin_locator(ws, "John Doe")) sequencer.register_workflow_task(task, job, graph) - steps = list(sequencer.generate_steps()) - step0 = next((step for step in steps if step.object_type == "TASK"), None) - assert step0 - step1 = next((step for step in steps if step.object_name == notebook_path.as_posix()), None) - assert step1 - assert step1.step_number < step0.step_number - step2 = next( - (step for step in steps if step.object_name == "parent_that_magic_runs_child_that_uses_value_from_parent.py"), - None, - ) - assert step2 - assert step2.step_number < step1.step_number - step3 = next((step for step in steps if step.object_name == "_child_that_uses_value_from_parent.py"), None) - assert step3 - assert step3.step_number < step2.step_number + all_steps = list(sequencer.generate_steps()) + # ensure steps have a consistent step_number: TASK > grand-parent > parent > child + parent_name = "parent_that_magic_runs_child_that_uses_value_from_parent.py" + steps = [ + next((step for step in all_steps if step.object_name == "_child_that_uses_value_from_parent.py"), None), + next((step for step in all_steps if step.object_name == parent_name), None), + next((step for step in all_steps if step.object_name == notebook_path.as_posix()), None), + next((step for step in all_steps if step.object_type == "TASK"), None), + ] + # ensure steps have a consistent step_number + for i in range(0, len(steps) - 1): + assert steps[i] + assert steps[i].step_number < steps[i + 1].step_number class _DependencyGraph(DependencyGraph): From eb7974676381ba7b4d7bb340e70ffdd3a6345dbe Mon Sep 17 00:00:00 2001 From: Eric Vergnaud Date: Mon, 21 Oct 2024 14:01:20 +0200 Subject: [PATCH 21/21] move package --- .../labs/ucx/assessment/sequencing.py | 67 ++++- .../labs/ucx/sequencing/__init__.py | 0 .../labs/ucx/sequencing/sequencing.py | 241 ------------------ tests/unit/assessment/test_sequencing.py | 33 +++ tests/unit/sequencing/__init__.py | 0 tests/unit/sequencing/test_sequencing.py | 116 --------- 6 files changed, 96 insertions(+), 361 deletions(-) delete mode 100644 src/databricks/labs/ucx/sequencing/__init__.py delete mode 100644 src/databricks/labs/ucx/sequencing/sequencing.py delete mode 100644 tests/unit/sequencing/__init__.py delete mode 100644 tests/unit/sequencing/test_sequencing.py diff --git a/src/databricks/labs/ucx/assessment/sequencing.py b/src/databricks/labs/ucx/assessment/sequencing.py index 4fa660f558..2e6217dc12 100644 --- a/src/databricks/labs/ucx/assessment/sequencing.py +++ b/src/databricks/labs/ucx/assessment/sequencing.py @@ -5,6 +5,7 @@ from collections import defaultdict from collections.abc import Iterable from dataclasses import dataclass, field +from pathlib import Path from databricks.sdk import WorkspaceClient from databricks.sdk.errors import DatabricksError @@ -12,8 +13,9 @@ from databricks.labs.ucx.assessment.clusters import ClusterOwnership, ClusterInfo from databricks.labs.ucx.assessment.jobs import JobOwnership, JobInfo -from databricks.labs.ucx.framework.owners import AdministratorLocator -from databricks.labs.ucx.source_code.graph import DependencyProblem +from databricks.labs.ucx.framework.owners import AdministratorLocator, WorkspaceObjectOwnership +from databricks.labs.ucx.source_code.graph import DependencyGraph, DependencyProblem +from databricks.labs.ucx.source_code.path_lookup import PathLookup @dataclass @@ -39,6 +41,9 @@ class MigrationStep: required_step_ids: list[int] """The step ids that should be completed before this step is started.""" + @property + def key(self) -> tuple[str, str]: + return self.object_type, self.object_id MigrationNodeKey = tuple[str, str] @@ -148,8 +153,9 @@ class MigrationSequencer: Analysing the graph in this case means: computing the migration sequence in `meth:generate_steps`. """ - def __init__(self, ws: WorkspaceClient, administrator_locator: AdministratorLocator): + def __init__(self, ws: WorkspaceClient, path_lookup: PathLookup, administrator_locator: AdministratorLocator): self._ws = ws + self._path_lookup = path_lookup self._admin_locator = administrator_locator self._counter = itertools.count() self._nodes: dict[MigrationNodeKey, MigrationNode] = {} @@ -220,7 +226,7 @@ def _register_job(self, job: Job) -> MaybeMigrationNode: problems.append(problem) return MaybeMigrationNode(job_node, problems) - def _register_workflow_task(self, task: Task, parent: MigrationNode) -> MaybeMigrationNode: + def _register_workflow_task(self, task: Task, parent: MigrationNode, graph: DependencyGraph) -> MaybeMigrationNode: """Register a workflow task. TODO: @@ -262,8 +268,51 @@ def _register_workflow_task(self, task: Task, parent: MigrationNode) -> MaybeMig else: problem = DependencyProblem('cluster-not-found', f"Could not find cluster: {task.job_cluster_key}") problems.append(problem) + graph.visit(self._visit_dependency, None) return MaybeMigrationNode(task_node, problems) + def _visit_dependency(self, graph: DependencyGraph) -> bool | None: + lineage = graph.dependency.lineage[-1] + parent_node = self._nodes[(lineage.object_type, lineage.object_id)] + for dependency in graph.local_dependencies: + lineage = dependency.lineage[-1] + self.register_dependency(parent_node, lineage.object_type, lineage.object_id) + # TODO tables and dfsas + return False + + def register_dependency(self, parent_node: MigrationNode, object_type: str, object_id: str) -> MigrationNode: + dependency_node = self._nodes.get((object_type, object_id), None) + if not dependency_node: + dependency_node = self._create_dependency_node(object_type, object_id) + if parent_node: + self._incoming[parent_node.key].add(dependency_node.key) + self._outgoing[dependency_node.key].add(parent_node.key) + return dependency_node + + def _create_dependency_node(self, object_type: str, object_id: str) -> MigrationNode: + object_name: str = "" + _object_owner: str = "" + if object_type in {"NOTEBOOK", "FILE"}: + path = Path(object_id) + for library_root in self._path_lookup.library_roots: + if not path.is_relative_to(library_root): + continue + object_name = path.relative_to(library_root).as_posix() + break + object_owner = WorkspaceObjectOwnership(self._admin_locator).owner_of((object_type, object_id)) + else: + raise ValueError(f"{object_type} not supported yet!") + self._last_node_id += 1 + dependency_node = MigrationNode( + node_id=self._last_node_id, + object_type=object_type, + object_id=object_id, + object_name=object_name, + object_owner=object_owner, + ) + self._nodes[dependency_node.key] = dependency_node + return dependency_node + def _register_job_cluster(self, cluster: JobCluster, parent: MigrationNode) -> MaybeMigrationNode: """Register a job cluster. @@ -322,6 +371,16 @@ def generate_steps(self) -> Iterable[MigrationStep]: leaf during processing) - We handle cyclic dependencies (implemented in PR #3009) """ + """The below algo is adapted from Kahn's topological sort. + The main differences are as follows: + 1) we want the same step number for all nodes with same dependency depth + so instead of pushing 'leaf' nodes to a queue, we fetch them again once all current 'leaf' nodes are processed + (these are transient 'leaf' nodes i.e. they only become 'leaf' during processing) + 2) Kahn only supports DAGs but python code allows cyclic dependencies i.e. A -> B -> C -> A is not a DAG + so when fetching 'leaf' nodes, we relax the 0-incoming-vertex rule in order + to avoid an infinite loop. We also avoid side effects (such as negative counts). + This algo works correctly for simple cases, but is not tested on large trees. + """ ordered_steps: list[MigrationStep] = [] # For updating the priority of steps that depend on other steps incoming_references = self._invert_outgoing_to_incoming_references() diff --git a/src/databricks/labs/ucx/sequencing/__init__.py b/src/databricks/labs/ucx/sequencing/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/src/databricks/labs/ucx/sequencing/sequencing.py b/src/databricks/labs/ucx/sequencing/sequencing.py deleted file mode 100644 index 025c81bc5c..0000000000 --- a/src/databricks/labs/ucx/sequencing/sequencing.py +++ /dev/null @@ -1,241 +0,0 @@ -from __future__ import annotations - -from collections import defaultdict -from collections.abc import Iterable -from dataclasses import dataclass -from pathlib import Path - -from databricks.sdk import WorkspaceClient -from databricks.sdk.service import jobs - -from databricks.labs.ucx.assessment.clusters import ClusterOwnership, ClusterInfo -from databricks.labs.ucx.assessment.jobs import JobOwnership, JobInfo -from databricks.labs.ucx.framework.owners import AdministratorLocator, WorkspaceObjectOwnership -from databricks.labs.ucx.source_code.graph import DependencyGraph -from databricks.labs.ucx.source_code.path_lookup import PathLookup - - -@dataclass -class MigrationStep: - step_id: int - step_number: int - object_type: str - object_id: str - object_name: str - object_owner: str - required_step_ids: list[int] - - @property - def key(self) -> tuple[str, str]: - return self.object_type, self.object_id - - -@dataclass -class MigrationNode: - node_id: int - object_type: str - object_id: str - object_name: str - object_owner: str - - @property - def key(self) -> tuple[str, str]: - return self.object_type, self.object_id - - def as_step(self, step_number: int, required_step_ids: list[int]) -> MigrationStep: - return MigrationStep( - step_id=self.node_id, - step_number=step_number, - object_type=self.object_type, - object_id=self.object_id, - object_name=self.object_name, - object_owner=self.object_owner, - required_step_ids=required_step_ids, - ) - - -class MigrationSequencer: - - def __init__(self, ws: WorkspaceClient, path_lookup: PathLookup, admin_locator: AdministratorLocator): - self._ws = ws - self._path_lookup = path_lookup - self._admin_locator = admin_locator - self._last_node_id = 0 - self._nodes: dict[tuple[str, str], MigrationNode] = {} - self._incoming: dict[tuple[str, str], set[tuple[str, str]]] = defaultdict(set) - self._outgoing: dict[tuple[str, str], set[tuple[str, str]]] = defaultdict(set) - - def register_workflow_task(self, task: jobs.Task, job: jobs.Job, graph: DependencyGraph) -> MigrationNode: - task_id = f"{job.job_id}/{task.task_key}" - task_node = self._nodes.get(("TASK", task_id), None) - if task_node: - return task_node - job_node = self.register_workflow_job(job) - self._last_node_id += 1 - task_node = MigrationNode( - node_id=self._last_node_id, - object_type="TASK", - object_id=task_id, - object_name=task.task_key, - object_owner=job_node.object_owner, # no task owner so use job one - ) - self._nodes[task_node.key] = task_node - self._incoming[job_node.key].add(task_node.key) - self._outgoing[task_node.key].add(job_node.key) - if task.existing_cluster_id: - cluster_node = self.register_cluster(task.existing_cluster_id) - if cluster_node: - self._incoming[cluster_node.key].add(task_node.key) - self._outgoing[task_node.key].add(cluster_node.key) - # also make the cluster dependent on the job - self._incoming[cluster_node.key].add(job_node.key) - self._outgoing[job_node.key].add(cluster_node.key) - graph.visit(self._visit_dependency, None) - return task_node - - def _visit_dependency(self, graph: DependencyGraph) -> bool | None: - lineage = graph.dependency.lineage[-1] - parent_node = self._nodes[(lineage.object_type, lineage.object_id)] - for dependency in graph.local_dependencies: - lineage = dependency.lineage[-1] - self.register_dependency(parent_node, lineage.object_type, lineage.object_id) - # TODO tables and dfsas - return False - - def register_dependency(self, parent_node: MigrationNode, object_type: str, object_id: str) -> MigrationNode: - dependency_node = self._nodes.get((object_type, object_id), None) - if not dependency_node: - dependency_node = self._create_dependency_node(object_type, object_id) - if parent_node: - self._incoming[parent_node.key].add(dependency_node.key) - self._outgoing[dependency_node.key].add(parent_node.key) - return dependency_node - - def _create_dependency_node(self, object_type: str, object_id: str) -> MigrationNode: - object_name: str = "" - _object_owner: str = "" - if object_type in {"NOTEBOOK", "FILE"}: - path = Path(object_id) - for library_root in self._path_lookup.library_roots: - if not path.is_relative_to(library_root): - continue - object_name = path.relative_to(library_root).as_posix() - break - object_owner = WorkspaceObjectOwnership(self._admin_locator).owner_of((object_type, object_id)) - else: - raise ValueError(f"{object_type} not supported yet!") - self._last_node_id += 1 - dependency_node = MigrationNode( - node_id=self._last_node_id, - object_type=object_type, - object_id=object_id, - object_name=object_name, - object_owner=object_owner, - ) - self._nodes[dependency_node.key] = dependency_node - return dependency_node - - def register_workflow_job(self, job: jobs.Job) -> MigrationNode: - job_node = self._nodes.get(("WORKFLOW", str(job.job_id)), None) - if job_node: - return job_node - self._last_node_id += 1 - job_name = job.settings.name if job.settings and job.settings.name else str(job.job_id) - job_node = MigrationNode( - node_id=self._last_node_id, - object_type="WORKFLOW", - object_id=str(job.job_id), - object_name=job_name, - object_owner=JobOwnership(self._admin_locator).owner_of(JobInfo.from_job(job)), - ) - self._nodes[job_node.key] = job_node - if job.settings and job.settings.job_clusters: - for job_cluster in job.settings.job_clusters: - cluster_node = self.register_job_cluster(job_cluster) - if cluster_node: - self._incoming[cluster_node.key].add(job_node.key) - self._outgoing[job_node.key].add(cluster_node.key) - return job_node - - def register_job_cluster(self, cluster: jobs.JobCluster) -> MigrationNode | None: - if cluster.new_cluster: - return None - return self.register_cluster(cluster.job_cluster_key) - - def register_cluster(self, cluster_id: str) -> MigrationNode: - cluster_node = self._nodes.get(("CLUSTER", cluster_id), None) - if cluster_node: - return cluster_node - details = self._ws.clusters.get(cluster_id) - object_name = details.cluster_name if details and details.cluster_name else cluster_id - self._last_node_id += 1 - cluster_node = MigrationNode( - node_id=self._last_node_id, - object_type="CLUSTER", - object_id=cluster_id, - object_name=object_name, - object_owner=ClusterOwnership(self._admin_locator).owner_of(ClusterInfo.from_cluster_details(details)), - ) - self._nodes[cluster_node.key] = cluster_node - # TODO register warehouses and policies - return cluster_node - - def generate_steps(self) -> Iterable[MigrationStep]: - """The below algo is adapted from Kahn's topological sort. - The main differences are as follows: - 1) we want the same step number for all nodes with same dependency depth - so instead of pushing 'leaf' nodes to a queue, we fetch them again once all current 'leaf' nodes are processed - (these are transient 'leaf' nodes i.e. they only become 'leaf' during processing) - 2) Kahn only supports DAGs but python code allows cyclic dependencies i.e. A -> B -> C -> A is not a DAG - so when fetching 'leaf' nodes, we relax the 0-incoming-vertex rule in order - to avoid an infinite loop. We also avoid side effects (such as negative counts). - This algo works correctly for simple cases, but is not tested on large trees. - """ - incoming_counts = self._populate_incoming_counts() - step_number = 1 - sorted_steps: list[MigrationStep] = [] - while len(incoming_counts) > 0: - leaf_keys = self._get_leaf_keys(incoming_counts) - for leaf_key in leaf_keys: - del incoming_counts[leaf_key] - sorted_steps.append(self._nodes[leaf_key].as_step(step_number, list(self._required_step_ids(leaf_key)))) - self._on_leaf_key_processed(leaf_key, incoming_counts) - step_number += 1 - return sorted_steps - - def _on_leaf_key_processed(self, leaf_key: tuple[str, str], incoming_counts: dict[tuple[str, str], int]): - for dependency_key in self._outgoing[leaf_key]: - # prevent re-instantiation of already deleted keys - if dependency_key not in incoming_counts: - continue - # prevent negative count with cyclic dependencies - if incoming_counts[dependency_key] > 0: - incoming_counts[dependency_key] -= 1 - - def _required_step_ids(self, node_key: tuple[str, str]) -> Iterable[int]: - for leaf_key in self._incoming[node_key]: - yield self._nodes[leaf_key].node_id - - def _populate_incoming_counts(self) -> dict[tuple[str, str], int]: - result = defaultdict(int) - for node_key in self._nodes: - result[node_key] = len(self._incoming[node_key]) - return result - - @classmethod - def _get_leaf_keys(cls, incoming_counts: dict[tuple[str, str], int]) -> Iterable[tuple[str, str]]: - max_count = 0 - leaf_keys = list(cls._yield_leaf_keys(incoming_counts, max_count)) - # if we're not finding nodes with 0 incoming counts, it's likely caused by cyclic dependencies - # in which case it's safe to process nodes with a higher incoming count - while not leaf_keys: - max_count += 1 - leaf_keys = list(cls._yield_leaf_keys(incoming_counts, max_count)) - return leaf_keys - - @classmethod - def _yield_leaf_keys(cls, incoming_counts: dict[tuple[str, str], int], max_count: int) -> Iterable[tuple[str, str]]: - for node_key, incoming_count in incoming_counts.items(): - if incoming_count > max_count: - continue - yield node_key diff --git a/tests/unit/assessment/test_sequencing.py b/tests/unit/assessment/test_sequencing.py index 1708c9a537..fa93927831 100644 --- a/tests/unit/assessment/test_sequencing.py +++ b/tests/unit/assessment/test_sequencing.py @@ -280,6 +280,39 @@ def test_sequence_steps_from_job_task_with_new_cluster(ws, admin_locator) -> Non ] +class _MigrationSequencer(MigrationSequencer): + + def visit_graph(self, graph: DependencyGraph): + graph.visit(self._visit_dependency, None) + + +def test_sequencer_supports_cyclic_dependencies(ws, simple_dependency_resolver, mock_path_lookup): + root = Dependency(FileLoader(), Path("root.py")) + root_graph = _DependencyGraph(root, None, simple_dependency_resolver, mock_path_lookup, CurrentSessionState()) + child_a = Dependency(FileLoader(), Path("a.py")) + child_graph_a = _DependencyGraph( + child_a, root_graph, simple_dependency_resolver, mock_path_lookup, CurrentSessionState() + ) + child_b = Dependency(FileLoader(), Path("b.py")) + child_graph_b = _DependencyGraph( + child_b, root_graph, simple_dependency_resolver, mock_path_lookup, CurrentSessionState() + ) + # root imports a and b + root_graph.add_dependency(child_graph_a) + root_graph.add_dependency(child_graph_b) + # a imports b + child_graph_a.add_dependency(child_graph_b) + # b imports a (using local import) + child_graph_b.add_dependency(child_graph_a) + sequencer = _MigrationSequencer(ws, mock_path_lookup, admin_locator(ws, "John Doe")) + sequencer.register_dependency(None, root.lineage[-1].object_type, root.lineage[-1].object_id) + sequencer.visit_graph(root_graph) + steps = list(sequencer.generate_steps()) + assert len(steps) == 3 + assert steps[2].object_id == "root.py" + + + def test_sequence_steps_from_job_task_with_non_existing_cluster(ws, admin_locator) -> None: """Sequence a job with a task that references a non-existing cluster. diff --git a/tests/unit/sequencing/__init__.py b/tests/unit/sequencing/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/tests/unit/sequencing/test_sequencing.py b/tests/unit/sequencing/test_sequencing.py deleted file mode 100644 index d1c6f776a7..0000000000 --- a/tests/unit/sequencing/test_sequencing.py +++ /dev/null @@ -1,116 +0,0 @@ -from unittest.mock import create_autospec - -from pathlib import Path - -from databricks.sdk.service import iam, jobs - -from databricks.sdk.service.compute import ClusterDetails -from databricks.sdk.service.jobs import NotebookTask - -from databricks.labs.ucx.framework.owners import AdministratorLocator, AdministratorFinder -from databricks.labs.ucx.mixins.cached_workspace_path import WorkspaceCache -from databricks.labs.ucx.sequencing.sequencing import MigrationSequencer -from databricks.labs.ucx.source_code.base import CurrentSessionState -from databricks.labs.ucx.source_code.graph import DependencyGraph, Dependency -from databricks.labs.ucx.source_code.jobs import WorkflowTask -from databricks.labs.ucx.source_code.linters.files import FileLoader - - -def admin_locator(ws, user_name: str): - admin_finder = create_autospec(AdministratorFinder) - admin_user = iam.User(user_name=user_name, active=True, roles=[iam.ComplexValue(value="account_admin")]) - admin_finder.find_admin_users.return_value = (admin_user,) - return AdministratorLocator(ws, finders=[lambda _ws: admin_finder]) - - -def test_sequencer_builds_cluster_and_children_from_task(ws, simple_dependency_resolver, mock_path_lookup): - ws.clusters.get.return_value = ClusterDetails(cluster_name="my-cluster", creator_user_name="John Doe") - task = jobs.Task(task_key="test-task", existing_cluster_id="cluster-123") - settings = jobs.JobSettings(name="test-job", tasks=[task]) - job = jobs.Job(job_id=1234, settings=settings) - ws.jobs.get.return_value = job - dependency = WorkflowTask(ws, task, job) - graph = DependencyGraph(dependency, None, simple_dependency_resolver, mock_path_lookup, CurrentSessionState()) - sequencer = MigrationSequencer(ws, mock_path_lookup, admin_locator(ws, "John Doe")) - sequencer.register_workflow_task(task, job, graph) - steps = list(sequencer.generate_steps()) - step = steps[-1] - assert step.step_id - assert step.object_type == "CLUSTER" - assert step.object_id == "cluster-123" - assert step.object_name == "my-cluster" - assert step.object_owner == "John Doe" - assert step.step_number == 3 - assert len(step.required_step_ids) == 2 - - -def test_sequencer_builds_steps_from_dependency_graph(ws, simple_dependency_resolver, mock_path_lookup): - functional = mock_path_lookup.resolve(Path("functional")) - mock_path_lookup.append_path(functional) - mock_path_lookup = mock_path_lookup.change_directory(functional) - notebook_path = Path("grand_parent_that_imports_parent_that_magic_runs_child.py") - notebook_task = NotebookTask(notebook_path=notebook_path.as_posix()) - task = jobs.Task(task_key="test-task", existing_cluster_id="cluster-123", notebook_task=notebook_task) - settings = jobs.JobSettings(name="test-job", tasks=[task]) - job = jobs.Job(job_id=1234, settings=settings) - ws.jobs.get.return_value = job - ws_cache = create_autospec(WorkspaceCache) - ws_cache.get_workspace_path.side_effect = Path - dependency = WorkflowTask(ws, task, job, ws_cache) - container = dependency.load(mock_path_lookup) - graph = DependencyGraph(dependency, None, simple_dependency_resolver, mock_path_lookup, CurrentSessionState()) - problems = container.build_dependency_graph(graph) - assert not problems - sequencer = MigrationSequencer(ws, mock_path_lookup, admin_locator(ws, "John Doe")) - sequencer.register_workflow_task(task, job, graph) - all_steps = list(sequencer.generate_steps()) - # ensure steps have a consistent step_number: TASK > grand-parent > parent > child - parent_name = "parent_that_magic_runs_child_that_uses_value_from_parent.py" - steps = [ - next((step for step in all_steps if step.object_name == "_child_that_uses_value_from_parent.py"), None), - next((step for step in all_steps if step.object_name == parent_name), None), - next((step for step in all_steps if step.object_name == notebook_path.as_posix()), None), - next((step for step in all_steps if step.object_type == "TASK"), None), - ] - # ensure steps have a consistent step_number - for i in range(0, len(steps) - 1): - assert steps[i] - assert steps[i].step_number < steps[i + 1].step_number - - -class _DependencyGraph(DependencyGraph): - - def add_dependency(self, graph: DependencyGraph): - self._dependencies[graph.dependency] = graph - - -class _MigrationSequencer(MigrationSequencer): - - def visit_graph(self, graph: DependencyGraph): - graph.visit(self._visit_dependency, None) - - -def test_sequencer_supports_cyclic_dependencies(ws, simple_dependency_resolver, mock_path_lookup): - root = Dependency(FileLoader(), Path("root.py")) - root_graph = _DependencyGraph(root, None, simple_dependency_resolver, mock_path_lookup, CurrentSessionState()) - child_a = Dependency(FileLoader(), Path("a.py")) - child_graph_a = _DependencyGraph( - child_a, root_graph, simple_dependency_resolver, mock_path_lookup, CurrentSessionState() - ) - child_b = Dependency(FileLoader(), Path("b.py")) - child_graph_b = _DependencyGraph( - child_b, root_graph, simple_dependency_resolver, mock_path_lookup, CurrentSessionState() - ) - # root imports a and b - root_graph.add_dependency(child_graph_a) - root_graph.add_dependency(child_graph_b) - # a imports b - child_graph_a.add_dependency(child_graph_b) - # b imports a (using local import) - child_graph_b.add_dependency(child_graph_a) - sequencer = _MigrationSequencer(ws, mock_path_lookup, admin_locator(ws, "John Doe")) - sequencer.register_dependency(None, root.lineage[-1].object_type, root.lineage[-1].object_id) - sequencer.visit_graph(root_graph) - steps = list(sequencer.generate_steps()) - assert len(steps) == 3 - assert steps[2].object_id == "root.py"