Commit 2552cf7

Added Assessment step to estimate the size of DBFS Root Tables. (#741)

Closes #330 - Adds an assessment step that estimates the size of DBFS root tables; this should help in deciding whether a table is a good candidate for cloning.

1 parent c957147 commit 2552cf7

9 files changed: +174 −5 lines changed

src/databricks/labs/ucx/framework/crawlers.py

Lines changed: 1 addition & 1 deletion
@@ -43,7 +43,7 @@ def create_table(self, full_name: str, klass: Dataclass):
 
     _builtin_type_mapping: ClassVar[dict[type, str]] = {
         str: "STRING",
-        int: "INT",
+        int: "LONG",
         bool: "BOOLEAN",
         float: "FLOAT",
     }
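The mapping change matters here presumably because byte counts for large tables overflow a 32-bit INT, so Python's unbounded `int` now maps to a 64-bit LONG column. A minimal sketch of how such a type mapping could derive a DDL column list from a dataclass (`schema_for` is a hypothetical helper for illustration, not ucx's actual API):

from dataclasses import dataclass, fields

# Assumed mapping, mirroring the diff above: int becomes a 64-bit LONG.
_builtin_type_mapping: dict[type, str] = {
    str: "STRING",
    int: "LONG",  # byte counts of multi-GB tables overflow a 32-bit INT
    bool: "BOOLEAN",
    float: "FLOAT",
}

@dataclass
class TableSize:
    catalog: str
    database: str
    name: str
    size_in_bytes: int

def schema_for(klass: type) -> str:
    """Hypothetical helper: render dataclass fields as a SQL column list."""
    return ", ".join(f"{f.name} {_builtin_type_mapping[f.type]}" for f in fields(klass))

print(schema_for(TableSize))
# catalog STRING, database STRING, name STRING, size_in_bytes LONG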

src/databricks/labs/ucx/hive_metastore/table_size.py

Lines changed: 66 additions & 0 deletions

@@ -0,0 +1,66 @@
+import logging
+from collections.abc import Iterable
+from dataclasses import dataclass
+from functools import partial
+
+from databricks.labs.ucx.framework.crawlers import CrawlerBase, RuntimeBackend
+from databricks.labs.ucx.hive_metastore import TablesCrawler
+
+logger = logging.getLogger(__name__)
+
+
+@dataclass
+class TableSize:
+    catalog: str
+    database: str
+    name: str
+    size_in_bytes: int
+
+
+class TableSizeCrawler(CrawlerBase):
+    def __init__(self, backend: RuntimeBackend, schema):
+        """
+        Initializes a TableSizeCrawler instance.
+
+        Args:
+            backend (SqlBackend): The SQL Execution Backend abstraction (either REST API or Spark)
+            schema: The schema name for the inventory persistence.
+        """
+        from pyspark.sql.session import SparkSession  # type: ignore[import-not-found]
+
+        self._backend: RuntimeBackend = backend
+        super().__init__(backend, "hive_metastore", schema, "table_size", TableSize)
+        self._tables_crawler = TablesCrawler(backend, schema)
+        self._spark = SparkSession.builder.getOrCreate()
+
+    def _crawl(self) -> Iterable[TableSize]:
+        """Crawls and lists tables using the table crawler,
+        identifying DBFS root tables and calculating their sizes.
+        """
+        for table in self._tables_crawler.snapshot():
+            if not table.kind == "TABLE":
+                continue
+            if not table.is_dbfs_root():
+                continue
+            size_in_bytes = self.get_table_size(table.key)
+            yield TableSize(
+                catalog=table.catalog, database=table.database, name=table.name, size_in_bytes=size_in_bytes
+            )
+
+    def _try_load(self) -> Iterable[TableSize]:
+        """Tries to load table information from the database or raises a TABLE_OR_VIEW_NOT_FOUND error"""
+        for row in self._fetch(f"SELECT * FROM {self._full_name}"):
+            yield TableSize(*row)
+
+    def snapshot(self) -> list[TableSize]:
+        """
+        Takes a snapshot of tables in the specified catalog and database.
+
+        Returns:
+            list[TableSize]: A list of TableSize objects representing the snapshot of tables.
+        """
+        return self._snapshot(partial(self._try_load), partial(self._crawl))
+
+    def get_table_size(self, table_full_name: str) -> int:
+        logger.debug(f"Evaluating {table_full_name} table size.")
+        return self._spark._jsparkSession.table(table_full_name).queryExecution().analyzed().stats().sizeInBytes()
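A minimal sketch of how the crawler might be driven on a Databricks cluster, assuming a `RuntimeBackend` and an active SparkSession are available (the inventory schema name `ucx` is illustrative):

from databricks.labs.ucx.framework.crawlers import RuntimeBackend
from databricks.labs.ucx.hive_metastore.table_size import TableSizeCrawler

backend = RuntimeBackend()                  # Spark-backed SQL execution
crawler = TableSizeCrawler(backend, "ucx")  # "ucx" is an illustrative inventory schema
for ts in crawler.snapshot():               # crawls once, then persists and reuses results
    print(f"{ts.catalog}.{ts.database}.{ts.name}: {ts.size_in_bytes} bytes")

Note that `get_table_size` reaches into the JVM via `_jsparkSession`, so the figure comes from Catalyst's logical-plan statistics (`queryExecution().analyzed().stats().sizeInBytes()`) rather than from scanning the data.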

src/databricks/labs/ucx/hive_metastore/tables.py

Lines changed: 24 additions & 0 deletions
@@ -1,5 +1,6 @@
 import logging
 import re
+import typing
 from collections import defaultdict
 from collections.abc import Iterable, Iterator
 from dataclasses import dataclass
@@ -28,6 +29,18 @@ class Table:
 
     storage_properties: str | None = None
 
+    DBFS_ROOT_PREFIXES: typing.ClassVar[list[str]] = [
+        "/dbfs/",
+        "dbfs:/",
+    ]
+
+    DBFS_ROOT_PREFIX_EXCEPTIONS: typing.ClassVar[list[str]] = [
+        "/dbfs/mnt",
+        "dbfs:/mnt",
+        "/dbfs/databricks-datasets",
+        "dbfs:/databricks-datasets",
+    ]
+
     @property
     def is_delta(self) -> bool:
         if self.table_format is None:
@@ -80,6 +93,17 @@ def sql_unset_upgraded_to(self, catalog):
             f"UNSET TBLPROPERTIES IF EXISTS('upgraded_to');"
         )
 
+    def is_dbfs_root(self) -> bool:
+        if not self.location:
+            return False
+        for exception in self.DBFS_ROOT_PREFIX_EXCEPTIONS:
+            if self.location.startswith(exception):
+                return False
+        for prefix in self.DBFS_ROOT_PREFIXES:
+            if self.location.startswith(prefix):
+                return True
+        return False
+
 
 @dataclass
 class TableError:
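Order matters in `is_dbfs_root`: the exception prefixes are checked first, so DBFS mounts and the read-only `databricks-datasets` area are excluded even though they also start with `dbfs:/` or `/dbfs/`. A standalone sketch of the same precedence logic:

DBFS_ROOT_PREFIXES = ["/dbfs/", "dbfs:/"]
DBFS_ROOT_PREFIX_EXCEPTIONS = [
    "/dbfs/mnt", "dbfs:/mnt",
    "/dbfs/databricks-datasets", "dbfs:/databricks-datasets",
]

def is_dbfs_root(location: str | None) -> bool:
    if not location:
        return False  # views and tables without a location are never DBFS root
    # Exceptions win: mounts and demo datasets are not DBFS root storage.
    if any(location.startswith(e) for e in DBFS_ROOT_PREFIX_EXCEPTIONS):
        return False
    return any(location.startswith(p) for p in DBFS_ROOT_PREFIXES)

assert is_dbfs_root("dbfs:/somelocation/tablename")
assert not is_dbfs_root("dbfs:/mnt/somelocation/tablename")
assert not is_dbfs_root("s3:/somelocation/tablename")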

src/databricks/labs/ucx/install.py

Lines changed: 2 additions & 0 deletions
@@ -44,6 +44,7 @@
 from databricks.labs.ucx.hive_metastore.grants import Grant
 from databricks.labs.ucx.hive_metastore.hms_lineage import HiveMetastoreLineageEnabler
 from databricks.labs.ucx.hive_metastore.locations import ExternalLocation, Mount
+from databricks.labs.ucx.hive_metastore.table_size import TableSize
 from databricks.labs.ucx.hive_metastore.tables import Table, TableError
 from databricks.labs.ucx.runtime import main
 from databricks.labs.ucx.workspace_access.base import Permissions
@@ -130,6 +131,7 @@ def deploy_schema(sql_backend: SqlBackend, inventory_schema: str):
         functools.partial(table, "grants", Grant),
         functools.partial(table, "groups", MigratedGroup),
         functools.partial(table, "tables", Table),
+        functools.partial(table, "table_size", TableSize),
         functools.partial(table, "table_failures", TableError),
         functools.partial(table, "workspace_objects", WorkspaceObjectInfo),
         functools.partial(table, "permissions", Permissions),

Lines changed: 17 additions & 3 deletions

@@ -1,17 +1,31 @@
 -- viz type=table, name=Table Types, search_by=name, columns=name,type,format,storage,is_delta,location
 -- widget title=Table Types, row=1, col=3, size_x=3, size_y=8
-SELECT CONCAT(`database`, '.', name) AS name,
+SELECT CONCAT(tables.`database`, '.', tables.name) AS name,
        object_type AS type,
        table_format AS format,
        CASE
            WHEN STARTSWITH(location, "dbfs:/mnt") THEN "DBFS MOUNT"
            WHEN STARTSWITH(location, "/dbfs/mnt") THEN "DBFS MOUNT"
+           WHEN STARTSWITH(location, "dbfs:/databricks-datasets") THEN "Databricks Demo Dataset"
+           WHEN STARTSWITH(location, "/dbfs/databricks-datasets") THEN "Databricks Demo Dataset"
            WHEN STARTSWITH(location, "dbfs:/") THEN "DBFS ROOT"
            WHEN STARTSWITH(location, "/dbfs/") THEN "DBFS ROOT"
            WHEN STARTSWITH(location, "wasb") THEN "UNSUPPORTED"
            WHEN STARTSWITH(location, "adl") THEN "UNSUPPORTED"
            ELSE "EXTERNAL"
        END AS storage,
        IF(format = "DELTA", "Yes", "No") AS is_delta,
-       location
-FROM $inventory.tables
+       location,
+       CASE
+           WHEN size_in_bytes IS NULL THEN "Non DBFS Root"
+           WHEN size_in_bytes > 10000000000000000 THEN "SIZE OUT OF RANGE"
+           WHEN size_in_bytes < 100 THEN CONCAT(CAST(size_in_bytes AS string), " Bytes")
+           WHEN size_in_bytes < 100000 THEN CONCAT(CAST(ROUND(size_in_bytes / 1024, 2) AS string), "KB")
+           WHEN size_in_bytes < 100000000 THEN CONCAT(CAST(ROUND(size_in_bytes / 1024 / 1024, 2) AS string), "MB")
+           WHEN size_in_bytes < 100000000000 THEN CONCAT(CAST(ROUND(size_in_bytes / 1024 / 1024 / 1024, 2) AS string), "GB")
+           ELSE CONCAT(CAST(ROUND(size_in_bytes / 1024 / 1024 / 1024 / 1024, 2) AS string), "TB")
+       END AS table_size
+FROM $inventory.tables LEFT OUTER JOIN $inventory.table_size ON
+    $inventory.tables.catalog = $inventory.table_size.catalog AND
+    $inventory.tables.database = $inventory.table_size.database AND
+    $inventory.tables.name = $inventory.table_size.name
src/databricks/labs/ucx/runtime.py

Lines changed: 12 additions & 0 deletions
@@ -18,6 +18,7 @@
     Mounts,
     TablesCrawler,
 )
+from databricks.labs.ucx.hive_metastore.table_size import TableSizeCrawler
 from databricks.labs.ucx.workspace_access.generic import WorkspaceListing
 from databricks.labs.ucx.workspace_access.groups import GroupManager
 from databricks.labs.ucx.workspace_access.manager import PermissionManager
@@ -55,6 +56,17 @@ def crawl_grants(cfg: WorkspaceConfig):
     grants.snapshot()
 
 
+@task("assessment", depends_on=[crawl_tables])
+def estimate_table_size_for_migration(cfg: WorkspaceConfig):
+    """Scans the previously created Delta table named `$inventory_database.tables` and locates tables that cannot be
+    "synced". These tables will have to be cloned in the migration process.
+    Assesses the size of these tables and creates the `$inventory_database.table_size` table to list these sizes.
+    The table size is a factor in deciding whether to clone these tables."""
+    backend = RuntimeBackend()
+    table_size = TableSizeCrawler(backend, cfg.inventory_database)
+    table_size.snapshot()
+
+
 @task("assessment")
 def crawl_mounts(cfg: WorkspaceConfig):
     """Defines the scope of the _mount points_ intended for migration into Unity Catalog. As these objects are not

tests/unit/framework/mocks.py

Lines changed: 1 addition & 1 deletion
@@ -35,7 +35,7 @@ def fetch(self, sql) -> Iterator[Row]:
         if self._rows:
             for pattern in self._rows.keys():
                 r = re.compile(pattern)
-                if r.match(sql):
+                if r.search(sql):
                     logger.debug(f"Found match: {sql}")
                     rows.extend(self._rows[pattern])
         logger.debug(f"Returning rows: {rows}")

tests/unit/hive_metastore/test_table_size.py

Lines changed: 36 additions & 0 deletions

@@ -0,0 +1,36 @@
+import sys
+
+from databricks.labs.ucx.hive_metastore.table_size import TableSize, TableSizeCrawler
+from tests.unit.framework.mocks import MockBackend
+
+
+class SparkSession:
+    pass
+
+
+def test_table_size_crawler(mocker):
+    errors = {}
+    rows = {
+        "table_size": [],
+        "hive_metastore.inventory_database.tables": [
+            ("hive_metastore", "db1", "table1", "MANAGED", "DELTA", "dbfs:/location/table", None),
+            ("hive_metastore", "db1", "table2", "MANAGED", "DELTA", "/dbfs/location/table", None),
+            ("hive_metastore", "db1", "table3", "MANAGED", "DELTA", "dbfs:/mnt/location/table", None),
+            ("hive_metastore", "db1", "table4", "MANAGED", "DELTA", "s3:/location/table", None),
+            ("hive_metastore", "db1", "table5", "MANAGED", "DELTA", "/dbfs/mnt/location/table", None),
+            ("hive_metastore", "db1", "table6", "MANAGED", "DELTA", "/dbfs/databricks-datasets/location/table", None),
+            ("hive_metastore", "db1", "table7", "MANAGED", "DELTA", "dbfs:/databricks-datasets/location/table", None),
+            ("hive_metastore", "db1", "table8", "MANAGED", "DELTA", "/databricks-datasets/location/table", None),
+            ("hive_metastore", "db1", "view", "VIEW", "DELTA", None, "SELECT * FROM TABLE"),
+        ],
+        "SHOW DATABASES": [("db1",)],
+    }
+    backend = MockBackend(fails_on_first=errors, rows=rows)
+    pyspark_sql_session = mocker.Mock()
+    sys.modules["pyspark.sql.session"] = pyspark_sql_session
+    tsc = TableSizeCrawler(backend, "inventory_database")
+    tsc._spark._jsparkSession.table().queryExecution().analyzed().stats().sizeInBytes.side_effect = [100, 200, 300]
+    results = tsc.snapshot()
+    assert len(results) == 2
+    assert TableSize("hive_metastore", "db1", "table1", 100) in results
+    assert TableSize("hive_metastore", "db1", "table2", 200) in results

tests/unit/hive_metastore/test_tables.py

Lines changed: 15 additions & 0 deletions
@@ -107,3 +107,18 @@ def test_tables_returning_error_when_describing():
     tc = TablesCrawler(backend, "default")
     results = tc._crawl()
     assert len(results) == 1
+
+
+def test_is_dbfs_root():
+    assert Table("a", "b", "c", "MANAGED", "DELTA", location="dbfs:/somelocation/tablename").is_dbfs_root()
+    assert Table("a", "b", "c", "MANAGED", "DELTA", location="/dbfs/somelocation/tablename").is_dbfs_root()
+    assert not Table("a", "b", "c", "MANAGED", "DELTA", location="dbfs:/mnt/somelocation/tablename").is_dbfs_root()
+    assert not Table("a", "b", "c", "MANAGED", "DELTA", location="/dbfs/mnt/somelocation/tablename").is_dbfs_root()
+    assert not Table(
+        "a", "b", "c", "MANAGED", "DELTA", location="dbfs:/databricks-datasets/somelocation/tablename"
+    ).is_dbfs_root()
+    assert not Table(
+        "a", "b", "c", "MANAGED", "DELTA", location="/dbfs/databricks-datasets/somelocation/tablename"
+    ).is_dbfs_root()
+    assert not Table("a", "b", "c", "MANAGED", "DELTA", location="s3:/somelocation/tablename").is_dbfs_root()
+    assert not Table("a", "b", "c", "MANAGED", "DELTA", location="adls:/somelocation/tablename").is_dbfs_root()
