Scoping "Seen Tables". Improving table scan performance. #3741
base: main
```diff
@@ -2,16 +2,18 @@
 import logging
 from dataclasses import dataclass, replace
 from collections.abc import Iterable, KeysView
+from functools import partial
+from typing import ClassVar

 from databricks.labs.blueprint.parallel import Threads
 from databricks.labs.lsql.backends import SqlBackend
 from databricks.sdk import WorkspaceClient
 from databricks.sdk.errors import DatabricksError, NotFound
-from databricks.sdk.service.catalog import CatalogInfo, CatalogInfoSecurableKind, SchemaInfo
+from databricks.sdk.service.catalog import CatalogInfo, CatalogInfoSecurableKind, SchemaInfo, TableInfo, CatalogType

 from databricks.labs.ucx.framework.crawlers import CrawlerBase
 from databricks.labs.ucx.framework.utils import escape_sql_identifier
-from databricks.labs.ucx.hive_metastore.tables import TablesCrawler
+from databricks.labs.ucx.hive_metastore.tables import TablesCrawler, Table

 logger = logging.getLogger(__name__)
```
```diff
@@ -84,6 +86,8 @@ class TableMigrationStatusRefresher(CrawlerBase[TableMigrationStatus]):
         CatalogInfoSecurableKind.CATALOG_SYSTEM,
     ]

+    API_LIMIT: ClassVar[int] = 25
+
     def __init__(self, ws: WorkspaceClient, sql_backend: SqlBackend, schema, tables_crawler: TablesCrawler):
         super().__init__(sql_backend, "hive_metastore", schema, "migration_status", TableMigrationStatus)
         self._ws = ws
```
```diff
@@ -92,29 +96,45 @@ def __init__(self, ws: WorkspaceClient, sql_backend: SqlBackend, schema, tables_
     def index(self, *, force_refresh: bool = False) -> TableMigrationIndex:
         return TableMigrationIndex(self.snapshot(force_refresh=force_refresh))

-    def get_seen_tables(self) -> dict[str, str]:
+    def get_seen_tables(self, *, scope: set[Table] | None = None) -> dict[str, str]:
         seen_tables: dict[str, str] = {}
+        tasks = []
+        schema_scope = {(table.catalog.lower(), table.database.lower()) for table in scope} if scope else None
+        table_scope = {table.full_name.lower() for table in scope} if scope else None
         for schema in self._iter_schemas():
-            if schema.catalog_name is None or schema.name is None:
+            if (
+                schema.catalog_name is None
+                or schema.name is None
+                or schema.name == "information_schema"
+                or (schema_scope and (schema.catalog_name.lower(), schema.name.lower()) not in schema_scope)
+            ):
                 continue
-            try:
-                # ws.tables.list returns Iterator[TableInfo], so we need to convert it to a list in order to catch the exception
-                tables = list(self._ws.tables.list(catalog_name=schema.catalog_name, schema_name=schema.name))
-            except NotFound:
-                logger.warning(f"Schema {schema.full_name} no longer exists. Skipping checking its migration status.")
-                continue
-            except DatabricksError as e:
-                logger.warning(f"Error while listing tables in schema: {schema.full_name}", exc_info=e)
-                continue
-            for table in tables:
-                if not table.properties:
-                    continue
-                if "upgraded_from" not in table.properties:
-                    continue
-                if not table.full_name:
-                    logger.warning(f"The table {table.name} in {schema.name} has no full name")
-                    continue
-                seen_tables[table.full_name.lower()] = table.properties["upgraded_from"].lower()
+            tasks.append(
+                partial(
+                    self._iter_tables,
+                    schema.catalog_name,
+                    schema.name,
+                )
+            )
+        tables: list = []
+        logger.info(f"Scanning {len(tasks)} schemas for tables")
+        table_lists = Threads.gather("list tables", tasks, self.API_LIMIT)
```

Review comment on `self.API_LIMIT`: It is not an API limit, but the number of threads.
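For context on the comment above: a minimal sketch of the fan-out pattern, assuming blueprint's `Threads.gather(name, tasks, num_threads)` signature and its `(results, errors)` return shape; `list_schema` is an illustrative stand-in, not part of this PR.

```python
from functools import partial

from databricks.labs.blueprint.parallel import Threads


def list_schema(catalog: str, schema: str) -> list[str]:
    # Illustrative stand-in for TableMigrationStatusRefresher._iter_tables.
    return [f"{catalog}.{schema}.some_table"]


# Build one no-argument callable per schema with functools.partial.
tasks = [partial(list_schema, "main", name) for name in ("sales", "hr")]

# The third argument bounds the number of worker threads, which is why the
# reviewer flags API_LIMIT as a misleading name for it.
results, errors = Threads.gather("list tables", tasks, 25)
for table_names in results:  # one result per schema
    print(table_names)
```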
```diff
+        # Combine tuple of lists to a list
+        for table_list in table_lists[0]:
+            tables.extend(table_list)
```

Review comment on `# Combine tuple of lists to a list`: Use …
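The suggestion above is cut off in the source. One standard idiom for this flatten step (an assumption, not necessarily what the reviewer proposed) is `itertools.chain.from_iterable`:

```python
import itertools

# Mimic the (results, errors) tuple returned by Threads.gather.
table_lists = ([["a.b.t1"], ["a.c.t2", "a.c.t3"]], [])
results, _errors = table_lists
# Flatten the per-schema lists in one pass instead of a manual loop.
tables = list(itertools.chain.from_iterable(results))
print(tables)  # ['a.b.t1', 'a.c.t2', 'a.c.t3']
```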
```diff
+        for table in tables:
+            if not isinstance(table, TableInfo):
+                logger.warning(f"Table {table} is not an instance of TableInfo")
+                continue
```

Review comment on the `isinstance` check: Is this supposed to happen? It should not, based on the type hinting, right?
```diff
+            if not table.full_name:
+                logger.warning(f"The table {table.name} in {table.schema_name} has no full name")
+                continue
+            if table_scope and table.full_name.lower() not in table_scope:
+                continue
+            if not table.properties or "upgraded_from" not in table.properties:
+                continue
+
+            seen_tables[table.full_name.lower()] = table.properties["upgraded_from"].lower()
         return seen_tables

     def is_migrated(self, schema: str, table: str) -> bool:
```
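To show how the new keyword is meant to be called, a hedged usage sketch: it assumes ucx's `Table` dataclass takes `(catalog, database, name, object_type, table_format)` positionally and exposes the `full_name` used above, and `refresher` stands for an already-constructed `TableMigrationStatusRefresher`.

```python
from databricks.labs.ucx.hive_metastore.tables import Table

# Hypothetical scope: only schemas whose (catalog, database) pair matches a
# scoped table are listed, and only exact full-name matches are recorded.
scope = {Table("main", "sales", "orders", "MANAGED", "DELTA")}
seen = refresher.get_seen_tables(scope=scope)
# `seen` maps each matched table's full name to its `upgraded_from` property.
```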
```diff
@@ -171,7 +191,10 @@ def _iter_catalogs(self) -> Iterable[CatalogInfo]:

     def _iter_schemas(self) -> Iterable[SchemaInfo]:
         for catalog in self._iter_catalogs():
-            if catalog.name is None:
+            if catalog.name is None or catalog.catalog_type in (
+                CatalogType.DELTASHARING_CATALOG,
+                CatalogType.SYSTEM_CATALOG,
+            ):
                 continue
             try:
                 yield from self._ws.schemas.list(catalog_name=catalog.name)
```

Review comment on the `catalog.catalog_type` filter: This has probably the same effect as the filter on line 186. If not, move the filter there so that filtering catalogs happens inside the …
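A sketch of what the reviewer appears to ask for (the "line 186" filter is not visible in this excerpt, so this is an assumption): apply the `CatalogType` filter once, at the catalog-iteration layer, so every caller benefits.

```python
from collections.abc import Iterable

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.catalog import CatalogInfo, CatalogType


def iter_catalogs(ws: WorkspaceClient) -> Iterable[CatalogInfo]:
    # Sketch only: the real _iter_catalogs also filters on securable kind
    # and handles listing errors, which are omitted here.
    for catalog in ws.catalogs.list():
        if catalog.catalog_type in (CatalogType.DELTASHARING_CATALOG, CatalogType.SYSTEM_CATALOG):
            continue
        yield catalog
```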
```diff
@@ -181,3 +204,16 @@ def _iter_schemas(self) -> Iterable[SchemaInfo]:
             except DatabricksError as e:
                 logger.warning(f"Error while listing schemas in catalog: {catalog.name}", exc_info=e)
                 continue
+
+    def _iter_tables(self, catalog_name: str, schema_name: str) -> list[TableInfo]:
+        try:
+            # ws.tables.list returns Iterator[TableInfo], so we need to convert it to a list in order to catch the exception
+            return list(self._ws.tables.list(catalog_name=catalog_name, schema_name=schema_name))
+        except NotFound:
+            logger.warning(
+                f"Schema {catalog_name}.{schema_name} no longer exists. Skipping checking its migration status."
+            )
+            return []
+        except DatabricksError as e:
+            logger.warning(f"Error while listing tables in schema: {schema_name}", exc_info=e)
+            return []
```

Review comment on `_iter_tables`: Move this into `_iter_schema`.
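The comment is terse; one possible reading, sketched with hypothetical names (`iter_schema_tables` is not in the PR): fold the guarded table listing into the schema-iteration layer so the error handling lives in a single generator.

```python
import logging
from collections.abc import Iterable

from databricks.sdk import WorkspaceClient
from databricks.sdk.errors import DatabricksError, NotFound
from databricks.sdk.service.catalog import TableInfo

logger = logging.getLogger(__name__)


def iter_schema_tables(ws: WorkspaceClient, catalog_name: str, schema_name: str) -> Iterable[TableInfo]:
    try:
        # Materialize the iterator so API errors surface inside this try block.
        yield from list(ws.tables.list(catalog_name=catalog_name, schema_name=schema_name))
    except NotFound:
        logger.warning(f"Schema {catalog_name}.{schema_name} no longer exists. Skipping its migration status.")
    except DatabricksError as e:
        logger.warning(f"Error while listing tables in schema: {schema_name}", exc_info=e)
```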