Skip to content

Implement migration sequencing (phase 1) #2980

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 21 commits into from
Closed
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion src/databricks/labs/ucx/assessment/clusters.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ def _try_fetch(self) -> Iterable[ClusterInfo]:
yield ClusterInfo(*row)


class ClusterOwnership(Ownership[ClusterInfo]):
class ClusterInfoOwnership(Ownership[ClusterInfo]):
"""Determine ownership of clusters in the inventory.

This is the cluster creator (if known).
Expand All @@ -192,6 +192,16 @@ def _maybe_direct_owner(self, record: ClusterInfo) -> str | None:
return record.creator


class ClusterDetailsOwnership(Ownership[ClusterDetails]):
"""Determine ownership of clusters in the workspace.

This is the cluster creator (if known).
"""

def _maybe_direct_owner(self, record: ClusterDetails) -> str | None:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

convert from ClusterDetails to ClusterInfo and don't do any renames/new classes. we have "cluster" as a concept within our system, ClusterDetails and ClusterInfo are just the names of API generated classes which are slightly different views on the cluster entity.

TLDR: don't rename existing ownership classes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

return record.creator_user_name


@dataclass
class PolicyInfo:
policy_id: str
Expand Down
13 changes: 12 additions & 1 deletion src/databricks/labs/ucx/assessment/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
RunType,
SparkJarTask,
SqlTask,
Job,
)

from databricks.labs.ucx.assessment.clusters import CheckClusterMixin
Expand Down Expand Up @@ -149,7 +150,7 @@ def _check_jar_task(self, all_task: list[RunTask]) -> list[str]:
return task_failures


class JobOwnership(Ownership[JobInfo]):
class JobInfoOwnership(Ownership[JobInfo]):
"""Determine ownership of jobs (workflows) in the inventory.

This is the job creator (if known).
Expand All @@ -159,6 +160,16 @@ def _maybe_direct_owner(self, record: JobInfo) -> str | None:
return record.creator


class JobOwnership(Ownership[Job]):
    """Determine ownership of jobs (workflows) in the workspace.

    Ownership is attributed to the job's creator, when the workspace
    records one; otherwise the base class falls back to its default owner
    (presumably the workspace administrator — confirm against `Ownership`).
    """

    def _maybe_direct_owner(self, record: Job) -> str | None:
        # `creator_user_name` may be unset on the API object.
        creator = record.creator_user_name
        return creator


@dataclass
class SubmitRunInfo:
run_ids: str # JSON-encoded list of run ids
Expand Down
Empty file.
174 changes: 174 additions & 0 deletions src/databricks/labs/ucx/sequencing/sequencing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
from __future__ import annotations

import itertools
from collections.abc import Iterable
from dataclasses import dataclass, field

from databricks.sdk import WorkspaceClient
from databricks.sdk.service import jobs

from databricks.labs.ucx.assessment.clusters import ClusterDetailsOwnership
from databricks.labs.ucx.assessment.jobs import JobOwnership
from databricks.labs.ucx.framework.owners import AdministratorLocator
from databricks.labs.ucx.source_code.graph import DependencyGraph


@dataclass
class MigrationStep:
    """One schedulable unit in the migration sequence.

    Steps are executed in ascending ``step_number`` order;
    ``required_step_ids`` names the steps that must complete before this one.
    """

    step_id: int  # unique id, mirrors the originating MigrationNode.node_id
    step_number: int  # 1 for ultimate leaves; highest prerequisite number + 1 otherwise
    object_type: str  # kind of workspace object, e.g. "JOB", "TASK", "CLUSTER"
    object_id: str  # identifier of the workspace object being migrated
    object_name: str  # human-readable name, for reporting
    object_owner: str  # user responsible for migrating this object
    required_step_ids: list[int] = field(default_factory=list)  # prerequisite step ids


@dataclass
class MigrationNode:
    """A node in the migration dependency tree.

    Each node represents a workspace object (job, task, cluster, ...) to be
    migrated; ``required_steps`` are the nodes that must be migrated before
    this one.
    """

    node_id: int  # unique id, assigned by the sequencer
    object_type: str  # e.g. "ROOT", "JOB", "TASK", "CLUSTER"
    object_id: str
    object_name: str
    object_owner: str
    required_steps: list[MigrationNode] = field(default_factory=list)

    def generate_steps(self) -> tuple[MigrationStep, Iterable[MigrationStep]]:
        """Generate the step for this node and, recursively, its prerequisites.

        Traverses depth-first: ultimate leaves get step number 1 and each node
        gets the highest required step number + 1. Nodes reachable through
        several branches are emitted several times (deduplicated later by the
        sequencer). Cycles in ``required_steps`` are broken by skipping any
        node already on the current traversal path, so a cyclic graph no
        longer causes unbounded recursion.
        """
        return self._generate_steps(set())

    def _generate_steps(self, active_path: set[int]) -> tuple[MigrationStep, Iterable[MigrationStep]]:
        # active_path holds only the node ids on the current DFS path, so
        # duplicates across sibling branches are still generated (original
        # behavior) while true cycles are cut.
        active_path.add(self.node_id)
        highest_step_number = 0
        required_step_ids: list[int] = []
        all_generated_steps: list[Iterable[MigrationStep]] = []
        for required_step in self.required_steps:
            if required_step.node_id in active_path:
                continue  # cycle: this prerequisite is already being processed above us
            step, generated_steps = required_step._generate_steps(active_path)
            highest_step_number = max(highest_step_number, step.step_number)
            required_step_ids.append(step.step_id)
            all_generated_steps.append(generated_steps)
            all_generated_steps.append([step])
        active_path.discard(self.node_id)
        this_step = MigrationStep(
            step_id=self.node_id,
            step_number=highest_step_number + 1,
            object_type=self.object_type,
            object_id=self.object_id,
            object_name=self.object_name,
            object_owner=self.object_owner,
            required_step_ids=required_step_ids,
        )
        return this_step, itertools.chain(*all_generated_steps)

    def find(self, object_type: str, object_id: str) -> MigrationNode | None:
        """Depth-first search for a node by (type, id); None when absent.

        Visited nodes are tracked so a cyclic graph cannot recurse forever.
        """
        return self._find(object_type, object_id, set())

    def _find(self, object_type: str, object_id: str, seen: set[int]) -> MigrationNode | None:
        if object_type == self.object_type and object_id == self.object_id:
            return self
        seen.add(self.node_id)
        for step in self.required_steps:
            if step.node_id in seen:
                continue
            found = step._find(object_type, object_id, seen)
            if found:
                return found
        return None


class MigrationSequencer:

def __init__(self, ws: WorkspaceClient, admin_locator: AdministratorLocator):
self._ws = ws
self._admin_locator = admin_locator
self._last_node_id = 0
self._root = MigrationNode(
node_id=0, object_type="ROOT", object_id="ROOT", object_name="ROOT", object_owner="NONE"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
node_id=0, object_type="ROOT", object_id="ROOT", object_name="ROOT", object_owner="NONE"
node_id=0, object_type="ROOT", object_id="ROOT", object_name="ROOT", object_owner="NONE",

make fmt

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

make fmt doesn't change this code...

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

with trailing comma it should

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that line is gone anyway

)

def register_workflow_task(self, task: jobs.Task, job: jobs.Job, _graph: DependencyGraph) -> MigrationNode:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
def register_workflow_task(self, task: jobs.Task, job: jobs.Job, _graph: DependencyGraph) -> MigrationNode:
def _register_workflow_task(self, task: jobs.Task, job: jobs.Job, graph: DependencyGraph) -> MigrationNode:

make this one private. we should start from the job, not the task

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the current thinking is to leverage the existing dependency graph in WorkflowLinter.refresh_report in order to avoid rebuilding it and re-fetch all assets from the workspace. Starting from the job would make that impossible.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

task_id = f"{job.job_id}/{task.task_key}"
task_node = self._find_node(object_type="TASK", object_id=task_id)
if task_node:
return task_node
job_node = self.register_workflow_job(job)
self._last_node_id += 1
task_node = MigrationNode(
node_id=self._last_node_id,
object_type="TASK",
object_id=task_id,
object_name=task.task_key,
object_owner=job_node.object_owner, # no task owner so use job one
)
job_node.required_steps.append(task_node)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this algorithm is a bit convoluted and hard to follow. please rewrite with the adjacency map and do a topological sorting with kahn's algorithm in this class. e.g.

self._adjacency = collections.defaultdict(set)

...

# have a central _nodes field to have all the nodes addressable by (TYPE, ID) tuple
self._nodes[('TASK', task_id)] = MigrationNode(
            node_id=self._last_node_id,
            object_type="TASK",
            object_id=task_id,
            object_name=task.task_key,
            object_owner=job_node.object_owner,
        )
self._adjacency[('JOB', job_id)].append(('TASK', task_id))

... and actual toposort is pretty straightforward with Kahn's algo:

indegrees = collections.defaultdict(int)
for src, dep_set in self._adjacency.items():
  indegrees[src] = len(dep_set) # count incoming dependencies for a node

queue, toposorted, sequence_num = [], [], 0
for src, incoming in indegrees.items():
  if incoming > 0: continue
  queue.append(src) # start with zero-dependencies nodes

while queue:
  curr = queue.popleft()
  toposorted.append(replace(self._nodes[curr], sequence_num=sequence_num))
  sequence_num += 1
  for dep in self._adjacency[curr]:
    indegrees[dep] -= 1
    if indegrees[dep] == 0:
      queue.append(dep)

return toposorted

i find it confusing with all those _deduplicate_steps and _find_node methods.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem with Kahn is it only works for DAGs, which we can't guarantee. We have duplicates and recursive loops. I'm trying it out but I suspect it will break on solacc.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where do we have recursive loops? incomplete DAGs would also be fine, as nodes without dependencies would always be first

Copy link
Contributor Author

@ericvergnaud ericvergnaud Oct 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider the following code:

file A.py:

class A:
 
    def return_a_bee(self):
        from B import B
        return B()

file B.py:

from A import A

class B(A):
     pass

file T.py:

from A import A
from B import B

the above will return the following dependency graph fragment:

     T
    / \
   A    B
  /      \
 B        A

When building the dependency graph we detect the recursive cycle (and we break it).
In the above, there is no 0-dependency node, so not sure Kahn will work...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've implemented Kahn. I'll check whether it fails in PR for phase 2 (which deals with the dependency graph)

if task.existing_cluster_id:
cluster_node = self.register_cluster(task.existing_cluster_id)
cluster_node.required_steps.append(task_node)
if job_node not in cluster_node.required_steps:
cluster_node.required_steps.append(job_node)
# TODO register dependency graph
return task_node

def register_workflow_job(self, job: jobs.Job) -> MigrationNode:
job_node = self._find_node(object_type="JOB", object_id=str(job.job_id))
if job_node:
return job_node
self._last_node_id += 1
job_name = job.settings.name if job.settings and job.settings.name else str(job.job_id)
job_node = MigrationNode(
node_id=self._last_node_id,
object_type="JOB",
object_id=str(job.job_id),
object_name=job_name,
object_owner=JobOwnership(self._admin_locator).owner_of(job),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

inject instance of JobOwnership via constructor, it has to be on GlobalContext

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not on GlobalContext, and I guess making that happen deserves a dedicated PR?
Can we make this change later?

)
top_level = True
if job.settings and job.settings.job_clusters:
for job_cluster in job.settings.job_clusters:
cluster_node = self.register_job_cluster(job_cluster)
if cluster_node:
top_level = False
cluster_node.required_steps.append(job_node)
if top_level:
self._root.required_steps.append(job_node)
return job_node

def register_job_cluster(self, cluster: jobs.JobCluster) -> MigrationNode | None:
if cluster.new_cluster:
return None
return self.register_cluster(cluster.job_cluster_key)

def register_cluster(self, cluster_key: str) -> MigrationNode:
cluster_node = self._find_node(object_type="CLUSTER", object_id=cluster_key)
if cluster_node:
return cluster_node
details = self._ws.clusters.get(cluster_key)
object_name = details.cluster_name if details and details.cluster_name else cluster_key
self._last_node_id += 1
cluster_node = MigrationNode(
node_id=self._last_node_id,
object_type="CLUSTER",
object_id=cluster_key,
object_name=object_name,
object_owner=ClusterDetailsOwnership(self._admin_locator).owner_of(details),
)
# TODO register warehouses and policies
self._root.required_steps.append(cluster_node)
return cluster_node

def generate_steps(self) -> Iterable[MigrationStep]:
_root_step, generated_steps = self._root.generate_steps()
unique_steps = self._deduplicate_steps(generated_steps)
return self._sorted_steps(unique_steps)

@staticmethod
def _sorted_steps(steps: Iterable[MigrationStep]) -> Iterable[MigrationStep]:
# sort by step number, lowest first
return sorted(steps, key=lambda step: step.step_number)

@staticmethod
def _deduplicate_steps(steps: Iterable[MigrationStep]) -> Iterable[MigrationStep]:
best_steps: dict[int, MigrationStep] = {}
for step in steps:
existing = best_steps.get(step.step_id, None)
# keep the step with the highest step number
# TODO this possibly affects the step_number of steps that depend on this one
# but it's probably OK to not be 100% accurate initially
if existing and existing.step_number >= step.step_number:
continue
best_steps[step.step_id] = step
return best_steps.values()

def _find_node(self, object_type: str, object_id: str) -> MigrationNode | None:
return self._root.find(object_type, object_id)
12 changes: 8 additions & 4 deletions tests/integration/assessment/test_clusters.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
from databricks.labs.ucx.assessment.clusters import (
ClustersCrawler,
PoliciesCrawler,
ClusterOwnership,
ClusterDetailsOwnership,
ClusterInfoOwnership,
ClusterPolicyOwnership,
)

Expand Down Expand Up @@ -76,9 +77,12 @@ def test_cluster_ownership(ws, runtime_ctx, make_cluster, make_user, inventory_s

# Verify ownership is as expected.
administrator_locator = runtime_ctx.administrator_locator
ownership = ClusterOwnership(administrator_locator)
assert ownership.owner_of(my_cluster_record) == ws.current_user.me().user_name
assert ownership.owner_of(their_cluster_record) == another_user.user_name
info_ownership = ClusterInfoOwnership(administrator_locator)
assert info_ownership.owner_of(my_cluster_record) == ws.current_user.me().user_name
assert info_ownership.owner_of(their_cluster_record) == another_user.user_name
details_ownership = ClusterDetailsOwnership(administrator_locator)
assert details_ownership.owner_of(ws.clusters.get(my_cluster.cluster_id)) == ws.current_user.me().user_name
assert details_ownership.owner_of(ws.clusters.get(their_cluster.cluster_id)) == another_user.user_name


def test_cluster_crawler_mlr_no_isolation(ws, make_cluster, inventory_schema, sql_backend):
Expand Down
4 changes: 2 additions & 2 deletions tests/integration/assessment/test_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from databricks.sdk.service.jobs import NotebookTask, RunTask
from databricks.sdk.service.workspace import ImportFormat

from databricks.labs.ucx.assessment.jobs import JobOwnership, JobsCrawler, SubmitRunsCrawler
from databricks.labs.ucx.assessment.jobs import JobInfoOwnership, JobsCrawler, SubmitRunsCrawler

from .test_assessment import _SPARK_CONF

Expand Down Expand Up @@ -80,5 +80,5 @@ def test_job_ownership(ws, runtime_ctx, make_job, inventory_schema, sql_backend)
job_record = next(record for record in records if record.job_id == str(job.job_id))

# Verify ownership is as expected.
ownership = JobOwnership(runtime_ctx.administrator_locator)
ownership = JobInfoOwnership(runtime_ctx.administrator_locator)
assert ownership.owner_of(job_record) == ws.current_user.me().user_name
34 changes: 28 additions & 6 deletions tests/unit/assessment/test_clusters.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@
from databricks.labs.ucx.assessment.azure import AzureServicePrincipalCrawler
from databricks.labs.ucx.assessment.clusters import (
ClustersCrawler,
PoliciesCrawler,
ClusterOwnership,
ClusterDetailsOwnership,
ClusterInfoOwnership,
ClusterInfo,
ClusterPolicyOwnership,
PoliciesCrawler,
PolicyInfo,
)
from databricks.labs.ucx.framework.crawlers import SqlBackend
Expand Down Expand Up @@ -185,27 +186,48 @@ def test_unsupported_clusters():
assert result_set[0].failures == '["cluster type not supported : LEGACY_PASSTHROUGH"]'


def test_cluster_owner_creator() -> None:
def test_cluster_info_owner_creator() -> None:
    """A creator recorded on the inventory row is reported as the owner."""
    admin_locator = create_autospec(AdministratorLocator)

    record = ClusterInfo(creator="bob", cluster_id="1", success=1, failures="[]")
    owner = ClusterInfoOwnership(admin_locator).owner_of(record)

    assert owner == "bob"
    # The creator is known, so no fall-back to the workspace administrator.
    admin_locator.get_workspace_administrator.assert_not_called()


def test_cluster_owner_creator_unknown() -> None:
def test_cluster_info_owner_creator_unknown() -> None:
    """Without a recorded creator, ownership falls back to the administrator."""
    admin_locator = create_autospec(AdministratorLocator)
    admin_locator.get_workspace_administrator.return_value = "an_admin"

    record = ClusterInfo(creator=None, cluster_id="1", success=1, failures="[]")
    owner = ClusterInfoOwnership(admin_locator).owner_of(record)

    assert owner == "an_admin"
    admin_locator.get_workspace_administrator.assert_called_once()


def test_cluster_details_owner_creator() -> None:
    """A creator_user_name on the API object is reported as the owner."""
    admin_locator = create_autospec(AdministratorLocator)

    details = ClusterDetails(creator_user_name="bob", cluster_id="1")
    owner = ClusterDetailsOwnership(admin_locator).owner_of(details)

    assert owner == "bob"
    # The creator is known, so no fall-back to the workspace administrator.
    admin_locator.get_workspace_administrator.assert_not_called()


def test_cluster_details_owner_creator_unknown() -> None:
    """Without a creator_user_name, ownership falls back to the administrator."""
    admin_locator = create_autospec(AdministratorLocator)
    admin_locator.get_workspace_administrator.return_value = "an_admin"

    details = ClusterDetails(cluster_id="1")
    owner = ClusterDetailsOwnership(admin_locator).owner_of(details)

    assert owner == "an_admin"
    admin_locator.get_workspace_administrator.assert_called_once()


def test_policy_crawler():
ws = mock_workspace_client(
policy_ids=['single-user-with-spn', 'single-user-with-spn-policyid', 'single-user-with-spn-no-sparkversion'],
Expand Down
Loading
Loading