From e3f79c3ab317256c1345f4da74cd4c3f92e6d036 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Fri, 13 Dec 2024 08:24:14 +0100 Subject: [PATCH 01/10] Rewrite prepare_tables_for_migration fixture --- tests/integration/conftest.py | 86 ++++++++++--------- .../hive_metastore/test_ext_hms.py | 5 +- .../hive_metastore/test_workflows.py | 58 ++++++------- .../workspace_access/test_workflows.py | 2 +- 4 files changed, 75 insertions(+), 76 deletions(-) diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 62246637c1..6fc0fc04c8 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -1,14 +1,15 @@ import json -from collections.abc import Callable, Generator import functools import collections import os import logging +import shutil +import subprocess +from collections.abc import Callable, Generator from dataclasses import replace from datetime import timedelta from functools import cached_property -import shutil -import subprocess +from typing import Literal import pytest # pylint: disable=wrong-import-order from databricks.labs.blueprint.commands import CommandExecutor @@ -1222,42 +1223,49 @@ def prepare_regular_tables(context, external_csv, schema) -> dict[str, TableInfo @pytest.fixture -def prepare_tables_for_migration( - ws, installation_ctx, make_catalog, make_random, make_mounted_location, env_or_skip, make_storage_dir, request -) -> tuple[dict[str, TableInfo], SchemaInfo]: - # Here we use pytest indirect parametrization, so the test function can pass arguments to this fixture and the - # arguments will be available in the request.param. If the argument is "hiveserde", we will prepare hiveserde - # tables, otherwise we will prepare regular tables. - # see documents here for details https://docs.pytest.org/en/8.1.x/example/parametrize.html#indirect-parametrization - scenario = request.param - is_hiveserde = scenario == "hiveserde" - random = make_random(5).lower() - # create external and managed tables to be migrated - if scenario == "hiveserde": - schema = installation_ctx.make_schema(catalog_name="hive_metastore", name=f"hiveserde_in_place_{random}") - table_base_dir = make_storage_dir( - path=f'dbfs:/mnt/{env_or_skip("TEST_MOUNT_NAME")}/a/hiveserde_in_place_{random}' - ) - tables = prepare_hiveserde_tables(installation_ctx, random, schema, table_base_dir) - elif scenario == "managed": - schema_name = f"managed_{random}" - schema_location = f'dbfs:/mnt/{env_or_skip("TEST_MOUNT_NAME")}/a/managed_{random}' - schema = installation_ctx.make_schema(catalog_name="hive_metastore", name=schema_name, location=schema_location) - tables = prepare_regular_tables(installation_ctx, make_mounted_location, schema) - elif scenario == "regular": - schema = installation_ctx.make_schema(catalog_name="hive_metastore", name=f"migrate_{random}") - tables = prepare_regular_tables(installation_ctx, make_mounted_location, schema) - - # create destination catalog and schema - dst_catalog = make_catalog() - dst_schema = installation_ctx.make_schema(catalog_name=dst_catalog.name, name=schema.name) - migrate_rules = [Rule.from_src_dst(table, dst_schema) for _, table in tables.items()] - installation_ctx.with_table_mapping_rules(migrate_rules) - installation_ctx.with_dummy_resource_permission() - installation_ctx.save_tables(is_hiveserde=is_hiveserde) - installation_ctx.save_mounts() - installation_ctx.with_dummy_grants_and_tacls() - return tables, dst_schema +def make_table_migration_context( + env_or_skip, + make_random, + make_mounted_location, + make_storage_dir, +) -> Callable[ + [Literal["hiveserde", "managed", "regular"], MockInstallationContext], tuple[dict[str, TableInfo], SchemaInfo] +]: + + def prepare( + scenario: Literal["hiveserde", "managed", "regular"], ctx: MockInstallationContext + ) -> tuple[dict[str, TableInfo], SchemaInfo]: + random = make_random(5).lower() + # create external and managed tables to be migrated + if scenario == "hiveserde": + schema = ctx.make_schema(catalog_name="hive_metastore", name=f"hiveserde_in_place_{random}") + table_base_dir = make_storage_dir( + path=f'dbfs:/mnt/{env_or_skip("TEST_MOUNT_NAME")}/a/hiveserde_in_place_{random}' + ) + tables = prepare_hiveserde_tables(ctx, random, schema, table_base_dir) + elif scenario == "managed": + schema_name = f"managed_{random}" + schema_location = f'dbfs:/mnt/{env_or_skip("TEST_MOUNT_NAME")}/a/managed_{random}' + schema = ctx.make_schema(catalog_name="hive_metastore", name=schema_name, location=schema_location) + tables = prepare_regular_tables(ctx, make_mounted_location, schema) + elif scenario == "regular": + schema = ctx.make_schema(catalog_name="hive_metastore", name=f"migrate_{random}") + tables = prepare_regular_tables(ctx, make_mounted_location, schema) + else: + raise ValueError(f"Unsupported scenario {scenario}") + + # create destination catalog and schema + dst_catalog = ctx.make_catalog() + dst_schema = ctx.make_schema(catalog_name=dst_catalog.name, name=schema.name) + migrate_rules = [Rule.from_src_dst(table, dst_schema) for _, table in tables.items()] + ctx.with_table_mapping_rules(migrate_rules) + ctx.with_dummy_resource_permission() + ctx.save_tables(is_hiveserde=scenario == "hiveserde") + ctx.save_mounts() + ctx.with_dummy_grants_and_tacls() + return tables, dst_schema + + return prepare @pytest.fixture diff --git a/tests/integration/hive_metastore/test_ext_hms.py b/tests/integration/hive_metastore/test_ext_hms.py index cc6f19bbbc..7d1b88e71e 100644 --- a/tests/integration/hive_metastore/test_ext_hms.py +++ b/tests/integration/hive_metastore/test_ext_hms.py @@ -23,10 +23,9 @@ def sql_backend(ws, env_or_skip) -> SqlBackend: @retried(on=[NotFound, InvalidParameterValue], timeout=timedelta(minutes=5)) -@pytest.mark.parametrize('prepare_tables_for_migration', ['regular'], indirect=True) -def test_migration_job_ext_hms(ws, installation_ctx, prepare_tables_for_migration, env_or_skip): +def test_migration_job_ext_hms(ws, installation_ctx, make_table_migration_context, env_or_skip) -> None: + tables, dst_schema = make_table_migration_context('regular', installation_ctx) ext_hms_cluster_id = env_or_skip("TEST_EXT_HMS_CLUSTER_ID") - tables, dst_schema = prepare_tables_for_migration ext_hms_ctx = installation_ctx.replace( config_transform=lambda wc: dataclasses.replace( wc, diff --git a/tests/integration/hive_metastore/test_workflows.py b/tests/integration/hive_metastore/test_workflows.py index ef124ace22..6292a2cd0f 100644 --- a/tests/integration/hive_metastore/test_workflows.py +++ b/tests/integration/hive_metastore/test_workflows.py @@ -1,3 +1,6 @@ +import dataclasses +from typing import Literal + import pytest from databricks.sdk.errors import NotFound @@ -6,22 +9,21 @@ @pytest.mark.parametrize( - "prepare_tables_for_migration,workflow", + "scenario, workflow", [ ("regular", "migrate-tables"), ("hiveserde", "migrate-external-hiveserde-tables-in-place-experimental"), ("hiveserde", "migrate-external-tables-ctas"), ], - indirect=("prepare_tables_for_migration",), ) def test_table_migration_job_refreshes_migration_status( - ws, installation_ctx, - prepare_tables_for_migration, - workflow, -): + scenario: Literal["regular", "hiveserde"], + workflow: str, + make_table_migration_context, +) -> None: """The migration status should be refreshed after the migration job.""" - tables, _ = prepare_tables_for_migration + tables, _ = make_table_migration_context(scenario, installation_ctx) ctx = installation_ctx.replace( extend_prompts={ r".*Do you want to update the existing installation?.*": 'yes', @@ -62,16 +64,9 @@ def test_table_migration_job_refreshes_migration_status( assert len(asserts) == 0, assert_message -@pytest.mark.parametrize( - "prepare_tables_for_migration,workflow", - [ - ("managed", "migrate-tables"), - ], - indirect=("prepare_tables_for_migration",), -) -def test_table_migration_for_managed_table(ws, installation_ctx, prepare_tables_for_migration, workflow, sql_backend): +def test_table_migration_for_managed_table(installation_ctx, make_table_migration_context) -> None: # This test cases test the CONVERT_TO_EXTERNAL scenario. - tables, dst_schema = prepare_tables_for_migration + tables, dst_schema = make_table_migration_context("managed", installation_ctx) ctx = installation_ctx.replace( extend_prompts={ r"If hive_metastore contains managed table with external.*": "0", @@ -80,24 +75,25 @@ def test_table_migration_for_managed_table(ws, installation_ctx, prepare_tables_ ) ctx.workspace_installation.run() - ctx.deployed_workflows.run_workflow(workflow) + ctx.deployed_workflows.run_workflow("migrate-tables") for table in tables.values(): try: - assert ws.tables.get(f"{dst_schema.catalog_name}.{dst_schema.name}.{table.name}").name + assert ctx.workspace_client.tables.get(f"{dst_schema.catalog_name}.{dst_schema.name}.{table.name}").name except NotFound: assert False, f"{table.name} not found in {dst_schema.catalog_name}.{dst_schema.name}" managed_table = tables["src_managed_table"] - for key, value, _ in sql_backend.fetch(f"DESCRIBE TABLE EXTENDED {escape_sql_identifier(managed_table.full_name)}"): + for key, value, _ in ctx.sql_backend.fetch( + f"DESCRIBE TABLE EXTENDED {escape_sql_identifier(managed_table.full_name)}" + ): if key == "Type": assert value == "EXTERNAL" break -@pytest.mark.parametrize('prepare_tables_for_migration', [('hiveserde')], indirect=True) -def test_hiveserde_table_in_place_migration_job(ws, installation_ctx, prepare_tables_for_migration): - tables, dst_schema = prepare_tables_for_migration +def test_hiveserde_table_in_place_migration_job(installation_ctx, make_table_migration_context) -> None: + tables, dst_schema = make_table_migration_context("hiveserde", installation_ctx) ctx = installation_ctx.replace( extend_prompts={ r".*Do you want to update the existing installation?.*": 'yes', @@ -110,14 +106,13 @@ def test_hiveserde_table_in_place_migration_job(ws, installation_ctx, prepare_ta # assert the tables are migrated for table in tables.values(): try: - assert ws.tables.get(f"{dst_schema.catalog_name}.{dst_schema.name}.{table.name}").name + assert ctx.workspace_client.tables.get(f"{dst_schema.catalog_name}.{dst_schema.name}.{table.name}").name except NotFound: assert False, f"{table.name} not found in {dst_schema.catalog_name}.{dst_schema.name}" -@pytest.mark.parametrize('prepare_tables_for_migration', [('hiveserde')], indirect=True) -def test_hiveserde_table_ctas_migration_job(ws, installation_ctx, prepare_tables_for_migration): - tables, dst_schema = prepare_tables_for_migration +def test_hiveserde_table_ctas_migration_job(installation_ctx, make_table_migration_context) -> None: + tables, dst_schema = make_table_migration_context("hiveserde", installation_ctx) ctx = installation_ctx.replace( extend_prompts={ r".*Do you want to update the existing installation?.*": 'yes', @@ -130,16 +125,13 @@ def test_hiveserde_table_ctas_migration_job(ws, installation_ctx, prepare_tables # assert the tables are migrated for table in tables.values(): try: - assert ws.tables.get(f"{dst_schema.catalog_name}.{dst_schema.name}.{table.name}").name + assert ctx.workspace_client.tables.get(f"{dst_schema.catalog_name}.{dst_schema.name}.{table.name}").name except NotFound: assert False, f"{table.name} not found in {dst_schema.catalog_name}.{dst_schema.name}" -@pytest.mark.parametrize('prepare_tables_for_migration', ['regular'], indirect=True) -def test_table_migration_job_publishes_remaining_tables( - ws, installation_ctx, sql_backend, prepare_tables_for_migration, caplog -): - tables, dst_schema = prepare_tables_for_migration +def test_table_migration_job_publishes_remaining_tables(installation_ctx, make_table_migration_context) -> None: + tables, dst_schema = make_table_migration_context("regular", installation_ctx) installation_ctx.workspace_installation.run() second_table = list(tables.values())[1] table = Table( @@ -154,7 +146,7 @@ def test_table_migration_job_publishes_remaining_tables( assert installation_ctx.deployed_workflows.validate_step("migrate-tables") remaining_tables = list( - sql_backend.fetch( + installation_ctx.sql_backend.fetch( f""" SELECT SUBSTRING(message, LENGTH('remained-hive-metastore-table: ') + 1) diff --git a/tests/integration/workspace_access/test_workflows.py b/tests/integration/workspace_access/test_workflows.py index 62e36d4a2b..bed5065630 100644 --- a/tests/integration/workspace_access/test_workflows.py +++ b/tests/integration/workspace_access/test_workflows.py @@ -57,7 +57,7 @@ def test_running_real_migrate_groups_job( # TODO: Move `include_object_permissions` to context like other `include_` attributes # Limit the considered permissions to the following objects: - installation_ctx.__dict__['include_object_permissions'] = [ + installation_ctx.config.include_object_permissions = [ f"cluster-policies:{cluster_policy.policy_id}", f"TABLE:{table.full_name}", f"secrets:{secret_scope}", From a700a6c4483a191f822fea4f116e356b27693630 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Fri, 13 Dec 2024 09:11:47 +0100 Subject: [PATCH 02/10] Ignore TACL migration --- .../hive_metastore/test_workflows.py | 34 +++++++++++++++---- 1 file changed, 28 insertions(+), 6 deletions(-) diff --git a/tests/integration/hive_metastore/test_workflows.py b/tests/integration/hive_metastore/test_workflows.py index 6292a2cd0f..8322e555df 100644 --- a/tests/integration/hive_metastore/test_workflows.py +++ b/tests/integration/hive_metastore/test_workflows.py @@ -25,6 +25,10 @@ def test_table_migration_job_refreshes_migration_status( """The migration status should be refreshed after the migration job.""" tables, _ = make_table_migration_context(scenario, installation_ctx) ctx = installation_ctx.replace( + config_transform=lambda wc: dataclasses.replace( + wc, + skip_tacl_migration=True, + ), extend_prompts={ r".*Do you want to update the existing installation?.*": 'yes', }, @@ -68,6 +72,10 @@ def test_table_migration_for_managed_table(installation_ctx, make_table_migratio # This test cases test the CONVERT_TO_EXTERNAL scenario. tables, dst_schema = make_table_migration_context("managed", installation_ctx) ctx = installation_ctx.replace( + config_transform=lambda wc: dataclasses.replace( + wc, + skip_tacl_migration=True, + ), extend_prompts={ r"If hive_metastore contains managed table with external.*": "0", r".*Do you want to update the existing installation?.*": 'yes', @@ -95,6 +103,10 @@ def test_table_migration_for_managed_table(installation_ctx, make_table_migratio def test_hiveserde_table_in_place_migration_job(installation_ctx, make_table_migration_context) -> None: tables, dst_schema = make_table_migration_context("hiveserde", installation_ctx) ctx = installation_ctx.replace( + config_transform=lambda wc: dataclasses.replace( + wc, + skip_tacl_migration=True, + ), extend_prompts={ r".*Do you want to update the existing installation?.*": 'yes', }, @@ -114,6 +126,10 @@ def test_hiveserde_table_in_place_migration_job(installation_ctx, make_table_mig def test_hiveserde_table_ctas_migration_job(installation_ctx, make_table_migration_context) -> None: tables, dst_schema = make_table_migration_context("hiveserde", installation_ctx) ctx = installation_ctx.replace( + config_transform=lambda wc: dataclasses.replace( + wc, + skip_tacl_migration=True, + ), extend_prompts={ r".*Do you want to update the existing installation?.*": 'yes', }, @@ -132,7 +148,13 @@ def test_hiveserde_table_ctas_migration_job(installation_ctx, make_table_migrati def test_table_migration_job_publishes_remaining_tables(installation_ctx, make_table_migration_context) -> None: tables, dst_schema = make_table_migration_context("regular", installation_ctx) - installation_ctx.workspace_installation.run() + ctx = installation_ctx.replace( + config_transform=lambda wc: dataclasses.replace( + wc, + skip_tacl_migration=True, + ), + ) + ctx.workspace_installation.run() second_table = list(tables.values())[1] table = Table( "hive_metastore", @@ -141,17 +163,17 @@ def test_table_migration_job_publishes_remaining_tables(installation_ctx, make_t object_type="UNKNOWN", table_format="UNKNOWN", ) - installation_ctx.table_mapping.skip_table_or_view(dst_schema.name, second_table.name, load_table=lambda *_: table) - installation_ctx.deployed_workflows.run_workflow("migrate-tables") - assert installation_ctx.deployed_workflows.validate_step("migrate-tables") + ctx.table_mapping.skip_table_or_view(dst_schema.name, second_table.name, load_table=lambda *_: table) + ctx.deployed_workflows.run_workflow("migrate-tables") + assert ctx.deployed_workflows.validate_step("migrate-tables") remaining_tables = list( - installation_ctx.sql_backend.fetch( + ctx.sql_backend.fetch( f""" SELECT SUBSTRING(message, LENGTH('remained-hive-metastore-table: ') + 1) AS message - FROM {installation_ctx.inventory_database}.logs + FROM {ctx.inventory_database}.logs WHERE message LIKE 'remained-hive-metastore-table: %' """ ) From 68fee781d487452c3a236535385e1d589ac4ead2 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Fri, 13 Dec 2024 09:16:36 +0100 Subject: [PATCH 03/10] Clean up tests --- tests/integration/hive_metastore/test_workflows.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/tests/integration/hive_metastore/test_workflows.py b/tests/integration/hive_metastore/test_workflows.py index 8322e555df..70716df776 100644 --- a/tests/integration/hive_metastore/test_workflows.py +++ b/tests/integration/hive_metastore/test_workflows.py @@ -68,8 +68,7 @@ def test_table_migration_job_refreshes_migration_status( assert len(asserts) == 0, assert_message -def test_table_migration_for_managed_table(installation_ctx, make_table_migration_context) -> None: - # This test cases test the CONVERT_TO_EXTERNAL scenario. +def test_table_migration_convert_manged_to_external(installation_ctx, make_table_migration_context) -> None: tables, dst_schema = make_table_migration_context("managed", installation_ctx) ctx = installation_ctx.replace( config_transform=lambda wc: dataclasses.replace( @@ -112,10 +111,10 @@ def test_hiveserde_table_in_place_migration_job(installation_ctx, make_table_mig }, ) ctx.workspace_installation.run() + ctx.deployed_workflows.run_workflow("migrate-external-hiveserde-tables-in-place-experimental") - # assert the workflow is successful + assert ctx.deployed_workflows.validate_step("migrate-external-hiveserde-tables-in-place-experimental") - # assert the tables are migrated for table in tables.values(): try: assert ctx.workspace_client.tables.get(f"{dst_schema.catalog_name}.{dst_schema.name}.{table.name}").name @@ -135,10 +134,10 @@ def test_hiveserde_table_ctas_migration_job(installation_ctx, make_table_migrati }, ) ctx.workspace_installation.run() + ctx.deployed_workflows.run_workflow("migrate-external-tables-ctas") - # assert the workflow is successful + assert ctx.deployed_workflows.validate_step("migrate-external-tables-ctas") - # assert the tables are migrated for table in tables.values(): try: assert ctx.workspace_client.tables.get(f"{dst_schema.catalog_name}.{dst_schema.name}.{table.name}").name @@ -164,9 +163,10 @@ def test_table_migration_job_publishes_remaining_tables(installation_ctx, make_t table_format="UNKNOWN", ) ctx.table_mapping.skip_table_or_view(dst_schema.name, second_table.name, load_table=lambda *_: table) + ctx.deployed_workflows.run_workflow("migrate-tables") - assert ctx.deployed_workflows.validate_step("migrate-tables") + assert ctx.deployed_workflows.validate_step("migrate-tables") remaining_tables = list( ctx.sql_backend.fetch( f""" From f01fa609eb5e11e43f8f004ea52f8fce47575613 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Fri, 13 Dec 2024 09:19:17 +0100 Subject: [PATCH 04/10] Merge tests --- .../hive_metastore/test_workflows.py | 30 +++---------------- 1 file changed, 4 insertions(+), 26 deletions(-) diff --git a/tests/integration/hive_metastore/test_workflows.py b/tests/integration/hive_metastore/test_workflows.py index 70716df776..7153c86e33 100644 --- a/tests/integration/hive_metastore/test_workflows.py +++ b/tests/integration/hive_metastore/test_workflows.py @@ -99,7 +99,8 @@ def test_table_migration_convert_manged_to_external(installation_ctx, make_table break -def test_hiveserde_table_in_place_migration_job(installation_ctx, make_table_migration_context) -> None: +@pytest.mark.parametrize("workflow", ["migrate-external-hiveserde-tables-in-place-experimental", "migrate-external-tables-ctas"]) +def test_hiveserde_table_in_place_migration_job(installation_ctx, make_table_migration_context, workflow) -> None: tables, dst_schema = make_table_migration_context("hiveserde", installation_ctx) ctx = installation_ctx.replace( config_transform=lambda wc: dataclasses.replace( @@ -112,32 +113,9 @@ def test_hiveserde_table_in_place_migration_job(installation_ctx, make_table_mig ) ctx.workspace_installation.run() - ctx.deployed_workflows.run_workflow("migrate-external-hiveserde-tables-in-place-experimental") + ctx.deployed_workflows.run_workflow(workflow, skip_job_wait=True) - assert ctx.deployed_workflows.validate_step("migrate-external-hiveserde-tables-in-place-experimental") - for table in tables.values(): - try: - assert ctx.workspace_client.tables.get(f"{dst_schema.catalog_name}.{dst_schema.name}.{table.name}").name - except NotFound: - assert False, f"{table.name} not found in {dst_schema.catalog_name}.{dst_schema.name}" - - -def test_hiveserde_table_ctas_migration_job(installation_ctx, make_table_migration_context) -> None: - tables, dst_schema = make_table_migration_context("hiveserde", installation_ctx) - ctx = installation_ctx.replace( - config_transform=lambda wc: dataclasses.replace( - wc, - skip_tacl_migration=True, - ), - extend_prompts={ - r".*Do you want to update the existing installation?.*": 'yes', - }, - ) - ctx.workspace_installation.run() - - ctx.deployed_workflows.run_workflow("migrate-external-tables-ctas") - - assert ctx.deployed_workflows.validate_step("migrate-external-tables-ctas") + assert installation_ctx.deployed_workflows.validate_step(workflow), f"Workflow failed: {workflow}" for table in tables.values(): try: assert ctx.workspace_client.tables.get(f"{dst_schema.catalog_name}.{dst_schema.name}.{table.name}").name From 096388b1ddb8aa7e3fd7ac6a950b4e2ae21a0900 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Fri, 13 Dec 2024 09:24:01 +0100 Subject: [PATCH 05/10] Clean up asserts --- .../hive_metastore/test_workflows.py | 23 +++++++++++-------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/tests/integration/hive_metastore/test_workflows.py b/tests/integration/hive_metastore/test_workflows.py index 7153c86e33..c4ba466eae 100644 --- a/tests/integration/hive_metastore/test_workflows.py +++ b/tests/integration/hive_metastore/test_workflows.py @@ -2,6 +2,7 @@ from typing import Literal import pytest +from databricks.labs.lsql.core import Row from databricks.sdk.errors import NotFound from databricks.labs.ucx.framework.utils import escape_sql_identifier @@ -84,13 +85,14 @@ def test_table_migration_convert_manged_to_external(installation_ctx, make_table ctx.workspace_installation.run() ctx.deployed_workflows.run_workflow("migrate-tables") + missing_tables = set[str]() for table in tables.values(): - try: - assert ctx.workspace_client.tables.get(f"{dst_schema.catalog_name}.{dst_schema.name}.{table.name}").name - except NotFound: - assert False, f"{table.name} not found in {dst_schema.catalog_name}.{dst_schema.name}" - managed_table = tables["src_managed_table"] + migrated_table_name = f"{dst_schema.catalog_name}.{dst_schema.name}.{table.name}" + if not ctx.workspace_client.tables.exists(migrated_table_name): + missing_tables.add(migrated_table_name) + assert not missing_tables, f"Missing migrated tables: {missing_tables}" + managed_table = tables["src_managed_table"] for key, value, _ in ctx.sql_backend.fetch( f"DESCRIBE TABLE EXTENDED {escape_sql_identifier(managed_table.full_name)}" ): @@ -116,11 +118,12 @@ def test_hiveserde_table_in_place_migration_job(installation_ctx, make_table_mig ctx.deployed_workflows.run_workflow(workflow, skip_job_wait=True) assert installation_ctx.deployed_workflows.validate_step(workflow), f"Workflow failed: {workflow}" + missing_tables = set[str]() for table in tables.values(): - try: - assert ctx.workspace_client.tables.get(f"{dst_schema.catalog_name}.{dst_schema.name}.{table.name}").name - except NotFound: - assert False, f"{table.name} not found in {dst_schema.catalog_name}.{dst_schema.name}" + migrated_table_name = f"{dst_schema.catalog_name}.{dst_schema.name}.{table.name}" + if not ctx.workspace_client.tables.exists(migrated_table_name): + missing_tables.add(migrated_table_name) + assert not missing_tables, f"Missing migrated tables: {missing_tables}" def test_table_migration_job_publishes_remaining_tables(installation_ctx, make_table_migration_context) -> None: @@ -156,4 +159,4 @@ def test_table_migration_job_publishes_remaining_tables(installation_ctx, make_t """ ) ) - assert remaining_tables[0].message == f'hive_metastore.{dst_schema.name}.{second_table.name}' + assert remaining_tables == [Row(message=f"hive_metastore.{dst_schema.name}.{second_table.name}")] From d3a9544d25fe055eadeefdb830652607bafdb55f Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Fri, 13 Dec 2024 09:53:31 +0100 Subject: [PATCH 06/10] Format --- tests/integration/hive_metastore/test_workflows.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/tests/integration/hive_metastore/test_workflows.py b/tests/integration/hive_metastore/test_workflows.py index c4ba466eae..deb29d2499 100644 --- a/tests/integration/hive_metastore/test_workflows.py +++ b/tests/integration/hive_metastore/test_workflows.py @@ -3,7 +3,6 @@ import pytest from databricks.labs.lsql.core import Row -from databricks.sdk.errors import NotFound from databricks.labs.ucx.framework.utils import escape_sql_identifier from databricks.labs.ucx.hive_metastore.tables import Table @@ -101,7 +100,9 @@ def test_table_migration_convert_manged_to_external(installation_ctx, make_table break -@pytest.mark.parametrize("workflow", ["migrate-external-hiveserde-tables-in-place-experimental", "migrate-external-tables-ctas"]) +@pytest.mark.parametrize( + "workflow", ["migrate-external-hiveserde-tables-in-place-experimental", "migrate-external-tables-ctas"] +) def test_hiveserde_table_in_place_migration_job(installation_ctx, make_table_migration_context, workflow) -> None: tables, dst_schema = make_table_migration_context("hiveserde", installation_ctx) ctx = installation_ctx.replace( @@ -122,7 +123,7 @@ def test_hiveserde_table_in_place_migration_job(installation_ctx, make_table_mig for table in tables.values(): migrated_table_name = f"{dst_schema.catalog_name}.{dst_schema.name}.{table.name}" if not ctx.workspace_client.tables.exists(migrated_table_name): - missing_tables.add(migrated_table_name) + missing_tables.add(migrated_table_name) assert not missing_tables, f"Missing migrated tables: {missing_tables}" From 2428a413aa3497664d1530ef098156942fb684e6 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Fri, 13 Dec 2024 09:55:28 +0100 Subject: [PATCH 07/10] Set back include object permissions --- tests/integration/workspace_access/test_workflows.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/workspace_access/test_workflows.py b/tests/integration/workspace_access/test_workflows.py index bed5065630..62e36d4a2b 100644 --- a/tests/integration/workspace_access/test_workflows.py +++ b/tests/integration/workspace_access/test_workflows.py @@ -57,7 +57,7 @@ def test_running_real_migrate_groups_job( # TODO: Move `include_object_permissions` to context like other `include_` attributes # Limit the considered permissions to the following objects: - installation_ctx.config.include_object_permissions = [ + installation_ctx.__dict__['include_object_permissions'] = [ f"cluster-policies:{cluster_policy.policy_id}", f"TABLE:{table.full_name}", f"secrets:{secret_scope}", From 8a03225fd9a143f6ab4ffae1435a48aedba0c61c Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Fri, 13 Dec 2024 09:57:11 +0100 Subject: [PATCH 08/10] Clean up test_migration_job_ext_hms --- .../integration/hive_metastore/test_ext_hms.py | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/tests/integration/hive_metastore/test_ext_hms.py b/tests/integration/hive_metastore/test_ext_hms.py index 7d1b88e71e..d10c9a13cd 100644 --- a/tests/integration/hive_metastore/test_ext_hms.py +++ b/tests/integration/hive_metastore/test_ext_hms.py @@ -24,11 +24,12 @@ def sql_backend(ws, env_or_skip) -> SqlBackend: @retried(on=[NotFound, InvalidParameterValue], timeout=timedelta(minutes=5)) def test_migration_job_ext_hms(ws, installation_ctx, make_table_migration_context, env_or_skip) -> None: - tables, dst_schema = make_table_migration_context('regular', installation_ctx) + tables, dst_schema = make_table_migration_context("regular", installation_ctx) ext_hms_cluster_id = env_or_skip("TEST_EXT_HMS_CLUSTER_ID") ext_hms_ctx = installation_ctx.replace( config_transform=lambda wc: dataclasses.replace( wc, + skip_tacl_migration=True, override_clusters={ "main": ext_hms_cluster_id, "user_isolation": ext_hms_cluster_id, @@ -44,18 +45,21 @@ def test_migration_job_ext_hms(ws, installation_ctx, make_table_migration_contex r"Choose a cluster policy": "0", }, ) - ext_hms_ctx.workspace_installation.run() - ext_hms_ctx.deployed_workflows.run_workflow("migrate-tables") + + ext_hms_ctx.deployed_workflows.run_workflow("migrate-tables", skip_job_wait=True) + # assert the workflow is successful assert ext_hms_ctx.deployed_workflows.validate_step("migrate-tables") # assert the tables are migrated + missing_tables = set[str]() for table in tables.values(): - try: - assert ws.tables.get(f"{dst_schema.catalog_name}.{dst_schema.name}.{table.name}").name - except NotFound: - assert False, f"{table.name} not found in {dst_schema.catalog_name}.{dst_schema.name}" + migrated_table_name = f"{dst_schema.catalog_name}.{dst_schema.name}.{table.name}" + if not ext_hms_ctx.workspace_client.tables.exists(migrated_table_name): + missing_tables.add(migrated_table_name) + assert not missing_tables, f"Missing migrated tables: {missing_tables}" + # assert the cluster is configured correctly with ext hms install_state = ext_hms_ctx.installation.load(RawState) for job_cluster in ws.jobs.get(install_state.resources["jobs"]["migrate-tables"]).settings.job_clusters: From fe33ce87b2f65b1ac843c6750f462f096364ca27 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Fri, 13 Dec 2024 09:58:52 +0100 Subject: [PATCH 09/10] Add skip job wait flag --- tests/integration/hive_metastore/test_workflows.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/tests/integration/hive_metastore/test_workflows.py b/tests/integration/hive_metastore/test_workflows.py index deb29d2499..19ebe0c848 100644 --- a/tests/integration/hive_metastore/test_workflows.py +++ b/tests/integration/hive_metastore/test_workflows.py @@ -35,7 +35,9 @@ def test_table_migration_job_refreshes_migration_status( ) ctx.workspace_installation.run() - ctx.deployed_workflows.run_workflow(workflow) + ctx.deployed_workflows.run_workflow(workflow, skip_job_wait=True) + + assert ctx.deployed_workflows.validate_step(workflow) # Avoiding MigrationStatusRefresh as it will refresh the status before fetching migration_status_query = f"SELECT * FROM {ctx.config.inventory_database}.migration_status" @@ -82,7 +84,7 @@ def test_table_migration_convert_manged_to_external(installation_ctx, make_table ) ctx.workspace_installation.run() - ctx.deployed_workflows.run_workflow("migrate-tables") + ctx.deployed_workflows.run_workflow("migrate-tables", skip_job_wait=True) missing_tables = set[str]() for table in tables.values(): @@ -146,7 +148,7 @@ def test_table_migration_job_publishes_remaining_tables(installation_ctx, make_t ) ctx.table_mapping.skip_table_or_view(dst_schema.name, second_table.name, load_table=lambda *_: table) - ctx.deployed_workflows.run_workflow("migrate-tables") + ctx.deployed_workflows.run_workflow("migrate-tables", skip_job_wait=True) assert ctx.deployed_workflows.validate_step("migrate-tables") remaining_tables = list( From 79635b038eed8475f4db535417bf661d7fc4a8f6 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Fri, 13 Dec 2024 10:46:40 +0100 Subject: [PATCH 10/10] Validate workflow finished successfully before other asserts --- tests/integration/hive_metastore/test_workflows.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/integration/hive_metastore/test_workflows.py b/tests/integration/hive_metastore/test_workflows.py index 19ebe0c848..6f065134f3 100644 --- a/tests/integration/hive_metastore/test_workflows.py +++ b/tests/integration/hive_metastore/test_workflows.py @@ -86,6 +86,8 @@ def test_table_migration_convert_manged_to_external(installation_ctx, make_table ctx.workspace_installation.run() ctx.deployed_workflows.run_workflow("migrate-tables", skip_job_wait=True) + assert ctx.deployed_workflows.validate_step("migrate-tables") + missing_tables = set[str]() for table in tables.values(): migrated_table_name = f"{dst_schema.catalog_name}.{dst_schema.name}.{table.name}"