@@ -15,6 +15,7 @@ class MigrationStep:
15
15
step_number : int
16
16
object_type : str
17
17
object_id : str
18
+ object_name : str
18
19
object_owner : str
19
20
required_step_ids : list [int ] = field (default_factory = list )
20
21
@@ -25,6 +26,7 @@ class MigrationNode:
25
26
node_id : int
26
27
object_type : str
27
28
object_id : str
29
+ object_name : str
28
30
object_owner : str
29
31
required_steps : list [MigrationNode ] = field (default_factory = list )
30
32
@@ -46,6 +48,7 @@ def generate_steps(self) -> tuple[MigrationStep, Iterable[MigrationStep]]:
46
48
step_number = highest_step_number + 1 ,
47
49
object_type = self .object_type ,
48
50
object_id = self .object_id ,
51
+ object_name = self .object_name ,
49
52
object_owner = self .object_owner ,
50
53
required_step_ids = required_step_ids ,
51
54
)
@@ -64,16 +67,23 @@ def find(self, object_type: str, object_id: str) -> MigrationNode | None:
64
67
class MigrationSequencer :
65
68
66
69
def __init__ (self ):
67
- self ._root = MigrationNode (node_id = 0 , object_type = "ROOT" , object_id = "ROOT" , object_owner = "NONE" )
70
+ self ._root = MigrationNode (
71
+ node_id = 0 , object_type = "ROOT" , object_id = "ROOT" , object_name = "ROOT" , object_owner = "NONE"
72
+ )
68
73
69
74
def register_workflow_task (self , task : jobs .Task , job : jobs .Job , _graph : DependencyGraph ) -> MigrationNode :
70
- task_node = self ._find_node (object_type = "TASK" , object_id = task .task_key )
75
+ task_id = f"{ job .job_id } /{ task .task_key } "
76
+ task_node = self ._find_node (object_type = "TASK" , object_id = task_id )
71
77
if task_node :
72
78
return task_node
73
79
job_node = self .register_workflow_job (job )
74
80
MigrationNode .last_node_id += 1
75
81
task_node = MigrationNode (
76
- node_id = MigrationNode .last_node_id , object_type = "TASK" , object_id = task .task_key , object_owner = "NONE"
82
+ node_id = MigrationNode .last_node_id ,
83
+ object_type = "TASK" ,
84
+ object_id = task_id ,
85
+ object_name = task .task_key ,
86
+ object_owner = "NONE" ,
77
87
) # TODO object_owner
78
88
job_node .required_steps .append (task_node )
79
89
if task .existing_cluster_id :
@@ -89,8 +99,13 @@ def register_workflow_job(self, job: jobs.Job) -> MigrationNode:
89
99
if job_node :
90
100
return job_node
91
101
MigrationNode .last_node_id += 1
102
+ job_name = job .settings .name if job .settings and job .settings .name else str (job .job_id )
92
103
job_node = MigrationNode (
93
- node_id = MigrationNode .last_node_id , object_type = "JOB" , object_id = str (job .job_id ), object_owner = "NONE"
104
+ node_id = MigrationNode .last_node_id ,
105
+ object_type = "JOB" ,
106
+ object_id = str (job .job_id ),
107
+ object_name = job_name ,
108
+ object_owner = "NONE" ,
94
109
) # TODO object_owner
95
110
top_level = True
96
111
if job .settings and job .settings .job_clusters :
@@ -114,7 +129,11 @@ def register_cluster(self, cluster_key: str) -> MigrationNode:
114
129
return cluster_node
115
130
MigrationNode .last_node_id += 1
116
131
cluster_node = MigrationNode (
117
- node_id = MigrationNode .last_node_id , object_type = "CLUSTER" , object_id = cluster_key , object_owner = "NONE"
132
+ node_id = MigrationNode .last_node_id ,
133
+ object_type = "CLUSTER" ,
134
+ object_id = cluster_key ,
135
+ object_name = cluster_key ,
136
+ object_owner = "NONE" ,
118
137
) # TODO object_owner
119
138
# TODO register warehouses and policies
120
139
self ._root .required_steps .append (cluster_node )
0 commit comments