Skip to content

Commit 38f14e7

Browse files
committed
Merge branch 'main' of https://github.com/ydb-platform/ydb into mergelibs-yql
2 parents 22ba6e9 + d4c693f commit 38f14e7

File tree

311 files changed

+6931
-3792
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

311 files changed

+6931
-3792
lines changed

.github/CODEOWNERS

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
/ydb/library/yql/ @ydb-platform/yql
2020
/ydb/library/yql/dq @ydb-platform/yql @ydb-platform/qp
21+
/ydb/library/yql/dq/actors/common @ydb-platform/fq
2122
/ydb/library/yql/providers/common/http_gateway @ydb-platform/fq
2223
/ydb/library/yql/providers/common/db_id_async_resolver @ydb-platform/fq
2324
/ydb/library/yql/providers/common/pushdown @ydb-platform/fq

.github/config/muted_ya.txt

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,7 @@ ydb/core/kqp/ut/olap KqpOlapBlobsSharing.MultipleSplitsWithRestartsAfterWait
3131
ydb/core/kqp/ut/olap KqpOlapBlobsSharing.MultipleSplitsWithRestartsWhenWait
3232
ydb/core/kqp/ut/olap KqpOlapBlobsSharing.MultipleMergesWithRestartsAfterWait
3333
ydb/core/kqp/ut/olap KqpOlapBlobsSharing.MultipleMergesWithRestartsWhenWait
34-
ydb/core/kqp/ut/olap KqpOlapIndexes.IndexesActualization
35-
ydb/core/kqp/ut/olap KqpOlapIndexes.IndexesInBS
36-
ydb/core/kqp/ut/olap KqpOlapIndexes.IndexesInLocalMetadata
37-
ydb/core/kqp/ut/olap KqpOlapIndexes.IndexesModificationError
34+
ydb/core/kqp/ut/olap KqpOlapBlobsSharing.*
3835
ydb/core/kqp/ut/olap KqpOlapStatistics.StatsUsageWithTTL
3936
ydb/core/kqp/ut/olap KqpOlapWrite.TierDraftsGCWithRestart
4037
ydb/core/kqp/ut/olap [*/*] chunk chunk
@@ -159,7 +156,6 @@ ydb/tests/functional/restarts test_restarts.py.TestRestartSingleBlock42.test_res
159156
ydb/tests/functional/serializable test.py.test_local
160157
ydb/tests/functional/serverless test_serverless.py.test_database_with_disk_quotas[enable_alter_database_create_hive_first--false]
161158
ydb/tests/functional/serverless test_serverless.py.test_database_with_disk_quotas[enable_alter_database_create_hive_first--true]
162-
ydb/tests/functional/suite_tests test_postgres.py.TestPGSQL.test_sql_suite[plan-jointest/join2.test]
163159
ydb/tests/functional/tenants test_dynamic_tenants.py.test_create_and_drop_tenants[enable_alter_database_create_hive_first--false]
164160
ydb/tests/functional/tenants test_dynamic_tenants.py.test_create_and_drop_tenants[enable_alter_database_create_hive_first--true]
165161
ydb/tests/functional/tenants test_dynamic_tenants.py.test_create_and_drop_the_same_tenant2[enable_alter_database_create_hive_first--false]

ydb/core/base/blobstorage.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -738,6 +738,9 @@ struct TEvBlobStorage {
738738
EvHugePreCompactResult,
739739
EvPDiskMetadataLoaded,
740740
EvBalancingSendPartsOnMain,
741+
EvHugeAllocateSlots,
742+
EvHugeAllocateSlotsResult,
743+
EvHugeDropAllocatedSlots,
741744

742745
EvYardInitResult = EvPut + 9 * 512, /// 268 636 672
743746
EvLogResult,

ydb/core/blobstorage/backpressure/queue.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,10 @@ TBlobStorageQueue::~TBlobStorageQueue() {
4646
for (TItemList *queue : {&Queues.Waiting, &Queues.InFlight, &Queues.Unused}) {
4747
for (TItem& item : *queue) {
4848
SetItemQueue(item, EItemQueue::NotSet);
49+
--*QueueSize;
4950
}
5051
}
52+
*QueueWindowSize -= WindowSize;
5153
}
5254

5355
void TBlobStorageQueue::UpdateCostModel(TInstant now, const NKikimrBlobStorage::TVDiskCostSettings& settings,
Lines changed: 240 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,240 @@
1+
#include <ydb/core/blobstorage/ut_blobstorage/lib/env.h>
2+
#include <ydb/core/blobstorage/ut_blobstorage/lib/common.h>
3+
#include <ydb/core/util/lz4_data_generator.h>
4+
#include <ydb/core/erasure/erasure.h>
5+
6+
namespace {
7+
8+
class THugeBlobTest {
9+
TEnvironmentSetup Env;
10+
TTestActorSystem& Runtime;
11+
ui32 DataSize = 32_KB;
12+
TString Data = FastGenDataForLZ4(DataSize, 1 /*seed*/);
13+
TLogoBlobID BlobId{1000, 1, 1, 0, DataSize, 0};
14+
ui32 GroupId;
15+
TIntrusivePtr<TBlobStorageGroupInfo> Info;
16+
TBlobStorageGroupType GType;
17+
TBlobStorageGroupInfo::TVDiskIds VDiskIds;
18+
TBlobStorageGroupInfo::TServiceIds ServiceIds;
19+
std::vector<TActorId> PutQueueIds;
20+
std::vector<TActorId> GetQueueIds;
21+
std::vector<TRope> Parts;
22+
ui8 TestSubgroupNodeId = 6;
23+
24+
public:
25+
THugeBlobTest()
26+
: Env{{
27+
.Erasure = TBlobStorageGroupType::Erasure4Plus2Block,
28+
.UseFakeConfigDispatcher = true,
29+
}}
30+
, Runtime(*Env.Runtime)
31+
{
32+
Env.CreateBoxAndPool(1, 1);
33+
34+
std::vector<ui32> groups = Env.GetGroups();
35+
UNIT_ASSERT(!groups.empty());
36+
GroupId = groups.front();
37+
38+
Info = Env.GetGroupInfo(GroupId);
39+
GType = Info->Type;
40+
41+
Info->PickSubgroup(BlobId.Hash(), &VDiskIds, &ServiceIds);
42+
43+
for (const TVDiskID& vdiskId : VDiskIds) {
44+
PutQueueIds.push_back(Env.CreateQueueActor(vdiskId, NKikimrBlobStorage::PutTabletLog, 0));
45+
GetQueueIds.push_back(Env.CreateQueueActor(vdiskId, NKikimrBlobStorage::GetFastRead, 0));
46+
}
47+
48+
Parts.resize(GType.TotalPartCount());
49+
const bool success = ErasureSplit(TErasureType::CrcModeNone, GType, TRope(Data), Parts);
50+
UNIT_ASSERT(success);
51+
52+
// for (ui32 i = 0; i < 6; ++i) { // put main parts
53+
// Put(i, i);
54+
// }
55+
}
56+
57+
void Put(ui8 subgroupNodeId, ui8 partIdx) {
58+
Cerr << "writing partIdx# " << (int)partIdx << " to " << (int)subgroupNodeId << Endl;
59+
const TActorId queueActorId = PutQueueIds[subgroupNodeId];
60+
auto edge = Runtime.AllocateEdgeActor(queueActorId.NodeId(), __FILE__, __LINE__);
61+
const TLogoBlobID putId(BlobId, partIdx + 1);
62+
Runtime.Send(new IEventHandle(queueActorId, edge, new TEvBlobStorage::TEvVPut(putId, Parts[partIdx],
63+
VDiskIds[subgroupNodeId], false, nullptr, TInstant::Max(), NKikimrBlobStorage::EPutHandleClass::TabletLog)),
64+
queueActorId.NodeId());
65+
auto res = Env.WaitForEdgeActorEvent<TEvBlobStorage::TEvVPutResult>(edge);
66+
auto& record = res->Get()->Record;
67+
UNIT_ASSERT_VALUES_EQUAL(record.GetStatus(), NKikimrProto::OK);
68+
}
69+
70+
void SwitchHugeBlobSize(bool small) {
71+
Cerr << "small blob# " << small << Endl;
72+
73+
auto ev = std::make_unique<NConsole::TEvConsole::TEvConfigNotificationRequest>();
74+
auto& record = ev->Record;
75+
auto& config = *record.MutableConfig();
76+
auto& bsConfig = *config.MutableBlobStorageConfig();
77+
auto& perf = *bsConfig.MutableVDiskPerformanceSettings();
78+
auto& type = *perf.AddVDiskTypes();
79+
type.SetPDiskType(NKikimrBlobStorage::EPDiskType::ROT);
80+
type.SetMinHugeBlobSizeInBytes(small ? 4096 : 524288);
81+
82+
const ui32 serviceNodeId = ServiceIds[TestSubgroupNodeId].NodeId();
83+
TActorId edge = Runtime.AllocateEdgeActor(serviceNodeId, __FILE__, __LINE__);
84+
Runtime.Send(new IEventHandle(NConsole::MakeConfigsDispatcherID(serviceNodeId), edge, ev.release()),
85+
serviceNodeId);
86+
Env.WaitForEdgeActorEvent<NConsole::TEvConsole::TEvConfigNotificationResponse>(edge);
87+
}
88+
89+
void CompactFresh() {
90+
Cerr << "compacting fresh" << Endl;
91+
92+
const TActorId serviceId = ServiceIds[TestSubgroupNodeId];
93+
const ui32 serviceNodeId = serviceId.NodeId();
94+
TActorId edge = Runtime.AllocateEdgeActor(serviceNodeId, __FILE__, __LINE__);
95+
Runtime.Send(new IEventHandle(serviceId, edge, TEvCompactVDisk::Create(EHullDbType::LogoBlobs,
96+
TEvCompactVDisk::EMode::FRESH_ONLY)), serviceNodeId);
97+
Env.WaitForEdgeActorEvent<TEvCompactVDiskResult>(edge);
98+
}
99+
100+
void CompactLevels() {
101+
Cerr << "compacting levels" << Endl;
102+
103+
const TActorId serviceId = ServiceIds[TestSubgroupNodeId];
104+
const ui32 serviceNodeId = serviceId.NodeId();
105+
TActorId edge = Runtime.AllocateEdgeActor(serviceNodeId, __FILE__, __LINE__);
106+
Runtime.Send(new IEventHandle(serviceId, edge, TEvCompactVDisk::Create(EHullDbType::LogoBlobs,
107+
TEvCompactVDisk::EMode::FULL)), serviceNodeId);
108+
Env.WaitForEdgeActorEvent<TEvCompactVDiskResult>(edge);
109+
}
110+
111+
void PutPartsInMask(ui32 mask) {
112+
for (ui32 i = 0; i < GType.TotalPartCount(); ++i) {
113+
if (mask & (1 << i)) {
114+
Put(TestSubgroupNodeId, i);
115+
}
116+
}
117+
}
118+
119+
void CheckPartsInPlace(ui32 mask, ui32 numDistinctSST) {
120+
Cerr << "checking parts in place mask# " << mask << Endl;
121+
122+
std::set<ui64> sstIds;
123+
const TActorId serviceId = ServiceIds[TestSubgroupNodeId];
124+
auto captureRes = Env.SyncQuery<TEvBlobStorage::TEvCaptureVDiskLayoutResult,
125+
TEvBlobStorage::TEvCaptureVDiskLayout>(serviceId);
126+
for (const auto& item : captureRes->Layout) {
127+
using T = TEvBlobStorage::TEvCaptureVDiskLayoutResult;
128+
if (item.Database == T::EDatabase::LogoBlobs) {
129+
Cerr << item.ToString() << Endl;
130+
if (item.RecordType == T::ERecordType::IndexRecord && item.SstId) {
131+
sstIds.insert(item.SstId);
132+
}
133+
}
134+
}
135+
UNIT_ASSERT_VALUES_EQUAL(sstIds.size(), numDistinctSST);
136+
137+
auto getQuery = TEvBlobStorage::TEvVGet::CreateExtremeDataQuery(VDiskIds[TestSubgroupNodeId],
138+
TInstant::Max(), NKikimrBlobStorage::EGetHandleClass::FastRead, {}, {}, {{BlobId}});
139+
const TActorId queueId = GetQueueIds[TestSubgroupNodeId];
140+
const ui32 queueNodeId = queueId.NodeId();
141+
auto edge = Runtime.AllocateEdgeActor(queueId.NodeId(), __FILE__, __LINE__);
142+
Runtime.Send(new IEventHandle(queueId, edge, getQuery.release()), queueNodeId);
143+
auto res = Env.WaitForEdgeActorEvent<TEvBlobStorage::TEvVGetResult>(edge);
144+
auto& record = res->Get()->Record;
145+
146+
UNIT_ASSERT_VALUES_EQUAL(record.GetStatus(), NKikimrProto::OK);
147+
if (!mask) {
148+
UNIT_ASSERT_VALUES_EQUAL(record.ResultSize(), 1);
149+
auto& result = record.GetResult(0);
150+
UNIT_ASSERT_VALUES_EQUAL(LogoBlobIDFromLogoBlobID(result.GetBlobID()), BlobId);
151+
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), NKikimrProto::NODATA);
152+
} else {
153+
for (const auto& result : record.GetResult()) {
154+
const TLogoBlobID& id = LogoBlobIDFromLogoBlobID(result.GetBlobID());
155+
UNIT_ASSERT_VALUES_EQUAL(id.FullID(), BlobId);
156+
UNIT_ASSERT(id.PartId());
157+
const ui8 partIdx = id.PartId() - 1;
158+
159+
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), NKikimrProto::OK);
160+
UNIT_ASSERT_EQUAL(res->Get()->GetBlobData(result), Parts[partIdx]);
161+
UNIT_ASSERT(mask & (1 << partIdx));
162+
163+
mask &= ~(1 << partIdx);
164+
}
165+
}
166+
167+
UNIT_ASSERT_VALUES_EQUAL(mask, 0);
168+
}
169+
170+
void RunTest(ui32 fresh1, ui32 fresh2, ui32 huge1, ui32 huge2, bool targetHuge, ui32 fresh3, ui32 huge3,
171+
bool targetHuge2, bool targetHuge3) {
172+
if (fresh1 || fresh2) {
173+
SwitchHugeBlobSize(false);
174+
PutPartsInMask(fresh1);
175+
PutPartsInMask(fresh2);
176+
CheckPartsInPlace(fresh1 | fresh2, 0);
177+
}
178+
if (huge1 || huge2) {
179+
SwitchHugeBlobSize(true);
180+
PutPartsInMask(huge1);
181+
PutPartsInMask(huge2);
182+
CheckPartsInPlace(huge1 | huge2 | fresh1 | fresh2, 0);
183+
}
184+
const ui32 baseMask = fresh1 | fresh2 | huge1 | huge2;
185+
if (baseMask) {
186+
SwitchHugeBlobSize(targetHuge);
187+
CompactFresh();
188+
CheckPartsInPlace(baseMask, 1);
189+
}
190+
if (const ui32 extraMask = fresh3 | huge3) {
191+
if (fresh3) {
192+
SwitchHugeBlobSize(false);
193+
PutPartsInMask(fresh3);
194+
}
195+
if (huge3) {
196+
SwitchHugeBlobSize(true);
197+
PutPartsInMask(huge3);
198+
}
199+
SwitchHugeBlobSize(targetHuge2);
200+
CompactFresh();
201+
CheckPartsInPlace(baseMask | extraMask, !!baseMask + !!extraMask);
202+
203+
SwitchHugeBlobSize(targetHuge3);
204+
CompactLevels();
205+
CheckPartsInPlace(baseMask | extraMask, 1);
206+
}
207+
}
208+
209+
static void CompactionTest() {
210+
for (ui32 fresh1 = 0; fresh1 < 8; ++fresh1) {
211+
for (ui32 fresh2 = 0; fresh2 < 2; ++fresh2) {
212+
for (ui32 huge1 = 0; huge1 < 4; ++huge1) {
213+
for (ui32 huge2 = 0; huge2 < 2; ++huge2) {
214+
for (bool targetHuge : {true, false}) {
215+
for (ui32 fresh3 = 0; fresh3 < 4; ++fresh3) {
216+
for (ui32 huge3 = 0; huge3 < 2; ++huge3) {
217+
for (bool targetHuge2 : {true, false}) {
218+
for (bool targetHuge3 : {true, false}) {
219+
Cerr << "fresh1# " << fresh1 << " fresh2# " << fresh2 << " huge1# " << huge1
220+
<< " huge2# " << huge2 << " targetHuge# " << targetHuge
221+
<< " fresh3# " << fresh3
222+
<< " huge3# " << huge3
223+
<< " targetHuge2# " << targetHuge2
224+
<< " targetHuge3# " << targetHuge3
225+
<< Endl;
226+
THugeBlobTest test;
227+
test.RunTest(fresh1, fresh2, huge1, huge2, targetHuge, fresh3, huge3, targetHuge2, targetHuge3);
228+
}}}}}}}}}
229+
}
230+
};
231+
232+
}
233+
234+
Y_UNIT_TEST_SUITE(HugeBlobOnlineSizeChange) {
235+
236+
Y_UNIT_TEST(Compaction) {
237+
THugeBlobTest::CompactionTest();
238+
}
239+
240+
}

ydb/core/blobstorage/ut_blobstorage/lib/defs.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
#include <library/cpp/testing/unittest/registar.h>
2929
#include <util/system/rusage.h>
3030
#include <util/random/fast.h>
31+
#include <util/stream/null.h>
3132

3233
using namespace NActors;
3334
using namespace NKikimr;

0 commit comments

Comments
 (0)