|
1 | 1 | import logging
|
2 | 2 | from .base import ColumnFamilyTestBase
|
3 |
| -from typing import Callable |
4 | 3 | from ydb.tests.library.common.helpers import plain_or_under_sanitizer
|
5 |
| -from ydb.tests.olap.common.thread_helper import TestThread, TestThreads |
6 | 4 | from ydb.tests.olap.common.column_table_helper import ColumnTableHelper
|
| 5 | +import pytest |
7 | 6 |
|
8 | 7 | logger = logging.getLogger(__name__)
|
9 | 8 |
|
10 | 9 |
|
11 |
| -class TestAlterCompression(ColumnFamilyTestBase): |
12 |
| - class_name = "alter_compression" |
| 10 | +class TestCompressionBase(ColumnFamilyTestBase): |
13 | 11 |
|
14 | 12 | @classmethod
|
15 | 13 | def setup_class(cls):
|
16 |
| - super(TestAlterCompression, cls).setup_class() |
| 14 | + super(TestCompressionBase, cls).setup_class() |
17 | 15 |
|
| 16 | + @classmethod |
18 | 17 | def upsert_and_wait_portions(self, table: ColumnTableHelper, number_rows_for_insert: int, count_upsert: int):
|
19 | 18 | current_num_rows: int = table.get_row_count()
|
20 | 19 | for _ in range(count_upsert):
|
@@ -49,73 +48,13 @@ def upsert_and_wait_portions(self, table: ColumnTableHelper, number_rows_for_ins
|
49 | 48 | ):
|
50 | 49 | raise Exception("not all portions have been updated")
|
51 | 50 |
|
| 51 | + @classmethod |
52 | 52 | def add_family_in_create(self, name: str, settings: str):
|
53 | 53 | return f"FAMILY {name} ({settings})"
|
54 | 54 |
|
55 |
| - def test_all_supported_compression(self): |
56 |
| - ''' Implements https://github.com/ydb-platform/ydb/issues/13640 ''' |
57 |
| - |
58 |
| - single_upsert_rows_count: int = 10**5 |
59 |
| - upsert_count: int = 10 |
60 |
| - test_name: str = "all_supported_compression" |
61 |
| - test_dir: str = f"{self.ydb_client.database}/{self.class_name}/{test_name}" |
62 |
| - tables_path: list[str] = [ |
63 |
| - f"{test_dir}/off_compression", |
64 |
| - f"{test_dir}/lz4_compression", |
65 |
| - f"{test_dir}/zstd_compression", |
66 |
| - ] |
67 |
| - add_default_family: Callable[[str], str] = lambda settings: self.add_family_in_create(name='default', settings=settings) |
68 |
| - tables_family: list[str] = [ |
69 |
| - add_default_family('COMPRESSION = "off"'), |
70 |
| - add_default_family('COMPRESSION = "lz4"'), |
71 |
| - add_default_family('COMPRESSION = "zstd"'), |
72 |
| - ] |
73 |
| - |
74 |
| - for i in range(2, 22): |
75 |
| - tables_path.append(f"{test_dir}/zstd_{i}_compression") |
76 |
| - tables_family.append(add_default_family(f'COMPRESSION = "zstd", COMPRESSION_LEVEL = {i}')) |
77 |
| - |
78 |
| - assert len(tables_path) == len(tables_family) |
79 |
| - |
80 |
| - tables: list[ColumnTableHelper] = [] |
81 |
| - for table_path, table_family in zip(tables_path, tables_family): |
82 |
| - self.ydb_client.query( |
83 |
| - f""" |
84 |
| - CREATE TABLE `{table_path}` ( |
85 |
| - value Uint64 NOT NULL, |
86 |
| - value1 Uint64, |
87 |
| - PRIMARY KEY(value), |
88 |
| - {table_family} |
89 |
| - ) |
90 |
| - WITH (STORE = COLUMN) |
91 |
| - """ |
92 |
| - ) |
93 |
| - logger.info(f"Table {table_path} created") |
94 |
| - tables.append(ColumnTableHelper(self.ydb_client, table_path)) |
95 |
| - |
96 |
| - assert len(tables) == len(tables_path) |
97 |
| - |
98 |
| - tasks: TestThreads = TestThreads() |
99 |
| - for table in tables: |
100 |
| - tasks.append(TestThread(target=self.upsert_and_wait_portions, args=[table, single_upsert_rows_count, upsert_count])) |
101 | 55 |
|
102 |
| - tasks.start_and_wait_all() |
103 |
| - |
104 |
| - expected_raw = upsert_count * single_upsert_rows_count * 8 |
105 |
| - volumes_without_compression: tuple[int, int] = tables[0].get_volumes_column("value") |
106 |
| - |
107 |
| - for table in tables: |
108 |
| - volumes = table.get_volumes_column("value") |
109 |
| - assert volumes[0] == expected_raw |
110 |
| - assert table.get_portion_stat_by_tier()['__DEFAULT']['Rows'] == expected_raw // 8 |
111 |
| - |
112 |
| - for i in range(1, len(tables_path)): |
113 |
| - volumes: tuple[int, int] = tables[i].get_volumes_column("value") |
114 |
| - koef: float = volumes_without_compression[1] / volumes[1] |
115 |
| - logging.info( |
116 |
| - f"compression in `{tables[i].path}` {volumes_without_compression[1]} / {volumes[1]}: {koef}" |
117 |
| - ) |
118 |
| - assert koef > 1 |
| 56 | +class TestAlterCompression(TestCompressionBase): |
| 57 | + class_name = "alter_compression" |
119 | 58 |
|
120 | 59 | def test_availability_data(self):
|
121 | 60 | ''' Implements https://github.com/ydb-platform/ydb/issues/13643 '''
|
@@ -178,3 +117,85 @@ def check_data(table: ColumnTableHelper, rows_cont: int):
|
178 | 117 | self.upsert_and_wait_portions(test_table, single_upsert_rows_count, upsert_rows_count)
|
179 | 118 | rows_count += single_upsert_rows_count * upsert_rows_count
|
180 | 119 | check_data(table=test_table, rows_cont=rows_count)
|
| 120 | + |
| 121 | + |
| 122 | +class TestAllCompression(TestCompressionBase): |
| 123 | + class_name = "all_compression" |
| 124 | + |
| 125 | + @classmethod |
| 126 | + def setup_class(cls): |
| 127 | + super(TestAllCompression, cls).setup_class() |
| 128 | + cls.single_upsert_rows_count: int = 10**5 |
| 129 | + cls.upsert_count: int = 10 |
| 130 | + cls.volumes_without_compression: tuple[int, int] |
| 131 | + cls.test_name: str = "all_supported_compression" |
| 132 | + cls.test_dir: str = f"{cls.ydb_client.database}/{cls.class_name}/{cls.test_name}" |
| 133 | + cls.create_table_without_compression() |
| 134 | + |
| 135 | + COMPRESSION_CASES = [ |
| 136 | + ("lz4_compression", 'COMPRESSION = "lz4"'), |
| 137 | + ("zstd_compression", 'COMPRESSION = "zstd"'), |
| 138 | + ] + [ |
| 139 | + (f"zstd_{lvl}_compression", f'COMPRESSION = "zstd", COMPRESSION_LEVEL = {lvl}') |
| 140 | + for lvl in range(2, 22) |
| 141 | + ] |
| 142 | + |
| 143 | + @classmethod |
| 144 | + def create_table_without_compression(cls): |
| 145 | + table_path: str = f"{cls.test_dir}/off_compression" |
| 146 | + table_family: str = cls.add_family_in_create(name='default', settings='COMPRESSION = "off"') |
| 147 | + |
| 148 | + cls.ydb_client.query( |
| 149 | + f""" |
| 150 | + CREATE TABLE `{table_path}` ( |
| 151 | + value Uint64 NOT NULL, |
| 152 | + value1 Uint64, |
| 153 | + PRIMARY KEY(value), |
| 154 | + {table_family} |
| 155 | + ) |
| 156 | + WITH (STORE = COLUMN) |
| 157 | + """ |
| 158 | + ) |
| 159 | + logger.info(f"Table {table_path} created") |
| 160 | + table = ColumnTableHelper(cls.ydb_client, table_path) |
| 161 | + cls.upsert_and_wait_portions(table, cls.single_upsert_rows_count, cls.upsert_count) |
| 162 | + |
| 163 | + expected_raw = cls.upsert_count * cls.single_upsert_rows_count * 8 |
| 164 | + cls.volumes_without_compression: tuple[int, int] = table.get_volumes_column("value") |
| 165 | + |
| 166 | + volumes = table.get_volumes_column("value") |
| 167 | + assert volumes[0] == expected_raw |
| 168 | + assert table.get_portion_stat_by_tier()['__DEFAULT']['Rows'] == expected_raw // 8 |
| 169 | + |
| 170 | + @pytest.mark.parametrize("suffix, family_settings", COMPRESSION_CASES) |
| 171 | + def test_all_supported_compression(self, suffix: str, family_settings: str): |
| 172 | + ''' Implements https://github.com/ydb-platform/ydb/issues/13640 ''' |
| 173 | + table_path: str = f"{self.test_dir}/{suffix}" |
| 174 | + table_family: str = self.add_family_in_create(name='default', settings=family_settings) |
| 175 | + |
| 176 | + self.ydb_client.query( |
| 177 | + f""" |
| 178 | + CREATE TABLE `{table_path}` ( |
| 179 | + value Uint64 NOT NULL, |
| 180 | + value1 Uint64, |
| 181 | + PRIMARY KEY(value), |
| 182 | + {table_family} |
| 183 | + ) |
| 184 | + WITH (STORE = COLUMN) |
| 185 | + """ |
| 186 | + ) |
| 187 | + logger.info(f"Table {table_path} created") |
| 188 | + table = ColumnTableHelper(self.ydb_client, table_path) |
| 189 | + self.upsert_and_wait_portions(table, self.single_upsert_rows_count, self.upsert_count) |
| 190 | + |
| 191 | + expected_raw = self.upsert_count * self.single_upsert_rows_count * 8 |
| 192 | + volumes = table.get_volumes_column("value") |
| 193 | + assert volumes[0] == expected_raw |
| 194 | + assert table.get_portion_stat_by_tier()['__DEFAULT']['Rows'] == expected_raw // 8 |
| 195 | + |
| 196 | + volumes: tuple[int, int] = table.get_volumes_column("value") |
| 197 | + koef: float = self.volumes_without_compression[1] / volumes[1] |
| 198 | + logging.info( |
| 199 | + f"compression in `{table.path}` {self.volumes_without_compression[1]} / {volumes[1]}: {koef}" |
| 200 | + ) |
| 201 | + assert koef > 1 |
0 commit comments