Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions myhoard/basebackup_operation.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
# Copyright (c) 2019 Aiven, Helsinki, Finland. https://aiven.io/
from contextlib import suppress
from distutils.version import LooseVersion # pylint:disable=deprecated-module
from myhoard.errors import XtraBackupError
from myhoard.util import get_mysql_version, mysql_cursor
from rohmu.util import increase_pipe_capacity, set_stream_nonblocking
from typing import Optional

import base64
import logging
Expand Down Expand Up @@ -73,6 +76,7 @@ def create_backup(self):
self.abort_reason = None
self.data_directory_size_start, self.data_directory_filtered_size = self._get_data_directory_size()
self._update_progress()
self._optimize_tables()

# Write encryption key to file to avoid having it on command line. NamedTemporaryFile has mode 0600
with tempfile.NamedTemporaryFile(
Expand Down Expand Up @@ -119,6 +123,45 @@ def create_backup(self):
self.data_directory_size_end, self.data_directory_filtered_size = self._get_data_directory_size()
self._update_progress(estimated_progress=100)

def _optimize_tables(self) -> None:
with mysql_cursor(**self.mysql_client_params) as cursor:
version = get_mysql_version(cursor)
if LooseVersion(version) < LooseVersion("8.0.29"):
return

# allow OPTIMIZE TABLE to run on tables without primary keys
cursor.execute("SET @@SESSION.sql_require_primary_key = 0;")

def unescape_to_utf8(escaped: str) -> Optional[str]:
ret = re.sub(r"@([0-9a-fA-F]{4})", lambda m: chr(int(m.group(1), 16)), escaped)
ret = re.sub(
r"@([0-9a-fA-F])([a-zA-Z])", lambda m: chr(ord(m.group(2)) + 121 + 20 * int(m.group(1), 16)), ret
)
if "`" in ret or "\\" in ret:
# bail out so we don't unescape ourselves below
return None
return ret

database_and_tables = []
cursor.execute("SELECT NAME FROM INFORMATION_SCHEMA.INNODB_TABLES WHERE TOTAL_ROW_VERSIONS > 0")
for row in cursor.fetchall():
db_and_table = row["NAME"].split("/")
table_info = {
"database": unescape_to_utf8(db_and_table[0]),
"table": unescape_to_utf8(db_and_table[1]),
}
if table_info["database"] is None or table_info["table"] is None:
self.log.warning("Could not decode database/table name of '%s'", row["NAME"])
continue
database_and_tables.append(table_info)

for database_and_table in database_and_tables:
self.stats.increase(metric="myhoard.basebackup.optimize_table")
self.log.info("Optimizing table %r", database_and_table)
# sending it as parameters doesn't work
cursor.execute(f"OPTIMIZE TABLE `{database_and_table['database']}`.`{database_and_table['table']}`")
cursor.execute("COMMIT")

def _get_data_directory_size(self):
total_filtered_size = 0
total_size = 0
Expand Down
9 changes: 9 additions & 0 deletions myhoard/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,15 @@ def relay_log_name(*, prefix, index, full_path=True):
return name


def get_mysql_version(cursor: pymysql.cursors.DictCursor) -> Optional[str]:
cursor.execute("SELECT @@GLOBAL.version AS mysql_version")
row = cursor.fetchone()
if row is not None and "mysql_version" in row:
return row["mysql_version"]
else:
return None


def track_rate(*, current, last_recorded, last_recorded_time, metric_name, min_increase=1_000_000, stats):
"""Calculates rate of change given current value and previously handled value and time. If there is
a relevant change (as defined by min_increase) and some time has passed, the current rate of change
Expand Down
54 changes: 52 additions & 2 deletions test/test_basebackup_operation.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
# Copyright (c) 2019 Aiven, Helsinki, Finland. https://aiven.io/
from . import build_statsd_client, restart_mysql
from . import build_statsd_client, MySQLConfig, restart_mysql
from distutils.version import LooseVersion # pylint:disable=deprecated-module
from myhoard.basebackup_operation import BasebackupOperation
from typing import IO
from unittest import SkipTest

import myhoard.util as myhoard_util
import os
Expand Down Expand Up @@ -125,5 +128,52 @@ def stream_handler(_stream):
stream_handler=stream_handler,
temp_dir=mysql_master.base_dir,
)
with pytest.raises(Exception, match="^xtrabackup failed with code 13$"):
# we're opening a connection to check the version on mysql >= 8.0.29 below we're failing with the first message
with pytest.raises(Exception, match=r"(^xtrabackup failed with code 13$|^mysql_cursor\(\) missing 3 required keyword)"):
op.create_backup()


def test_backup_with_non_optimized_tables(mysql_master: MySQLConfig) -> None:
with myhoard_util.mysql_cursor(**mysql_master.connect_options) as cursor:
version = myhoard_util.get_mysql_version(cursor)
if LooseVersion(version) < LooseVersion("8.0.29"):
raise SkipTest("DB version doesn't need OPTIMIZE TABLE")

def create_test_db(*, db_name: str, table_name: str, add_pk: bool) -> None:
cursor.execute(f"CREATE DATABASE {db_name}")

if add_pk:
id_column_type = "integer primary key"
else:
id_column_type = "integer"

cursor.execute(f"CREATE TABLE {db_name}.{table_name} (id {id_column_type})")
for value in range(15):
cursor.execute(f"INSERT INTO {db_name}.{table_name} (id) VALUES ({value})")
cursor.execute("COMMIT")
cursor.execute(f"ALTER TABLE {db_name}.{table_name} ADD COLUMN foobar VARCHAR(15)")
cursor.execute("COMMIT")

for db_index in range(15):
create_test_db(db_name=f"test{db_index}", table_name=f"foo{db_index}", add_pk=db_index % 2 == 0)

create_test_db(db_name="`söme/thing'; weird`", table_name="`table with space`", add_pk=True)

def stream_handler(stream: IO) -> None:
while True:
if not stream.read(10 * 1024):
break

encryption_key = os.urandom(24)
op = BasebackupOperation(
encryption_algorithm="AES256",
encryption_key=encryption_key,
mysql_client_params=mysql_master.connect_options,
mysql_config_file_name=mysql_master.config_name,
mysql_data_directory=mysql_master.config_options.datadir,
progress_callback=None,
stats=build_statsd_client(),
stream_handler=stream_handler,
temp_dir=mysql_master.base_dir,
)
op.create_backup()