Skip to content

Commit 39ecbe0

Browse files
authored
functional tiering tests: data correctness, unstable connection (#13629)
1 parent 82981cc commit 39ecbe0

File tree

6 files changed

+386
-10
lines changed

6 files changed

+386
-10
lines changed

ydb/tests/olap/ttl_tiering/base.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,13 @@ def wait_connection(self, timeout=5):
6767
def query(self, statement):
    """Run ``statement`` through the session pool and return its result.

    The call is delegated to the pool's ``execute_with_retries``; whatever
    that call returns is handed back to the caller unchanged.
    """
    pool = self.session_pool
    return pool.execute_with_retries(statement)
6969

70+
def bulk_upsert(self, table_path, column_types: ydb.BulkUpsertColumns, data_slice):
    """Write a batch of rows into ``table_path`` via the driver's table client.

    Args:
        table_path: Path of the destination table.
        column_types: Column descriptions for the rows being written.
        data_slice: The rows to write.

    Note that the underlying client call receives the rows *before* the
    column types, i.e. in a different order than this wrapper's own
    parameters. The result of the client call is discarded, so this method
    returns ``None``.
    """
    table_client = self.driver.table_client
    table_client.bulk_upsert(table_path, data_slice, column_types)
76+
7077

7178
class ColumnTableHelper:
7279
def __init__(self, ydb_client: YdbClient, path: str):
@@ -92,6 +99,15 @@ def get_blob_stat_by_tier(self) -> dict[str, (int, int)]:
9299
results = self.ydb_client.query(stmt)
93100
return {row["TierName"]: {"Portions": row["Portions"], "BlobSize": row["BlobSize"], "BlobCount": row["BlobCount"]} for result_set in results for row in result_set.rows}
94101

102+
def set_fast_compaction(self):
    """Send an ``ALTER OBJECT ... UPSERT_OPTIONS`` statement for this table.

    The statement sets ``COMPACTION_PLANNER.CLASS_NAME`` to ``lc-buckets``
    and supplies a two-level ``COMPACTION_PLANNER.FEATURES`` JSON document
    (both levels use class ``Zero``; the first one carries a ``5s``
    ``portions_live_duration``). The query result is not returned.
    """
    # Doubled braces are literal JSON braces escaped for the f-string; only
    # {self.path} is interpolated.
    statement = f"""
        ALTER OBJECT `{self.path}` (TYPE TABLE) SET (ACTION=UPSERT_OPTIONS, `COMPACTION_PLANNER.CLASS_NAME`=`lc-buckets`, `COMPACTION_PLANNER.FEATURES`=`
        {{"levels" : [{{"class_name" : "Zero", "portions_live_duration" : "5s", "expected_blobs_size" : 1000000000000, "portions_count_available" : 2}},
        {{"class_name" : "Zero"}}]}}`);
        """
    self.ydb_client.query(statement)
110+
95111

96112
class TllTieringTestBase(object):
97113
@classmethod
@@ -120,6 +136,7 @@ def _setup_ydb(cls):
120136
"optimizer_freshness_check_duration_ms": 0,
121137
"small_portion_detect_size_limit": 0,
122138
"max_read_staleness_ms": 5000,
139+
"alter_object_enabled": True,
123140
},
124141
additional_log_configs={
125142
"TX_COLUMNSHARD_TIERING": LogLevels.DEBUG,
Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,191 @@
1+
import time
2+
import logging
3+
from .base import TllTieringTestBase, ColumnTableHelper
4+
import ydb
5+
import concurrent
6+
import random
7+
import datetime
8+
9+
logger = logging.getLogger(__name__)
10+
11+
12+
class TestDataCorrectness(TllTieringTestBase):
    """Functional test: data stays correct while it is tiered out to S3.

    The test creates a column table whose TTL rule moves rows to an external
    data source (the ``cold`` bucket) one second after their ``ts``, then
    writes, overwrites and deletes rows, checking ``sum(val)`` after each
    phase.
    """

    test_name = "data_correctness"
    cold_bucket = "cold"
    n_shards = 4

    @classmethod
    def setup_class(cls):
        """Run the base-class setup, then create the bucket used as cold tier."""
        super(TestDataCorrectness, cls).setup_class()
        cls.s3_client.create_bucket(cls.cold_bucket)

    def write_data(
        self,
        table: str,
        timestamp_from_ms: int,
        rows: int,
        value: int = 1,
    ):
        """Bulk-upsert ``rows`` consecutive rows in chunks of 100.

        Row ``k`` (0-based) gets ``ts = timestamp_from_ms + k``, a fresh
        10 KiB random ``s`` payload and ``val = value``. ``self.column_types``
        must already be set (``test`` does that before writing).

        Args:
            table: Path of the table to write to.
            timestamp_from_ms: ``ts`` of the first row. NOTE(review): despite
                the ``_ms`` suffix, ``test`` passes epoch seconds multiplied
                by 1000000, i.e. microseconds; the name is kept because it is
                part of the method's interface.
            rows: Number of rows to write; nothing is written when it is
                zero or negative.
            value: Value stored in the ``val`` column of every row.
        """
        chunk_size = 100
        for offset in range(0, rows, chunk_size):
            current_chunk_size = min(chunk_size, rows - offset)
            first_ts = timestamp_from_ms + offset
            data = [
                {
                    "ts": first_ts + i,
                    "s": random.randbytes(1024 * 10),
                    "val": value,
                }
                for i in range(current_chunk_size)
            ]
            self.ydb_client.bulk_upsert(
                table,
                self.column_types,
                data,
            )

    def total_values(self, table: str) -> int:
        """Return ``sum(val)`` over the whole table, or 0 when the sum is empty/NULL."""
        result_sets = self.ydb_client.query(
            f"select sum(val) as Values from `{table}`"
        )
        return result_sets[0].rows[0]["Values"] or 0

    def wait_eviction(self, table: ColumnTableHelper):
        """Poll tier statistics until the default tier is (almost) drained.

        Waits, for at most 60 seconds, until the ``__DEFAULT`` tier reports no
        more than ``n_shards`` portions, then asserts that some tier other
        than ``__DEFAULT`` holds data. Each iteration works on a single
        statistics snapshot, so the value that is logged and finally asserted
        on is the same one that drove the loop.
        """
        # Monotonic clock: the timeout must not be affected by wall-clock jumps.
        deadline = time.monotonic() + 60
        while True:
            stats = table.get_portion_stat_by_tier()
            if stats.get("__DEFAULT", {}).get("Portions", 0) <= self.n_shards:
                break
            assert (
                time.monotonic() < deadline
            ), "Timeout for wait eviction is exceeded"
            logger.info(f"Waiting for data eviction: {stats}")
            time.sleep(1)

        assert (
            len(stats) > 1 or "__DEFAULT" not in stats
        ), f"No data outside of the default tier: {stats}"

    def _write_concurrently(
        self,
        table_path: str,
        ts_start: int,
        rows: int,
        num_threads: int,
        value: int,
    ):
        """Write ``rows`` rows with ``num_threads`` parallel ``write_data`` calls.

        The timestamp range starting at ``ts_start`` is split into
        ``num_threads`` equal, non-overlapping slices, one per worker. Any
        exception raised inside a worker is re-raised here.
        """
        # `import concurrent` alone does not load the `futures` submodule, so
        # import it explicitly instead of relying on another module having
        # done so already.
        import concurrent.futures

        assert rows % num_threads == 0
        chunk_size = rows // num_threads

        with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor:
            insert_futures = [
                executor.submit(
                    self.write_data,
                    table_path,
                    ts_start + i * chunk_size,
                    chunk_size,
                    value,
                )
                for i in range(num_threads)
            ]
            concurrent.futures.wait(insert_futures)

        # result() re-raises whatever the worker raised.
        for future in insert_futures:
            future.result()

    def test(self):
        """Implements https://github.com/ydb-platform/ydb/issues/13465

        Steps: create the table, secrets and external data source; enable
        tiering by TTL; write rows with ``val=1``; overwrite the same keys
        with ``val=2``; delete everything. After each write phase the test
        waits for eviction and checks ``sum(val)``.
        """
        test_dir = f"{self.ydb_client.database}/{self.test_name}"
        table_path = f"{test_dir}/table"
        secret_prefix = self.test_name
        access_key_id_secret_name = f"{secret_prefix}_key_id"
        access_key_secret_secret_name = f"{secret_prefix}_key_secret"
        eds_path = f"{test_dir}/{self.cold_bucket}"

        # Expect empty buckets to avoid unintentional data deletion/modification
        if self.s3_client.get_bucket_stat(self.cold_bucket) != (0, 0):
            raise Exception("Bucket for cold data is not empty")

        self.ydb_client.query(
            f"""
            CREATE TABLE `{table_path}` (
                ts Timestamp NOT NULL,
                s String,
                val Uint64,
                PRIMARY KEY(ts),
            )
            WITH (
                STORE = COLUMN,
                AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = {self.n_shards}
            )
            """
        )

        table = ColumnTableHelper(self.ydb_client, table_path)
        table.set_fast_compaction()

        # Column description shared by every write_data() call below.
        self.column_types = ydb.BulkUpsertColumns()
        self.column_types.add_column("ts", ydb.PrimitiveType.Timestamp)
        self.column_types.add_column("s", ydb.PrimitiveType.String)
        self.column_types.add_column("val", ydb.PrimitiveType.Uint64)

        logger.info(f"Table {table_path} created")

        self.ydb_client.query(
            f"CREATE OBJECT {access_key_id_secret_name} (TYPE SECRET) WITH value='{self.s3_client.key_id}'"
        )
        self.ydb_client.query(
            f"CREATE OBJECT {access_key_secret_secret_name} (TYPE SECRET) WITH value='{self.s3_client.key_secret}'"
        )

        self.ydb_client.query(
            f"""
            CREATE EXTERNAL DATA SOURCE `{eds_path}` WITH (
                SOURCE_TYPE="ObjectStorage",
                LOCATION="{self.s3_client.endpoint}/{self.cold_bucket}",
                AUTH_METHOD="AWS",
                AWS_ACCESS_KEY_ID_SECRET_NAME="{access_key_id_secret_name}",
                AWS_SECRET_ACCESS_KEY_SECRET_NAME="{access_key_secret_secret_name}",
                AWS_REGION="{self.s3_client.region}"
            )
            """
        )

        stmt = f"""
            ALTER TABLE `{table_path}` SET (TTL =
                Interval("PT1S") TO EXTERNAL DATA SOURCE `{eds_path}`
                ON ts
            )
            """
        logger.info(stmt)
        self.ydb_client.query(stmt)

        # Epoch seconds * 1000000, i.e. microseconds.
        ts_start = int(datetime.datetime.now().timestamp() * 1000000)
        rows = 10000
        num_threads = 10

        # Write data
        self._write_concurrently(table_path, ts_start, rows, num_threads, 1)
        self.wait_eviction(table)
        assert self.total_values(table_path) == rows

        # Update data: same keys, new value, so the sum must exactly double.
        self._write_concurrently(table_path, ts_start, rows, num_threads, 2)
        self.wait_eviction(table)
        assert self.total_values(table_path) == rows * 2

        # Delete data
        self.ydb_client.query(f"delete from `{table_path}`")

        assert not self.total_values(table_path)

ydb/tests/olap/ttl_tiering/ttl_delete_s3.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,8 @@ def test_data_unchanged_after_ttl_change(self):
6666
if self.s3_client.get_bucket_stat(medium_bucket) != (0, 0):
6767
raise Exception("Bucket for medium data is not empty")
6868

69-
self.ydb_client.query(f"""
69+
self.ydb_client.query(
70+
f"""
7071
CREATE TABLE `{table_path}` (
7172
ts Timestamp NOT NULL,
7273
s String,
@@ -229,7 +230,8 @@ def test_ttl_delete(self):
229230
if self.s3_client.get_bucket_stat(self.frozen_bucket) != (0, 0):
230231
raise Exception("Bucket for frozen data is not empty")
231232

232-
self.ydb_client.query(f"""
233+
self.ydb_client.query(
234+
f"""
233235
CREATE TABLE `{table_path}` (
234236
ts Timestamp NOT NULL,
235237
s String,

ydb/tests/olap/ttl_tiering/ttl_unavailable_s3.py

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,11 @@
22
import signal
33
import sys
44
import time
5+
import logging
56

6-
from .base import TllTieringTestBase
7+
from .base import TllTieringTestBase, ColumnTableHelper
8+
9+
logger = logging.getLogger(__name__)
710

811
ROWS_CHUNK_SIZE = 1000000
912
ROWS_CHUNKS_COUNT = 10
@@ -21,9 +24,15 @@ def test(self):
2124
v String,
2225
PRIMARY KEY(ts),
2326
)
24-
WITH (STORE = COLUMN)
27+
WITH (
28+
STORE = COLUMN,
29+
AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 2
30+
)
2531
""")
2632

33+
table = ColumnTableHelper(self.ydb_client, f"{self.ydb_client.database}/table")
34+
table.set_fast_compaction()
35+
2736
self.s3_client.create_bucket(bucket_s3_name)
2837

2938
self.ydb_client.query(f"CREATE OBJECT s3_id (TYPE SECRET) WITH value = '{self.s3_client.key_id}'")
@@ -58,6 +67,9 @@ def upsert_chunk(i):
5867
SELECT * FROM AS_TABLE($rows_list);
5968
""")
6069

70+
def get_stat():
71+
return self.s3_client.get_bucket_stat(bucket_s3_name)[0]
72+
6173
for i in range(0, ROWS_CHUNKS_COUNT // 2):
6274
upsert_chunk(i)
6375

@@ -68,6 +80,8 @@ def upsert_chunk(i):
6880
)
6981
""")
7082

83+
assert self.wait_for(get_stat, 30), "initial eviction"
84+
7185
print("!!! simulating S3 hang up -- sending SIGSTOP", file=sys.stderr)
7286
os.kill(self.s3_pid, signal.SIGSTOP)
7387

@@ -76,13 +90,9 @@ def upsert_chunk(i):
7690
print("!!! simulating S3 recovery -- sending SIGCONT", file=sys.stderr)
7791
os.kill(self.s3_pid, signal.SIGCONT)
7892

79-
def get_stat():
80-
return self.s3_client.get_bucket_stat(bucket_s3_name)[0]
81-
82-
# stat_old = get_stat()
93+
stat_old = get_stat()
8394

8495
for i in range(ROWS_CHUNKS_COUNT // 2, ROWS_CHUNKS_COUNT):
8596
upsert_chunk(i)
8697

87-
# Uncomment after fixing https://github.com/ydb-platform/ydb/issues/13719
88-
# assert self.wait_for(lambda: get_stat() != stat_old, 120), "data distribution continuation"
98+
assert self.wait_for(lambda: get_stat() != stat_old, 120), "data distribution continuation"

0 commit comments

Comments
 (0)