diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 62246637c1..6fc0fc04c8 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -1,14 +1,15 @@ import json -from collections.abc import Callable, Generator import functools import collections import os import logging +import shutil +import subprocess +from collections.abc import Callable, Generator from dataclasses import replace from datetime import timedelta from functools import cached_property -import shutil -import subprocess +from typing import Literal import pytest # pylint: disable=wrong-import-order from databricks.labs.blueprint.commands import CommandExecutor @@ -1222,42 +1223,49 @@ def prepare_regular_tables(context, external_csv, schema) -> dict[str, TableInfo @pytest.fixture -def prepare_tables_for_migration( - ws, installation_ctx, make_catalog, make_random, make_mounted_location, env_or_skip, make_storage_dir, request -) -> tuple[dict[str, TableInfo], SchemaInfo]: - # Here we use pytest indirect parametrization, so the test function can pass arguments to this fixture and the - # arguments will be available in the request.param. If the argument is "hiveserde", we will prepare hiveserde - # tables, otherwise we will prepare regular tables. 
- # see documents here for details https://docs.pytest.org/en/8.1.x/example/parametrize.html#indirect-parametrization - scenario = request.param - is_hiveserde = scenario == "hiveserde" - random = make_random(5).lower() - # create external and managed tables to be migrated - if scenario == "hiveserde": - schema = installation_ctx.make_schema(catalog_name="hive_metastore", name=f"hiveserde_in_place_{random}") - table_base_dir = make_storage_dir( - path=f'dbfs:/mnt/{env_or_skip("TEST_MOUNT_NAME")}/a/hiveserde_in_place_{random}' - ) - tables = prepare_hiveserde_tables(installation_ctx, random, schema, table_base_dir) - elif scenario == "managed": - schema_name = f"managed_{random}" - schema_location = f'dbfs:/mnt/{env_or_skip("TEST_MOUNT_NAME")}/a/managed_{random}' - schema = installation_ctx.make_schema(catalog_name="hive_metastore", name=schema_name, location=schema_location) - tables = prepare_regular_tables(installation_ctx, make_mounted_location, schema) - elif scenario == "regular": - schema = installation_ctx.make_schema(catalog_name="hive_metastore", name=f"migrate_{random}") - tables = prepare_regular_tables(installation_ctx, make_mounted_location, schema) - - # create destination catalog and schema - dst_catalog = make_catalog() - dst_schema = installation_ctx.make_schema(catalog_name=dst_catalog.name, name=schema.name) - migrate_rules = [Rule.from_src_dst(table, dst_schema) for _, table in tables.items()] - installation_ctx.with_table_mapping_rules(migrate_rules) - installation_ctx.with_dummy_resource_permission() - installation_ctx.save_tables(is_hiveserde=is_hiveserde) - installation_ctx.save_mounts() - installation_ctx.with_dummy_grants_and_tacls() - return tables, dst_schema +def make_table_migration_context( + env_or_skip, + make_random, + make_mounted_location, + make_storage_dir, +) -> Callable[ + [Literal["hiveserde", "managed", "regular"], MockInstallationContext], tuple[dict[str, TableInfo], SchemaInfo] +]: + + def prepare( + scenario: 
Literal["hiveserde", "managed", "regular"], ctx: MockInstallationContext + ) -> tuple[dict[str, TableInfo], SchemaInfo]: + random = make_random(5).lower() + # create external and managed tables to be migrated + if scenario == "hiveserde": + schema = ctx.make_schema(catalog_name="hive_metastore", name=f"hiveserde_in_place_{random}") + table_base_dir = make_storage_dir( + path=f'dbfs:/mnt/{env_or_skip("TEST_MOUNT_NAME")}/a/hiveserde_in_place_{random}' + ) + tables = prepare_hiveserde_tables(ctx, random, schema, table_base_dir) + elif scenario == "managed": + schema_name = f"managed_{random}" + schema_location = f'dbfs:/mnt/{env_or_skip("TEST_MOUNT_NAME")}/a/managed_{random}' + schema = ctx.make_schema(catalog_name="hive_metastore", name=schema_name, location=schema_location) + tables = prepare_regular_tables(ctx, make_mounted_location, schema) + elif scenario == "regular": + schema = ctx.make_schema(catalog_name="hive_metastore", name=f"migrate_{random}") + tables = prepare_regular_tables(ctx, make_mounted_location, schema) + else: + raise ValueError(f"Unsupported scenario {scenario}") + + # create destination catalog and schema + dst_catalog = ctx.make_catalog() + dst_schema = ctx.make_schema(catalog_name=dst_catalog.name, name=schema.name) + migrate_rules = [Rule.from_src_dst(table, dst_schema) for _, table in tables.items()] + ctx.with_table_mapping_rules(migrate_rules) + ctx.with_dummy_resource_permission() + ctx.save_tables(is_hiveserde=scenario == "hiveserde") + ctx.save_mounts() + ctx.with_dummy_grants_and_tacls() + return tables, dst_schema + + return prepare @pytest.fixture diff --git a/tests/integration/hive_metastore/test_ext_hms.py b/tests/integration/hive_metastore/test_ext_hms.py index cc6f19bbbc..d10c9a13cd 100644 --- a/tests/integration/hive_metastore/test_ext_hms.py +++ b/tests/integration/hive_metastore/test_ext_hms.py @@ -23,13 +23,13 @@ def sql_backend(ws, env_or_skip) -> SqlBackend: @retried(on=[NotFound, InvalidParameterValue], 
timeout=timedelta(minutes=5)) -@pytest.mark.parametrize('prepare_tables_for_migration', ['regular'], indirect=True) -def test_migration_job_ext_hms(ws, installation_ctx, prepare_tables_for_migration, env_or_skip): +def test_migration_job_ext_hms(ws, installation_ctx, make_table_migration_context, env_or_skip) -> None: + tables, dst_schema = make_table_migration_context("regular", installation_ctx) ext_hms_cluster_id = env_or_skip("TEST_EXT_HMS_CLUSTER_ID") - tables, dst_schema = prepare_tables_for_migration ext_hms_ctx = installation_ctx.replace( config_transform=lambda wc: dataclasses.replace( wc, + skip_tacl_migration=True, override_clusters={ "main": ext_hms_cluster_id, "user_isolation": ext_hms_cluster_id, @@ -45,18 +45,21 @@ def test_migration_job_ext_hms(ws, installation_ctx, prepare_tables_for_migratio r"Choose a cluster policy": "0", }, ) - ext_hms_ctx.workspace_installation.run() - ext_hms_ctx.deployed_workflows.run_workflow("migrate-tables") + + ext_hms_ctx.deployed_workflows.run_workflow("migrate-tables", skip_job_wait=True) + # assert the workflow is successful assert ext_hms_ctx.deployed_workflows.validate_step("migrate-tables") # assert the tables are migrated + missing_tables = set[str]() for table in tables.values(): - try: - assert ws.tables.get(f"{dst_schema.catalog_name}.{dst_schema.name}.{table.name}").name - except NotFound: - assert False, f"{table.name} not found in {dst_schema.catalog_name}.{dst_schema.name}" + migrated_table_name = f"{dst_schema.catalog_name}.{dst_schema.name}.{table.name}" + if not ext_hms_ctx.workspace_client.tables.exists(migrated_table_name): + missing_tables.add(migrated_table_name) + assert not missing_tables, f"Missing migrated tables: {missing_tables}" + # assert the cluster is configured correctly with ext hms install_state = ext_hms_ctx.installation.load(RawState) for job_cluster in ws.jobs.get(install_state.resources["jobs"]["migrate-tables"]).settings.job_clusters: diff --git 
a/tests/integration/hive_metastore/test_workflows.py b/tests/integration/hive_metastore/test_workflows.py index ef124ace22..6f065134f3 100644 --- a/tests/integration/hive_metastore/test_workflows.py +++ b/tests/integration/hive_metastore/test_workflows.py @@ -1,35 +1,43 @@ +import dataclasses +from typing import Literal + import pytest -from databricks.sdk.errors import NotFound +from databricks.labs.lsql.core import Row from databricks.labs.ucx.framework.utils import escape_sql_identifier from databricks.labs.ucx.hive_metastore.tables import Table @pytest.mark.parametrize( - "prepare_tables_for_migration,workflow", + "scenario, workflow", [ ("regular", "migrate-tables"), ("hiveserde", "migrate-external-hiveserde-tables-in-place-experimental"), ("hiveserde", "migrate-external-tables-ctas"), ], - indirect=("prepare_tables_for_migration",), ) def test_table_migration_job_refreshes_migration_status( - ws, installation_ctx, - prepare_tables_for_migration, - workflow, -): + scenario: Literal["regular", "hiveserde"], + workflow: str, + make_table_migration_context, +) -> None: """The migration status should be refreshed after the migration job.""" - tables, _ = prepare_tables_for_migration + tables, _ = make_table_migration_context(scenario, installation_ctx) ctx = installation_ctx.replace( + config_transform=lambda wc: dataclasses.replace( + wc, + skip_tacl_migration=True, + ), extend_prompts={ r".*Do you want to update the existing installation?.*": 'yes', }, ) ctx.workspace_installation.run() - ctx.deployed_workflows.run_workflow(workflow) + ctx.deployed_workflows.run_workflow(workflow, skip_job_wait=True) + + assert ctx.deployed_workflows.validate_step(workflow) # Avoiding MigrationStatusRefresh as it will refresh the status before fetching migration_status_query = f"SELECT * FROM {ctx.config.inventory_database}.migration_status" @@ -62,17 +70,13 @@ def test_table_migration_job_refreshes_migration_status( assert len(asserts) == 0, assert_message 
-@pytest.mark.parametrize( -    "prepare_tables_for_migration,workflow", -    [ -        ("managed", "migrate-tables"), -    ], -    indirect=("prepare_tables_for_migration",), -) -def test_table_migration_for_managed_table(ws, installation_ctx, prepare_tables_for_migration, workflow, sql_backend): -    # This test cases test the CONVERT_TO_EXTERNAL scenario. -    tables, dst_schema = prepare_tables_for_migration +def test_table_migration_convert_managed_to_external(installation_ctx, make_table_migration_context) -> None: +    tables, dst_schema = make_table_migration_context("managed", installation_ctx) ctx = installation_ctx.replace( +        config_transform=lambda wc: dataclasses.replace( +            wc, +            skip_tacl_migration=True, +        ), extend_prompts={ r"If hive_metastore contains managed table with external.*": "0", r".*Do you want to update the existing installation?.*": 'yes', @@ -80,67 +84,62 @@ def test_table_migration_for_managed_table(ws, installation_ctx, prepare_tables_ ) ctx.workspace_installation.run() - ctx.deployed_workflows.run_workflow(workflow) + ctx.deployed_workflows.run_workflow("migrate-tables", skip_job_wait=True) + + assert ctx.deployed_workflows.validate_step("migrate-tables") + missing_tables = set[str]() for table in tables.values(): - try: - assert ws.tables.get(f"{dst_schema.catalog_name}.{dst_schema.name}.{table.name}").name - except NotFound: - assert False, f"{table.name} not found in {dst_schema.catalog_name}.{dst_schema.name}" - managed_table = tables["src_managed_table"] + migrated_table_name = f"{dst_schema.catalog_name}.{dst_schema.name}.{table.name}" + if not ctx.workspace_client.tables.exists(migrated_table_name): + missing_tables.add(migrated_table_name) + assert not missing_tables, f"Missing migrated tables: {missing_tables}" - for key, value, _ in sql_backend.fetch(f"DESCRIBE TABLE EXTENDED {escape_sql_identifier(managed_table.full_name)}"): + managed_table = tables["src_managed_table"] + for key, value, _ in ctx.sql_backend.fetch( + f"DESCRIBE TABLE EXTENDED 
{escape_sql_identifier(managed_table.full_name)}" + ): if key == "Type": assert value == "EXTERNAL" break -@pytest.mark.parametrize('prepare_tables_for_migration', [('hiveserde')], indirect=True) -def test_hiveserde_table_in_place_migration_job(ws, installation_ctx, prepare_tables_for_migration): - tables, dst_schema = prepare_tables_for_migration +@pytest.mark.parametrize( + "workflow", ["migrate-external-hiveserde-tables-in-place-experimental", "migrate-external-tables-ctas"] +) +def test_hiveserde_table_in_place_migration_job(installation_ctx, make_table_migration_context, workflow) -> None: + tables, dst_schema = make_table_migration_context("hiveserde", installation_ctx) ctx = installation_ctx.replace( + config_transform=lambda wc: dataclasses.replace( + wc, + skip_tacl_migration=True, + ), extend_prompts={ r".*Do you want to update the existing installation?.*": 'yes', }, ) ctx.workspace_installation.run() - ctx.deployed_workflows.run_workflow("migrate-external-hiveserde-tables-in-place-experimental") - # assert the workflow is successful - assert ctx.deployed_workflows.validate_step("migrate-external-hiveserde-tables-in-place-experimental") - # assert the tables are migrated + + ctx.deployed_workflows.run_workflow(workflow, skip_job_wait=True) + + assert installation_ctx.deployed_workflows.validate_step(workflow), f"Workflow failed: {workflow}" + missing_tables = set[str]() for table in tables.values(): - try: - assert ws.tables.get(f"{dst_schema.catalog_name}.{dst_schema.name}.{table.name}").name - except NotFound: - assert False, f"{table.name} not found in {dst_schema.catalog_name}.{dst_schema.name}" + migrated_table_name = f"{dst_schema.catalog_name}.{dst_schema.name}.{table.name}" + if not ctx.workspace_client.tables.exists(migrated_table_name): + missing_tables.add(migrated_table_name) + assert not missing_tables, f"Missing migrated tables: {missing_tables}" -@pytest.mark.parametrize('prepare_tables_for_migration', [('hiveserde')], indirect=True) -def 
test_hiveserde_table_ctas_migration_job(ws, installation_ctx, prepare_tables_for_migration): - tables, dst_schema = prepare_tables_for_migration +def test_table_migration_job_publishes_remaining_tables(installation_ctx, make_table_migration_context) -> None: + tables, dst_schema = make_table_migration_context("regular", installation_ctx) ctx = installation_ctx.replace( - extend_prompts={ - r".*Do you want to update the existing installation?.*": 'yes', - }, + config_transform=lambda wc: dataclasses.replace( + wc, + skip_tacl_migration=True, + ), ) ctx.workspace_installation.run() - ctx.deployed_workflows.run_workflow("migrate-external-tables-ctas") - # assert the workflow is successful - assert ctx.deployed_workflows.validate_step("migrate-external-tables-ctas") - # assert the tables are migrated - for table in tables.values(): - try: - assert ws.tables.get(f"{dst_schema.catalog_name}.{dst_schema.name}.{table.name}").name - except NotFound: - assert False, f"{table.name} not found in {dst_schema.catalog_name}.{dst_schema.name}" - - -@pytest.mark.parametrize('prepare_tables_for_migration', ['regular'], indirect=True) -def test_table_migration_job_publishes_remaining_tables( - ws, installation_ctx, sql_backend, prepare_tables_for_migration, caplog -): - tables, dst_schema = prepare_tables_for_migration - installation_ctx.workspace_installation.run() second_table = list(tables.values())[1] table = Table( "hive_metastore", @@ -149,19 +148,20 @@ def test_table_migration_job_publishes_remaining_tables( object_type="UNKNOWN", table_format="UNKNOWN", ) - installation_ctx.table_mapping.skip_table_or_view(dst_schema.name, second_table.name, load_table=lambda *_: table) - installation_ctx.deployed_workflows.run_workflow("migrate-tables") - assert installation_ctx.deployed_workflows.validate_step("migrate-tables") + ctx.table_mapping.skip_table_or_view(dst_schema.name, second_table.name, load_table=lambda *_: table) + + ctx.deployed_workflows.run_workflow("migrate-tables", 
skip_job_wait=True) + assert ctx.deployed_workflows.validate_step("migrate-tables") remaining_tables = list( - sql_backend.fetch( + ctx.sql_backend.fetch( f""" SELECT SUBSTRING(message, LENGTH('remained-hive-metastore-table: ') + 1) AS message - FROM {installation_ctx.inventory_database}.logs + FROM {ctx.inventory_database}.logs WHERE message LIKE 'remained-hive-metastore-table: %' """ ) ) - assert remaining_tables[0].message == f'hive_metastore.{dst_schema.name}.{second_table.name}' + assert remaining_tables == [Row(message=f"hive_metastore.{dst_schema.name}.{second_table.name}")]