Exclude TACL migration in table migration integration tests #3446

Merged · 10 commits · Dec 13, 2024
Changes from all commits
86 changes: 47 additions & 39 deletions tests/integration/conftest.py
@@ -1,14 +1,15 @@
import json
from collections.abc import Callable, Generator
import functools
import collections
import os
import logging
import shutil
import subprocess
from collections.abc import Callable, Generator
from dataclasses import replace
from datetime import timedelta
from functools import cached_property
import shutil
import subprocess
from typing import Literal

import pytest # pylint: disable=wrong-import-order
from databricks.labs.blueprint.commands import CommandExecutor
@@ -1222,42 +1223,49 @@ def prepare_regular_tables(context, external_csv, schema) -> dict[str, TableInfo


@pytest.fixture
def prepare_tables_for_migration(
ws, installation_ctx, make_catalog, make_random, make_mounted_location, env_or_skip, make_storage_dir, request
) -> tuple[dict[str, TableInfo], SchemaInfo]:
# Here we use pytest indirect parametrization, so the test function can pass arguments to this fixture and the
# arguments will be available in the request.param. If the argument is "hiveserde", we will prepare hiveserde
# tables, otherwise we will prepare regular tables.
# see documents here for details https://docs.pytest.org/en/8.1.x/example/parametrize.html#indirect-parametrization
scenario = request.param
is_hiveserde = scenario == "hiveserde"
random = make_random(5).lower()
# create external and managed tables to be migrated
if scenario == "hiveserde":
schema = installation_ctx.make_schema(catalog_name="hive_metastore", name=f"hiveserde_in_place_{random}")
table_base_dir = make_storage_dir(
path=f'dbfs:/mnt/{env_or_skip("TEST_MOUNT_NAME")}/a/hiveserde_in_place_{random}'
)
tables = prepare_hiveserde_tables(installation_ctx, random, schema, table_base_dir)
elif scenario == "managed":
schema_name = f"managed_{random}"
schema_location = f'dbfs:/mnt/{env_or_skip("TEST_MOUNT_NAME")}/a/managed_{random}'
schema = installation_ctx.make_schema(catalog_name="hive_metastore", name=schema_name, location=schema_location)
tables = prepare_regular_tables(installation_ctx, make_mounted_location, schema)
elif scenario == "regular":
schema = installation_ctx.make_schema(catalog_name="hive_metastore", name=f"migrate_{random}")
tables = prepare_regular_tables(installation_ctx, make_mounted_location, schema)

# create destination catalog and schema
dst_catalog = make_catalog()
dst_schema = installation_ctx.make_schema(catalog_name=dst_catalog.name, name=schema.name)
migrate_rules = [Rule.from_src_dst(table, dst_schema) for _, table in tables.items()]
installation_ctx.with_table_mapping_rules(migrate_rules)
installation_ctx.with_dummy_resource_permission()
installation_ctx.save_tables(is_hiveserde=is_hiveserde)
installation_ctx.save_mounts()
installation_ctx.with_dummy_grants_and_tacls()
return tables, dst_schema
def make_table_migration_context(
env_or_skip,
make_random,
make_mounted_location,
make_storage_dir,
) -> Callable[
[Literal["hiveserde", "managed", "regular"], MockInstallationContext], tuple[dict[str, TableInfo], SchemaInfo]
]:

def prepare(
scenario: Literal["hiveserde", "managed", "regular"], ctx: MockInstallationContext
) -> tuple[dict[str, TableInfo], SchemaInfo]:
random = make_random(5).lower()
# create external and managed tables to be migrated
if scenario == "hiveserde":
schema = ctx.make_schema(catalog_name="hive_metastore", name=f"hiveserde_in_place_{random}")
table_base_dir = make_storage_dir(
path=f'dbfs:/mnt/{env_or_skip("TEST_MOUNT_NAME")}/a/hiveserde_in_place_{random}'
)
tables = prepare_hiveserde_tables(ctx, random, schema, table_base_dir)
elif scenario == "managed":
schema_name = f"managed_{random}"
schema_location = f'dbfs:/mnt/{env_or_skip("TEST_MOUNT_NAME")}/a/managed_{random}'
schema = ctx.make_schema(catalog_name="hive_metastore", name=schema_name, location=schema_location)
tables = prepare_regular_tables(ctx, make_mounted_location, schema)
elif scenario == "regular":
schema = ctx.make_schema(catalog_name="hive_metastore", name=f"migrate_{random}")
tables = prepare_regular_tables(ctx, make_mounted_location, schema)
else:
raise ValueError(f"Unsupported scenario {scenario}")

# create destination catalog and schema
dst_catalog = ctx.make_catalog()
dst_schema = ctx.make_schema(catalog_name=dst_catalog.name, name=schema.name)
migrate_rules = [Rule.from_src_dst(table, dst_schema) for _, table in tables.items()]
ctx.with_table_mapping_rules(migrate_rules)
ctx.with_dummy_resource_permission()
ctx.save_tables(is_hiveserde=scenario == "hiveserde")
ctx.save_mounts()
ctx.with_dummy_grants_and_tacls()
return tables, dst_schema

return prepare
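
The factory fixture above replaces the indirect parametrization used by the old `prepare_tables_for_migration` fixture: tests now request the factory and call it with an explicit scenario instead of routing arguments through `request.param`. A minimal sketch of a caller (the test name is hypothetical):

```python
import pytest

@pytest.mark.parametrize("scenario", ["regular", "managed", "hiveserde"])
def test_example_migration(installation_ctx, make_table_migration_context, scenario) -> None:
    # The factory creates the source tables, destination schema, mapping
    # rules, mounts, and dummy grants, then returns the prepared objects.
    tables, dst_schema = make_table_migration_context(scenario, installation_ctx)
    assert tables and dst_schema.name
```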


21 changes: 12 additions & 9 deletions tests/integration/hive_metastore/test_ext_hms.py
@@ -23,13 +23,13 @@ def sql_backend(ws, env_or_skip) -> SqlBackend:


@retried(on=[NotFound, InvalidParameterValue], timeout=timedelta(minutes=5))
@pytest.mark.parametrize('prepare_tables_for_migration', ['regular'], indirect=True)
def test_migration_job_ext_hms(ws, installation_ctx, prepare_tables_for_migration, env_or_skip):
def test_migration_job_ext_hms(ws, installation_ctx, make_table_migration_context, env_or_skip) -> None:
tables, dst_schema = make_table_migration_context("regular", installation_ctx)
ext_hms_cluster_id = env_or_skip("TEST_EXT_HMS_CLUSTER_ID")
tables, dst_schema = prepare_tables_for_migration
ext_hms_ctx = installation_ctx.replace(
config_transform=lambda wc: dataclasses.replace(
wc,
skip_tacl_migration=True,
override_clusters={
"main": ext_hms_cluster_id,
"user_isolation": ext_hms_cluster_id,
@@ -45,18 +45,21 @@ def test_migration_job_ext_hms(ws, installation_ctx, prepare_tables_for_migratio
r"Choose a cluster policy": "0",
},
)

ext_hms_ctx.workspace_installation.run()
ext_hms_ctx.deployed_workflows.run_workflow("migrate-tables")

ext_hms_ctx.deployed_workflows.run_workflow("migrate-tables", skip_job_wait=True)

# assert the workflow is successful
assert ext_hms_ctx.deployed_workflows.validate_step("migrate-tables")

# assert the tables are migrated
missing_tables = set[str]()
for table in tables.values():
try:
assert ws.tables.get(f"{dst_schema.catalog_name}.{dst_schema.name}.{table.name}").name
except NotFound:
assert False, f"{table.name} not found in {dst_schema.catalog_name}.{dst_schema.name}"
migrated_table_name = f"{dst_schema.catalog_name}.{dst_schema.name}.{table.name}"
if not ext_hms_ctx.workspace_client.tables.exists(migrated_table_name):
missing_tables.add(migrated_table_name)
assert not missing_tables, f"Missing migrated tables: {missing_tables}"

# assert the cluster is configured correctly with ext hms
install_state = ext_hms_ctx.installation.load(RawState)
for job_cluster in ws.jobs.get(install_state.resources["jobs"]["migrate-tables"]).settings.job_clusters:
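
The change this PR is named for shows up above as a `config_transform`: each migration test now installs with TACL migration disabled. A minimal sketch of the pattern, assuming `WorkspaceConfig` is a dataclass with a `skip_tacl_migration` flag (as the diff implies):

```python
import dataclasses

def _skip_tacl(wc):
    # Return a copy of the WorkspaceConfig with TACL migration turned off.
    # The installation context applies this transform before installing UCX.
    return dataclasses.replace(wc, skip_tacl_migration=True)

ext_hms_ctx = installation_ctx.replace(config_transform=_skip_tacl)
```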
132 changes: 66 additions & 66 deletions tests/integration/hive_metastore/test_workflows.py
@@ -1,35 +1,43 @@
import dataclasses
from typing import Literal

import pytest
from databricks.sdk.errors import NotFound
from databricks.labs.lsql.core import Row

from databricks.labs.ucx.framework.utils import escape_sql_identifier
from databricks.labs.ucx.hive_metastore.tables import Table


@pytest.mark.parametrize(
"prepare_tables_for_migration,workflow",
"scenario, workflow",
[
("regular", "migrate-tables"),
("hiveserde", "migrate-external-hiveserde-tables-in-place-experimental"),
("hiveserde", "migrate-external-tables-ctas"),
],
indirect=("prepare_tables_for_migration",),
)
def test_table_migration_job_refreshes_migration_status(
ws,
installation_ctx,
prepare_tables_for_migration,
workflow,
):
scenario: Literal["regular", "hiveserde"],
workflow: str,
make_table_migration_context,
) -> None:
"""The migration status should be refreshed after the migration job."""
tables, _ = prepare_tables_for_migration
tables, _ = make_table_migration_context(scenario, installation_ctx)
ctx = installation_ctx.replace(
config_transform=lambda wc: dataclasses.replace(
wc,
skip_tacl_migration=True,
),
extend_prompts={
r".*Do you want to update the existing installation?.*": 'yes',
},
)

ctx.workspace_installation.run()
ctx.deployed_workflows.run_workflow(workflow)
ctx.deployed_workflows.run_workflow(workflow, skip_job_wait=True)

assert ctx.deployed_workflows.validate_step(workflow)

# Avoiding MigrationStatusRefresh as it will refresh the status before fetching
migration_status_query = f"SELECT * FROM {ctx.config.inventory_database}.migration_status"
@@ -62,85 +70,76 @@ def test_table_migration_job_refreshes_migration_status(
assert len(asserts) == 0, assert_message
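
The assertion above reads the migration_status inventory table over SQL instead of going through MigrationStatusRefresh, which would re-crawl (and thus rewrite) the status before returning it. A sketch of that direct read; the column names on the fetched rows are assumptions based on this test's checks:

```python
query = f"SELECT * FROM {ctx.config.inventory_database}.migration_status"
statuses = list(ctx.sql_backend.fetch(query))

# Rows with no destination table recorded were not migrated by the workflow.
unmigrated = {
    f"{row.src_schema}.{row.src_table}"  # column names assumed
    for row in statuses
    if row.dst_table is None
}
assert not unmigrated, f"Tables not migrated: {unmigrated}"
```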


@pytest.mark.parametrize(
"prepare_tables_for_migration,workflow",
[
("managed", "migrate-tables"),
],
indirect=("prepare_tables_for_migration",),
)
def test_table_migration_for_managed_table(ws, installation_ctx, prepare_tables_for_migration, workflow, sql_backend):
# This test cases test the CONVERT_TO_EXTERNAL scenario.
tables, dst_schema = prepare_tables_for_migration
def test_table_migration_convert_manged_to_external(installation_ctx, make_table_migration_context) -> None:
tables, dst_schema = make_table_migration_context("managed", installation_ctx)
ctx = installation_ctx.replace(
config_transform=lambda wc: dataclasses.replace(
wc,
skip_tacl_migration=True,
),
extend_prompts={
r"If hive_metastore contains managed table with external.*": "0",
r".*Do you want to update the existing installation?.*": 'yes',
},
)

ctx.workspace_installation.run()
ctx.deployed_workflows.run_workflow(workflow)
ctx.deployed_workflows.run_workflow("migrate-tables", skip_job_wait=True)

assert ctx.deployed_workflows.validate_step("migrate-tables")

missing_tables = set[str]()
for table in tables.values():
try:
assert ws.tables.get(f"{dst_schema.catalog_name}.{dst_schema.name}.{table.name}").name
except NotFound:
assert False, f"{table.name} not found in {dst_schema.catalog_name}.{dst_schema.name}"
managed_table = tables["src_managed_table"]
migrated_table_name = f"{dst_schema.catalog_name}.{dst_schema.name}.{table.name}"
if not ctx.workspace_client.tables.exists(migrated_table_name):
missing_tables.add(migrated_table_name)
assert not missing_tables, f"Missing migrated tables: {missing_tables}"

for key, value, _ in sql_backend.fetch(f"DESCRIBE TABLE EXTENDED {escape_sql_identifier(managed_table.full_name)}"):
managed_table = tables["src_managed_table"]
for key, value, _ in ctx.sql_backend.fetch(
f"DESCRIBE TABLE EXTENDED {escape_sql_identifier(managed_table.full_name)}"
):
if key == "Type":
assert value == "EXTERNAL"
break
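
The loop above depends on DESCRIBE TABLE EXTENDED emitting (key, value, comment) rows, where the Type row flips from MANAGED to EXTERNAL once the CONVERT_TO_EXTERNAL path has run. The same check factored into a helper (hypothetical, not part of this PR):

```python
from databricks.labs.ucx.framework.utils import escape_sql_identifier

def table_type(sql_backend, full_name: str) -> str | None:
    """Return the Type value (e.g. MANAGED or EXTERNAL) for a table, if any."""
    query = f"DESCRIBE TABLE EXTENDED {escape_sql_identifier(full_name)}"
    for key, value, _ in sql_backend.fetch(query):
        if key == "Type":
            return value
    return None

assert table_type(ctx.sql_backend, managed_table.full_name) == "EXTERNAL"
```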


@pytest.mark.parametrize('prepare_tables_for_migration', [('hiveserde')], indirect=True)
def test_hiveserde_table_in_place_migration_job(ws, installation_ctx, prepare_tables_for_migration):
tables, dst_schema = prepare_tables_for_migration
@pytest.mark.parametrize(
"workflow", ["migrate-external-hiveserde-tables-in-place-experimental", "migrate-external-tables-ctas"]
)
def test_hiveserde_table_in_place_migration_job(installation_ctx, make_table_migration_context, workflow) -> None:
tables, dst_schema = make_table_migration_context("hiveserde", installation_ctx)
ctx = installation_ctx.replace(
config_transform=lambda wc: dataclasses.replace(
wc,
skip_tacl_migration=True,
),
extend_prompts={
r".*Do you want to update the existing installation?.*": 'yes',
},
)
ctx.workspace_installation.run()
ctx.deployed_workflows.run_workflow("migrate-external-hiveserde-tables-in-place-experimental")
# assert the workflow is successful
assert ctx.deployed_workflows.validate_step("migrate-external-hiveserde-tables-in-place-experimental")
# assert the tables are migrated

ctx.deployed_workflows.run_workflow(workflow, skip_job_wait=True)

assert installation_ctx.deployed_workflows.validate_step(workflow), f"Workflow failed: {workflow}"
missing_tables = set[str]()
for table in tables.values():
try:
assert ws.tables.get(f"{dst_schema.catalog_name}.{dst_schema.name}.{table.name}").name
except NotFound:
assert False, f"{table.name} not found in {dst_schema.catalog_name}.{dst_schema.name}"
migrated_table_name = f"{dst_schema.catalog_name}.{dst_schema.name}.{table.name}"
if not ctx.workspace_client.tables.exists(migrated_table_name):
missing_tables.add(migrated_table_name)
assert not missing_tables, f"Missing migrated tables: {missing_tables}"
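
This test, like the others in the PR, swaps the old fail-on-first-NotFound loop for collecting every missing table and failing once with the full set, which makes a partially failed run much easier to triage. A sketch of the same pattern written with the SDK call the old code used:

```python
from databricks.sdk.errors import NotFound

missing_tables: set[str] = set()
for table in tables.values():
    full_name = f"{dst_schema.catalog_name}.{dst_schema.name}.{table.name}"
    try:
        ws.tables.get(full_name)  # raises NotFound when the table is absent
    except NotFound:
        missing_tables.add(full_name)
assert not missing_tables, f"Missing migrated tables: {missing_tables}"
```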


@pytest.mark.parametrize('prepare_tables_for_migration', [('hiveserde')], indirect=True)
def test_hiveserde_table_ctas_migration_job(ws, installation_ctx, prepare_tables_for_migration):
tables, dst_schema = prepare_tables_for_migration
def test_table_migration_job_publishes_remaining_tables(installation_ctx, make_table_migration_context) -> None:
tables, dst_schema = make_table_migration_context("regular", installation_ctx)
ctx = installation_ctx.replace(
extend_prompts={
r".*Do you want to update the existing installation?.*": 'yes',
},
config_transform=lambda wc: dataclasses.replace(
wc,
skip_tacl_migration=True,
),
)
ctx.workspace_installation.run()
ctx.deployed_workflows.run_workflow("migrate-external-tables-ctas")
# assert the workflow is successful
assert ctx.deployed_workflows.validate_step("migrate-external-tables-ctas")
# assert the tables are migrated
for table in tables.values():
try:
assert ws.tables.get(f"{dst_schema.catalog_name}.{dst_schema.name}.{table.name}").name
except NotFound:
assert False, f"{table.name} not found in {dst_schema.catalog_name}.{dst_schema.name}"


@pytest.mark.parametrize('prepare_tables_for_migration', ['regular'], indirect=True)
def test_table_migration_job_publishes_remaining_tables(
ws, installation_ctx, sql_backend, prepare_tables_for_migration, caplog
):
tables, dst_schema = prepare_tables_for_migration
installation_ctx.workspace_installation.run()
second_table = list(tables.values())[1]
table = Table(
"hive_metastore",
@@ -149,19 +148,20 @@ def test_table_migration_job_publishes_remaining_tables(
object_type="UNKNOWN",
table_format="UNKNOWN",
)
installation_ctx.table_mapping.skip_table_or_view(dst_schema.name, second_table.name, load_table=lambda *_: table)
installation_ctx.deployed_workflows.run_workflow("migrate-tables")
assert installation_ctx.deployed_workflows.validate_step("migrate-tables")
ctx.table_mapping.skip_table_or_view(dst_schema.name, second_table.name, load_table=lambda *_: table)

ctx.deployed_workflows.run_workflow("migrate-tables", skip_job_wait=True)

assert ctx.deployed_workflows.validate_step("migrate-tables")
remaining_tables = list(
sql_backend.fetch(
ctx.sql_backend.fetch(
f"""
SELECT
SUBSTRING(message, LENGTH('remained-hive-metastore-table: ') + 1)
AS message
FROM {installation_ctx.inventory_database}.logs
FROM {ctx.inventory_database}.logs
WHERE message LIKE 'remained-hive-metastore-table: %'
"""
)
)
assert remaining_tables[0].message == f'hive_metastore.{dst_schema.name}.{second_table.name}'
assert remaining_tables == [Row(message=f"hive_metastore.{dst_schema.name}.{second_table.name}")]
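
The final assertion was tightened from inspecting one field of the first row to comparing the whole result against a list of lsql Row values, so extra, missing, or reordered rows now fail too. A sketch, assuming Row compares by field values as the updated assert implies:

```python
from databricks.labs.lsql.core import Row

expected = [Row(message=f"hive_metastore.{dst_schema.name}.{second_table.name}")]
assert remaining_tables == expected  # any surplus or absent log row fails here
```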