
Commit 55be93b

expected portion size has been added for zero layer (#19111)

1 parent 673dab6

File tree: 4 files changed (+228, -4 lines)

ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/zero_level.h

Lines changed: 5 additions & 0 deletions

@@ -70,6 +70,11 @@ class TZeroLevelPortions: public IPortionsLevel {
 
     virtual TCompactionTaskData DoGetOptimizationTask() const override;
 
+public:
+    ui64 GetExpectedPortionSize() const {
+        return ExpectedBlobsSize;
+    }
+
 public:
     TZeroLevelPortions(const ui32 levelIdx, const std::shared_ptr<IPortionsLevel>& nextLevel, const TLevelCounters& levelCounters,
         const std::shared_ptr<IOverloadChecker>& overloadChecker, const TDuration durationToDrop, const ui64 expectedBlobsSize,

ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/optimizer.cpp

Lines changed: 3 additions & 3 deletions

@@ -21,7 +21,6 @@ TOptimizerPlanner::TOptimizerPlanner(const TInternalPathId pathId, const std::sh
     , Levels(std::move(levels))
     , StoragesManager(storagesManager)
     , PrimaryKeysSchema(primaryKeysSchema) {
-
     RefreshWeights();
 }
 
@@ -41,8 +40,9 @@ std::shared_ptr<TColumnEngineChanges> TOptimizerPlanner::DoGetOptimizationTask(
     //     result->AddMovePortions(data.GetMovePortions());
     // }
     result->SetTargetCompactionLevel(data.GetTargetCompactionLevel());
-    auto levelPortions = std::dynamic_pointer_cast<TOneLayerPortions>(Levels[data.GetTargetCompactionLevel()]);
-    if (levelPortions) {
+    if (auto levelPortions = std::dynamic_pointer_cast<TOneLayerPortions>(Levels[data.GetTargetCompactionLevel()])) {
+        result->SetPortionExpectedSize(levelPortions->GetExpectedPortionSize());
+    } else if (auto levelPortions = std::dynamic_pointer_cast<TZeroLevelPortions>(Levels[level->GetLevelId()])) {
         result->SetPortionExpectedSize(levelPortions->GetExpectedPortionSize());
     }
     auto positions = data.GetCheckPositions(PrimaryKeysSchema, level->GetLevelId() > 1);
ydb/tests/olap/ttl_tiering/ttl_portion_size.py

Lines changed: 218 additions & 0 deletions

@@ -0,0 +1,218 @@
import logging
from .base import TllTieringTestBase
from ydb.tests.olap.common.column_table_helper import ColumnTableHelper
import concurrent.futures  # ThreadPoolExecutor/wait are used below; a bare "import concurrent" does not load the futures submodule
import random
import datetime
import ydb

logger = logging.getLogger(__name__)


class TestPortionSizeTtl(TllTieringTestBase):

    @classmethod
    def setup_class(cls):
        super(TestPortionSizeTtl, cls).setup_class()

    def get_row_count_by_date(self, table_path: str, past_days: int) -> int:
        return self.ydb_client.query(
            f"SELECT count(*) as Rows from `{table_path}` WHERE ts < CurrentUtcTimestamp() - DateTime::IntervalFromDays({past_days})"
        )[0].rows[0]["Rows"]

    def create_table(self, table_path):
        self.ydb_client.query(
            f"""
            CREATE TABLE `{table_path}` (
                ts Timestamp NOT NULL,
                s String,
                val Uint64,
                PRIMARY KEY(ts),
            )
            WITH (
                STORE = COLUMN,
                PARTITION_COUNT = 1
            )
            """
        )

    def drop_table(self, table_path):
        self.ydb_client.query(
            f"""
            DROP TABLE `{table_path}`
            """
        )

    def bulk_upsert(self, table_path):
        self.column_types = ydb.BulkUpsertColumns()
        self.column_types.add_column("ts", ydb.PrimitiveType.Timestamp)
        self.column_types.add_column("s", ydb.PrimitiveType.String)
        self.column_types.add_column("val", ydb.PrimitiveType.Uint64)

        ts_start = int(datetime.datetime.now().timestamp() * 1000000)
        rows = 10000
        num_threads = 10
        assert rows % num_threads == 0
        chunk_size = rows // num_threads
        # Write data
        with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor:
            insert_futures = [
                executor.submit(
                    self.write_data,
                    table_path,
                    ts_start + i * chunk_size,
                    chunk_size,
                    1,
                )
                for i in range(num_threads)
            ]

            concurrent.futures.wait(insert_futures)
            for future in insert_futures:
                future.result()

        # Update data
        with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor:
            insert_futures = [
                executor.submit(
                    self.write_data,
                    table_path,
                    ts_start + i * chunk_size,
                    chunk_size,
                    2,
                )
                for i in range(num_threads)
            ]

            concurrent.futures.wait(insert_futures)
            for future in insert_futures:
                future.result()
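
    # Both passes above target the same 10000 keys (ts_start .. ts_start + 9999,
    # split across 10 threads), so the second pass overwrites val=1 with val=2
    # rather than adding rows; the rewrite produces extra portions for compaction
    # without changing the row count.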

    def set_compaction(self, table_path, total_level_size):
        self.ydb_client.query(
            f"""
            ALTER OBJECT `{table_path}` (TYPE TABLE) SET (ACTION=UPSERT_OPTIONS, `COMPACTION_PLANNER.CLASS_NAME`=`lc-buckets`, `COMPACTION_PLANNER.FEATURES`=`
                {{"levels" : [{{"class_name" : "Zero", "expected_blobs_size" : {total_level_size}}},
                              {{"class_name" : "Zero"}}]}}`);
            """
        )
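
    # The FEATURES JSON above configures the lc-buckets planner with two
    # Zero-class levels; "expected_blobs_size" on the first level is the
    # target portion size that the planner now forwards to compaction tasks
    # via TZeroLevelPortions::GetExpectedPortionSize() (see zero_level.h above).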

    def write_data(
        self,
        table: str,
        timestamp_from_ms: int,
        rows: int,
        value: int = 1,
    ):
        chunk_size = 100
        while rows:
            current_chunk_size = min(chunk_size, rows)
            data = [
                {
                    "ts": timestamp_from_ms + i,
                    "s": random.randbytes(1000),
                    "val": value,
                }
                for i in range(current_chunk_size)
            ]
            self.ydb_client.bulk_upsert(
                table,
                self.column_types,
                data,
            )
            timestamp_from_ms += current_chunk_size
            rows -= current_chunk_size
            assert rows >= 0
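
    # Volume arithmetic: every row carries a 1000-byte random string, and
    # bulk_upsert() writes 10000 rows twice (insert, then overwrite) in
    # 100-row chunks, i.e. roughly 10000 * 1000 * 2 = ~20 MB of raw payload.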

    def set_tiering(self, table_path, bucket):
        secret_prefix = self.test_name
        access_key_id_secret_name = f"{secret_prefix}_key_id"
        access_key_secret_secret_name = f"{secret_prefix}_key_secret"
        test_dir = f"{self.ydb_client.database}/{self.test_name}"
        cold_eds_path = f"{test_dir}/{bucket}"
        self.ydb_client.query(
            f"UPSERT OBJECT {access_key_id_secret_name} (TYPE SECRET) WITH value='{self.s3_client.key_id}'"
        )
        self.ydb_client.query(
            f"UPSERT OBJECT {access_key_secret_secret_name} (TYPE SECRET) WITH value='{self.s3_client.key_secret}'"
        )

        self.ydb_client.query(
            f"""
            CREATE EXTERNAL DATA SOURCE `{cold_eds_path}` WITH (
                SOURCE_TYPE="ObjectStorage",
                LOCATION="{self.s3_client.endpoint}/{bucket}",
                AUTH_METHOD="AWS",
                AWS_ACCESS_KEY_ID_SECRET_NAME="{access_key_id_secret_name}",
                AWS_SECRET_ACCESS_KEY_SECRET_NAME="{access_key_secret_secret_name}",
                AWS_REGION="{self.s3_client.region}"
            )
            """
        )

        stmt = f"""
            ALTER TABLE `{table_path}` SET (TTL =
                Interval("PT1S") TO EXTERNAL DATA SOURCE `{cold_eds_path}`
                ON ts
            )
        """

        self.ydb_client.query(stmt)
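
    # set_tiering() stores the S3 credentials as SECRET objects, creates an
    # EXTERNAL DATA SOURCE pointing at the bucket, and sets a one-second TTL
    # (Interval("PT1S") ON ts), so freshly written rows become eligible for
    # eviction to S3 almost immediately.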

    def wait_tier_data(self, table_path, bucket):
        self.table = ColumnTableHelper(self.ydb_client, table_path)

        def data_distributes_across_tiers():
            bucket_stat = self.s3_client.get_bucket_stat(bucket)
            logger.info(
                f"portions: {self.table.get_portion_stat_by_tier()}, blobs: {self.table.get_blob_stat_by_tier()}, cold bucket stat: {bucket_stat}"
            )
            return bucket_stat[0] != 0

        assert self.wait_for(lambda: data_distributes_across_tiers(), 200), "Data eviction has not been started"
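
    # wait_for() comes from the test base class; the predicate polls the cold
    # bucket's statistics (bucket_stat[0], presumably the object count) until
    # eviction starts or the 200-second timeout elapses.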

    def validate_portion_tier_size(self, table_path, left, right):
        results = self.ydb_client.query(
            f"select * from `{table_path}/.sys/primary_index_portion_stats` WHERE TierName != '__DEFAULT'"
        )
        for result_set in results:
            for row in result_set.rows:
                assert left <= row["ColumnBlobBytes"] <= right, row
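
    # The test below accepts evicted portions whose ColumnBlobBytes lies within
    # a factor of two of the configured size: [0.5 * 3000000, 2 * 3000000] for
    # table1 and [0.5 * 1500000, 2 * 1500000] for table2.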

    def test_check_portion_size(self):
        self.test_name = 'check_portion_size_s3_tiering'
        cold_bucket_1 = 'cold1'
        cold_bucket_2 = 'cold2'
        test_dir = f"{self.ydb_client.database}/{self.test_name}"
        table_path1 = f"{test_dir}/table1"
        table_path2 = f"{test_dir}/table2"

        self.s3_client.create_bucket(cold_bucket_1)

        # Expect empty buckets to avoid unintentional data deletion/modification
        if self.s3_client.get_bucket_stat(cold_bucket_1) != (0, 0):
            raise Exception("Bucket for cold data is not empty")

        self.create_table(table_path1)
        self.set_compaction(table_path1, 3000000)

        self.set_tiering(table_path1, cold_bucket_1)

        self.bulk_upsert(table_path1)
        self.wait_tier_data(table_path1, cold_bucket_1)
        self.validate_portion_tier_size(table_path1, 0.5 * 3000000, 2 * 3000000)

        self.s3_client.create_bucket(cold_bucket_2)

        # Expect empty buckets to avoid unintentional data deletion/modification
        if self.s3_client.get_bucket_stat(cold_bucket_2) != (0, 0):
            raise Exception("Bucket for cold data is not empty")

        self.create_table(table_path2)
        self.set_compaction(table_path2, 1500000)

        self.set_tiering(table_path2, cold_bucket_2)

        self.bulk_upsert(table_path2)
        self.wait_tier_data(table_path2, cold_bucket_2)
        self.validate_portion_tier_size(table_path2, 0.5 * 1500000, 2 * 1500000)

ydb/tests/olap/ttl_tiering/ya.make

Lines changed: 2 additions & 1 deletion

@@ -6,9 +6,10 @@ ENV(YDB_ADDITIONAL_LOG_CONFIGS="TX_TIERING:DEBUG")
 TEST_SRCS(
     base.py
     data_correctness.py
+    data_migration_when_alter_ttl.py
     ttl_delete_s3.py
+    ttl_portion_size.py
     ttl_unavailable_s3.py
-    data_migration_when_alter_ttl.py
     unstable_connection.py
 )
