Skip to content

Commit a34001b

Browse files
committed
build migration steps for workflow task
1 parent 0b96a34 commit a34001b

File tree

4 files changed

+167
-0
lines changed

4 files changed

+167
-0
lines changed

src/databricks/labs/ucx/sequencing/__init__.py

Whitespace-only changes.
Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
from __future__ import annotations
2+
3+
import itertools
4+
from collections.abc import Iterable
5+
from dataclasses import dataclass, field
6+
7+
from databricks.sdk.service import jobs
8+
9+
from databricks.labs.ucx.source_code.graph import DependencyGraph
10+
11+
12+
@dataclass
13+
class MigrationStep:
14+
step_id: int
15+
step_number: int
16+
object_type: str
17+
object_id: str
18+
object_owner: str
19+
required_step_ids: list[int] = field(default_factory=list)
20+
21+
22+
@dataclass
23+
class MigrationNode:
24+
last_node_id = 0
25+
node_id: int
26+
object_type: str
27+
object_id: str
28+
object_owner: str
29+
required_steps: list[MigrationNode] = field(default_factory=list)
30+
31+
def generate_steps(self) -> tuple[MigrationStep, Iterable[MigrationStep]]:
32+
# traverse the nodes using a depth-first algorithm
33+
# ultimate leaves have a step number of 1
34+
# use highest required step number + 1 for this step
35+
highest_step_number = 0
36+
required_step_ids: list[int] = []
37+
all_generated_steps: list[Iterable[MigrationStep]] = []
38+
for required_step in self.required_steps:
39+
step, generated_steps = required_step.generate_steps()
40+
highest_step_number = max(highest_step_number, step.step_number)
41+
required_step_ids.append(step.step_id)
42+
all_generated_steps.append(generated_steps)
43+
all_generated_steps.append([step])
44+
this_step = MigrationStep(
45+
step_id=self.node_id,
46+
step_number=highest_step_number + 1,
47+
object_type=self.object_type,
48+
object_id=self.object_id,
49+
object_owner=self.object_owner,
50+
required_step_ids=required_step_ids,
51+
)
52+
return this_step, itertools.chain(*all_generated_steps)
53+
54+
def find(self, object_type: str, object_id: str) -> MigrationNode | None:
55+
if object_type == self.object_type and object_id == self.object_id:
56+
return self
57+
for step in self.required_steps:
58+
found = step.find(object_type, object_id)
59+
if found:
60+
return found
61+
return None
62+
63+
64+
class MigrationSequencer:
65+
66+
def __init__(self):
67+
self._root = MigrationNode(node_id=0, object_type="ROOT", object_id="ROOT", object_owner="NONE")
68+
69+
def register_workflow_task(self, task: jobs.Task, job: jobs.Job, _graph: DependencyGraph) -> MigrationNode:
70+
task_node = self._find_node(object_type="TASK", object_id=task.task_key)
71+
if task_node:
72+
return task_node
73+
job_node = self.register_workflow_job(job)
74+
MigrationNode.last_node_id += 1
75+
task_node = MigrationNode(
76+
node_id=MigrationNode.last_node_id, object_type="TASK", object_id=task.task_key, object_owner="NONE"
77+
) # TODO object_owner
78+
job_node.required_steps.append(task_node)
79+
if task.existing_cluster_id:
80+
cluster_node = self.register_cluster(task.existing_cluster_id)
81+
cluster_node.required_steps.append(task_node)
82+
if job_node not in cluster_node.required_steps:
83+
cluster_node.required_steps.append(job_node)
84+
# TODO register dependency graph
85+
return task_node
86+
87+
def register_workflow_job(self, job: jobs.Job) -> MigrationNode:
88+
job_node = self._find_node(object_type="JOB", object_id=str(job.job_id))
89+
if job_node:
90+
return job_node
91+
MigrationNode.last_node_id += 1
92+
job_node = MigrationNode(
93+
node_id=MigrationNode.last_node_id, object_type="JOB", object_id=str(job.job_id), object_owner="NONE"
94+
) # TODO object_owner
95+
top_level = True
96+
if job.settings and job.settings.job_clusters:
97+
for job_cluster in job.settings.job_clusters:
98+
cluster_node = self.register_job_cluster(job_cluster)
99+
if cluster_node:
100+
top_level = False
101+
cluster_node.required_steps.append(job_node)
102+
if top_level:
103+
self._root.required_steps.append(job_node)
104+
return job_node
105+
106+
def register_job_cluster(self, cluster: jobs.JobCluster) -> MigrationNode | None:
107+
if cluster.new_cluster:
108+
return None
109+
return self.register_cluster(cluster.job_cluster_key)
110+
111+
def register_cluster(self, cluster_key: str) -> MigrationNode:
112+
cluster_node = self._find_node(object_type="CLUSTER", object_id=cluster_key)
113+
if cluster_node:
114+
return cluster_node
115+
MigrationNode.last_node_id += 1
116+
cluster_node = MigrationNode(
117+
node_id=MigrationNode.last_node_id, object_type="CLUSTER", object_id=cluster_key, object_owner="NONE"
118+
) # TODO object_owner
119+
# TODO register warehouses and policies
120+
self._root.required_steps.append(cluster_node)
121+
return cluster_node
122+
123+
def generate_steps(self) -> Iterable[MigrationStep]:
124+
_root_step, generated_steps = self._root.generate_steps()
125+
unique_steps = self._deduplicate_steps(generated_steps)
126+
return self._sorted_steps(unique_steps)
127+
128+
@staticmethod
129+
def _sorted_steps(steps: Iterable[MigrationStep]) -> Iterable[MigrationStep]:
130+
# sort by step number, lowest first
131+
return sorted(steps, key=lambda step: step.step_number)
132+
133+
@staticmethod
134+
def _deduplicate_steps(steps: Iterable[MigrationStep]) -> Iterable[MigrationStep]:
135+
best_steps: dict[int, MigrationStep] = {}
136+
for step in steps:
137+
existing = best_steps.get(step.step_id, None)
138+
# keep the step with the highest step number
139+
if existing and existing.step_number >= step.step_number:
140+
continue
141+
best_steps[step.step_id] = step
142+
return best_steps.values()
143+
144+
def _find_node(self, object_type: str, object_id: str) -> MigrationNode | None:
145+
return self._root.find(object_type, object_id)

tests/unit/sequencing/__init__.py

Whitespace-only changes.
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
from databricks.sdk.service import jobs
2+
3+
from databricks.labs.ucx.sequencing.sequencing import MigrationSequencer
4+
from databricks.labs.ucx.source_code.base import CurrentSessionState
5+
from databricks.labs.ucx.source_code.graph import DependencyGraph
6+
from databricks.labs.ucx.source_code.jobs import WorkflowTask
7+
8+
9+
def test_cluster_from_task_has_children(ws, simple_dependency_resolver, mock_path_lookup):
10+
task = jobs.Task(task_key="test-task", existing_cluster_id="cluster-123")
11+
settings = jobs.JobSettings(name="test-job", tasks=[task])
12+
job = jobs.Job(job_id=1234, settings=settings)
13+
ws.jobs.get.return_value = job
14+
dependency = WorkflowTask(ws, task, job)
15+
graph = DependencyGraph(dependency, None, simple_dependency_resolver, mock_path_lookup, CurrentSessionState())
16+
sequencer = MigrationSequencer()
17+
sequencer.register_workflow_task(task, job, graph)
18+
steps = list(sequencer.generate_steps())
19+
step = steps[-1]
20+
assert step.object_type == "CLUSTER"
21+
assert step.object_id == "cluster-123"
22+
assert step.step_number == 3

0 commit comments

Comments
 (0)