Skip to content

Commit 696b497

Browse files
zverevgeny, Tony-Romanov, ivanmorozov333
authored
Merge from 24-2-4-analytics-2 (#6774)
Co-authored-by: Tony-Romanov <150126326+Tony-Romanov@users.noreply.github.com>
Co-authored-by: ivanmorozov333 <ivanmorozov@ydb.tech>
1 parent 0abd28e commit 696b497

File tree

16 files changed

+79
-61
lines changed

16 files changed

+79
-61
lines changed

ydb/core/formats/arrow/size_calcer.cpp

Lines changed: 4 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -241,11 +241,12 @@ ui64 GetArrayDataSize(const std::shared_ptr<arrow::Array>& column) {
241241
}
242242

243243
NKikimr::NArrow::TSerializedBatch TSerializedBatch::Build(std::shared_ptr<arrow::RecordBatch> batch, const TBatchSplitttingContext& context) {
244-
std::optional<TFirstLastSpecialKeys> specialKeys;
244+
std::optional<TString> specialKeys;
245245
if (context.GetFieldsForSpecialKeys().size()) {
246-
specialKeys = TFirstLastSpecialKeys(batch, context.GetFieldsForSpecialKeys());
246+
specialKeys = TFirstLastSpecialKeys(batch, context.GetFieldsForSpecialKeys()).SerializeToString();
247247
}
248-
return TSerializedBatch(NArrow::SerializeSchema(*batch->schema()), NArrow::SerializeBatchNoCompression(batch), batch->num_rows(), NArrow::GetBatchDataSize(batch), specialKeys);
248+
return TSerializedBatch(NArrow::SerializeSchema(*batch->schema()), NArrow::SerializeBatchNoCompression(batch), batch->num_rows(),
249+
NArrow::GetBatchDataSize(batch), specialKeys);
249250
}
250251

251252
bool TSerializedBatch::BuildWithLimit(std::shared_ptr<arrow::RecordBatch> batch, const TBatchSplitttingContext& context, std::optional<TSerializedBatch>& sbL, std::optional<TSerializedBatch>& sbR, TString* errorMessage) {

ydb/core/formats/arrow/size_calcer.h

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -72,13 +72,13 @@ class TSerializedBatch {
7272
YDB_READONLY_DEF(TString, Data);
7373
YDB_READONLY(ui32, RowsCount, 0);
7474
YDB_READONLY(ui32, RawBytes, 0);
75-
std::optional<TFirstLastSpecialKeys> SpecialKeys;
75+
std::optional<TString> SpecialKeys;
7676
public:
7777
size_t GetSize() const {
7878
return Data.size();
7979
}
8080

81-
const TFirstLastSpecialKeys& GetSpecialKeysSafe() const {
81+
const TString& GetSpecialKeysSafe() const {
8282
AFL_VERIFY(SpecialKeys);
8383
return *SpecialKeys;
8484
}
@@ -93,7 +93,7 @@ class TSerializedBatch {
9393
static bool BuildWithLimit(std::shared_ptr<arrow::RecordBatch> batch, const TBatchSplitttingContext& context, std::optional<TSerializedBatch>& sbL, std::optional<TSerializedBatch>& sbR, TString* errorMessage);
9494
static TSerializedBatch Build(std::shared_ptr<arrow::RecordBatch> batch, const TBatchSplitttingContext& context);
9595

96-
TSerializedBatch(TString&& schemaData, TString&& data, const ui32 rowsCount, const ui32 rawBytes, const std::optional<TFirstLastSpecialKeys>& specialKeys)
96+
TSerializedBatch(TString&& schemaData, TString&& data, const ui32 rowsCount, const ui32 rawBytes, const std::optional<TString>& specialKeys)
9797
: SchemaData(schemaData)
9898
, Data(data)
9999
, RowsCount(rowsCount)

ydb/core/kqp/ut/olap/kqp_olap_ut.cpp

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -940,7 +940,6 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
940940
R"(`resource_id` = "10001")",
941941
R"(`resource_id` != "10001")",
942942
R"("XXX" == "YYY" OR `resource_id` != "10001")",
943-
R"(`resource_id` != "10001" XOR "XXX" == "YYY")",
944943
R"(`level` = 1)",
945944
R"(`level` = Int8("1"))",
946945
R"(`level` = Int16("1"))",

ydb/core/tx/columnshard/background_controller.cpp

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -11,7 +11,7 @@ bool TBackgroundController::StartCompaction(const NOlap::TPlanCompactionInfo& in
1111
void TBackgroundController::CheckDeadlines() {
1212
for (auto&& i : ActiveCompactionInfo) {
1313
if (TMonotonic::Now() - i.second.GetStartTime() > NOlap::TCompactionLimits::CompactionTimeout) {
14-
AFL_EMERG(NKikimrServices::TX_COLUMNSHARD)("event", "deadline_compaction");
14+
AFL_CRIT(NKikimrServices::TX_COLUMNSHARD)("event", "deadline_compaction");
1515
Y_DEBUG_ABORT_UNLESS(false);
1616
}
1717
}
@@ -20,7 +20,7 @@ void TBackgroundController::CheckDeadlines() {
2020
void TBackgroundController::CheckDeadlinesIndexation() {
2121
for (auto&& i : ActiveIndexationTasks) {
2222
if (TMonotonic::Now() - i.second > NOlap::TCompactionLimits::CompactionTimeout) {
23-
AFL_EMERG(NKikimrServices::TX_COLUMNSHARD)("event", "deadline_compaction")("task_id", i.first);
23+
AFL_CRIT(NKikimrServices::TX_COLUMNSHARD)("event", "deadline_indexation")("task_id", i.first);
2424
Y_DEBUG_ABORT_UNLESS(false);
2525
}
2626
}

ydb/core/tx/columnshard/blobs_action/bs/blob_manager.cpp

Lines changed: 43 additions & 35 deletions
Original file line number | Diff line number | Diff line change
@@ -233,16 +233,6 @@ std::shared_ptr<NBlobOperations::NBlobStorage::TGCTask> TBlobManager::BuildGCTas
233233
const ui32 channelIdx = BLOB_CHANNEL;
234234
NBlobOperations::NBlobStorage::TGCTask::TGCListsByGroup perGroupGCListsInFlight;
235235
// Clear all possibly not kept trash in channel's groups: create an event for each group
236-
if (FirstGC) {
237-
FirstGC = false;
238-
239-
// TODO: we need only actual channel history here
240-
const auto& channelHistory = TabletInfo->ChannelInfo(channelIdx)->History;
241-
242-
for (auto it = channelHistory.begin(); it != channelHistory.end(); ++it) {
243-
perGroupGCListsInFlight[it->GroupID];
244-
}
245-
}
246236

247237
static const ui32 blobsGCCountLimit = 500000;
248238

@@ -270,27 +260,22 @@ std::shared_ptr<NBlobOperations::NBlobStorage::TGCTask> TBlobManager::BuildGCTas
270260
BlobsManagerCounters.OnCollectDropExplicit(logoBlobId.BlobSize());
271261
gl.DontKeepList.insert(logoBlobId);
272262
}
273-
if (extractedToRemoveFromDB.GetSize() >= blobsGCCountLimit) {
274-
newCollectGenSteps.clear();
275-
}
276263
}
277264

278265

279266
std::deque<TUnifiedBlobId> keepsToErase;
280-
std::deque<TUnifiedBlobId> deleteIndex;
281-
if (extractedToRemoveFromDB.GetSize() + keepsToErase.size() < blobsGCCountLimit) {
282-
deleteIndex = BlobsToDelete.GroupByGenStep();
283-
}
267+
std::deque<TUnifiedBlobId> deleteIndex = BlobsToDelete.GroupByGenStep();
284268
for (auto&& newCollectGenStep : newCollectGenSteps) {
285-
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "PreparePerGroupGCRequests")("gen", std::get<0>(newCollectGenStep))("step", std::get<1>(newCollectGenStep));
269+
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "PreparePerGroupGCRequests")("gen_step", newCollectGenStep);
286270
BlobsManagerCounters.OnNewCollectStep(std::get<0>(newCollectGenStep), std::get<1>(newCollectGenStep));
287271

288272
// Make per-group Keep/DontKeep lists
289273

290274
{
291275
// Add all blobs to keep
292276
auto keepBlobIt = BlobsToKeep.begin();
293-
for (; keepBlobIt != BlobsToKeep.end(); ++keepBlobIt) {
277+
278+
for (; keepBlobIt != BlobsToKeep.end();) {
294279
TGenStep genStep{keepBlobIt->Generation(), keepBlobIt->Step()};
295280
AFL_VERIFY(genStep > LastCollectedGenStep);
296281
if (genStep > newCollectGenStep) {
@@ -300,16 +285,17 @@ std::shared_ptr<NBlobOperations::NBlobStorage::TGCTask> TBlobManager::BuildGCTas
300285
perGroupGCListsInFlight[blobGroup].KeepList.insert(*keepBlobIt);
301286
keepsToErase.emplace_back(TUnifiedBlobId(blobGroup, *keepBlobIt));
302287
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("to_keep_gc", *keepBlobIt);
303-
if (extractedToRemoveFromDB.GetSize() + keepsToErase.size() > blobsGCCountLimit) {
304-
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "a lot of blobs to gc")("to_remove", extractedToRemoveFromDB.GetSize())("keeps_to_erase", keepsToErase.size())("limit", blobsGCCountLimit);
288+
keepBlobIt = BlobsToKeep.erase(keepBlobIt);
289+
if (keepsToErase.size() > blobsGCCountLimit) {
290+
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "a lot of blobs to gc")("to_remove", extractedToRemoveFromDB.GetSize())("keeps_to_erase", keepsToErase.size())("limit", blobsGCCountLimit)("has_border", !!CollectGenStepInFlight)("border", CollectGenStepInFlight.value_or(LastCollectedGenStep));
305291
break;
306292
}
307293
}
308-
if (extractedToRemoveFromDB.GetSize() + keepsToErase.size() > blobsGCCountLimit) {
309-
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "a lot of blobs to gc")("to_remove", extractedToRemoveFromDB.GetSize())("keeps_to_erase", keepsToErase.size())("limit", blobsGCCountLimit);
294+
AFL_VERIFY(!CollectGenStepInFlight || *CollectGenStepInFlight <= newCollectGenStep);
295+
if (keepsToErase.size() > blobsGCCountLimit) {
296+
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "a lot of blobs to gc")("to_remove", extractedToRemoveFromDB.GetSize())("keeps_to_erase", keepsToErase.size())("limit", blobsGCCountLimit)("has_border", !!CollectGenStepInFlight)("border", CollectGenStepInFlight.value_or(LastCollectedGenStep));
310297
break;
311298
}
312-
BlobsToKeep.erase(BlobsToKeep.begin(), keepBlobIt);
313299
BlobsManagerCounters.OnBlobsKeep(BlobsToKeep);
314300

315301
TTabletsByBlob extractedSelf;
@@ -320,7 +306,7 @@ std::shared_ptr<NBlobOperations::NBlobStorage::TGCTask> TBlobManager::BuildGCTas
320306
break;
321307
}
322308
BlobsToDelete.ExtractBlobTo(deleteIndex.front(), extractedSelf);
323-
if (extractedToRemoveFromDB.GetSize() + extractedSelf.GetSize() + keepsToErase.size() > blobsGCCountLimit) {
309+
if (extractedToRemoveFromDB.GetSize() + extractedSelf.GetSize() > blobsGCCountLimit) {
324310
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "a lot of blobs to gc")("to_remove", extractedToRemoveFromDB.GetSize())("keeps_to_erase", keepsToErase.size())("limit", blobsGCCountLimit);
325311
break;
326312
}
@@ -358,20 +344,35 @@ std::shared_ptr<NBlobOperations::NBlobStorage::TGCTask> TBlobManager::BuildGCTas
358344
}
359345
BlobsManagerCounters.OnBlobsDelete(BlobsToDelete);
360346
}
361-
CollectGenStepInFlight = newCollectGenStep;
362-
if (extractedToRemoveFromDB.GetSize() + keepsToErase.size() > blobsGCCountLimit) {
347+
if (std::get<0>(newCollectGenStep) == CurrentGen) {
348+
CollectGenStepInFlight = newCollectGenStep;
349+
}
350+
if (keepsToErase.size() > blobsGCCountLimit) {
363351
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "a lot of blobs to gc")("to_remove", extractedToRemoveFromDB.GetSize())("keeps_to_erase", keepsToErase.size())("limit", blobsGCCountLimit);
364352
break;
365353
}
366354
}
367355
if (CollectGenStepInFlight) {
356+
if (FirstGC) {
357+
FirstGC = false;
358+
359+
// TODO: we need only actual channel history here
360+
const auto& channelHistory = TabletInfo->ChannelInfo(channelIdx)->History;
361+
362+
for (auto it = channelHistory.begin(); it != channelHistory.end(); ++it) {
363+
perGroupGCListsInFlight[it->GroupID];
364+
}
365+
}
368366
PopGCBarriers(*CollectGenStepInFlight);
369-
} else {
370-
CollectGenStepInFlight = LastCollectedGenStep;
371367
}
368+
if (BlobsToKeep.size() && CollectGenStepInFlight) {
369+
TGenStep genStepFront{BlobsToKeep.begin()->Generation(), BlobsToKeep.begin()->Step()};
370+
AFL_VERIFY(*CollectGenStepInFlight < genStepFront);
371+
}
372+
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("notice", "collect_gen_step")("value", CollectGenStepInFlight)("current_gen", CurrentGen);
372373
auto removeCategories = sharedBlobsInfo->BuildRemoveCategories(std::move(extractedToRemoveFromDB));
373374

374-
auto result = std::make_shared<NBlobOperations::NBlobStorage::TGCTask>(storageId, std::move(perGroupGCListsInFlight), *CollectGenStepInFlight,
375+
auto result = std::make_shared<NBlobOperations::NBlobStorage::TGCTask>(storageId, std::move(perGroupGCListsInFlight), CollectGenStepInFlight,
375376
std::move(keepsToErase), manager, std::move(removeCategories), counters, TabletInfo->TabletID, CurrentGen);
376377
if (result->IsEmpty()) {
377378
CollectGenStepInFlight = {};
@@ -383,6 +384,7 @@ std::shared_ptr<NBlobOperations::NBlobStorage::TGCTask> TBlobManager::BuildGCTas
383384

384385

385386
TBlobBatch TBlobManager::StartBlobBatch(ui32 channel) {
387+
AFL_VERIFY(++CurrentStep < Max<ui32>() - 10);
386388
++CountersUpdate.BatchesStarted;
387389
Y_ABORT_UNLESS(channel == BLOB_CHANNEL, "Support for mutiple blob channels is not implemented yet");
388390
++CurrentStep;
@@ -443,13 +445,19 @@ void TBlobManager::DeleteBlobOnComplete(const TTabletId tabletId, const TUnified
443445
}
444446
}
445447

446-
void TBlobManager::OnGCFinishedOnExecute(const TGenStep& genStep, IBlobManagerDb& db) {
447-
db.SaveLastGcBarrier(genStep);
448+
void TBlobManager::OnGCFinishedOnExecute(const std::optional<TGenStep>& genStep, IBlobManagerDb& db) {
449+
if (genStep) {
450+
db.SaveLastGcBarrier(*genStep);
451+
}
448452
}
449453

450-
void TBlobManager::OnGCFinishedOnComplete(const TGenStep& genStep) {
451-
LastCollectedGenStep = genStep;
452-
CollectGenStepInFlight.reset();
454+
void TBlobManager::OnGCFinishedOnComplete(const std::optional<TGenStep>& genStep) {
455+
if (genStep) {
456+
LastCollectedGenStep = *genStep;
457+
CollectGenStepInFlight.reset();
458+
} else {
459+
AFL_VERIFY(!CollectGenStepInFlight);
460+
}
453461
}
454462

455463
void TBlobManager::OnBlobFree(const TUnifiedBlobId& blobId) {

ydb/core/tx/columnshard/blobs_action/bs/blob_manager.h

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -195,8 +195,8 @@ class TBlobManager : public IBlobManager, public TCommonBlobsTracker {
195195
const std::shared_ptr<TBlobManager>& manager, const std::shared_ptr<NDataSharing::TStorageSharedBlobsManager>& sharedBlobsInfo,
196196
const std::shared_ptr<NBlobOperations::TRemoveGCCounters>& counters) noexcept;
197197

198-
void OnGCFinishedOnExecute(const TGenStep& genStep, IBlobManagerDb& db);
199-
void OnGCFinishedOnComplete(const TGenStep& genStep);
198+
void OnGCFinishedOnExecute(const std::optional<TGenStep>& genStep, IBlobManagerDb& db);
199+
void OnGCFinishedOnComplete(const std::optional<TGenStep>& genStep);
200200

201201
TBlobManagerCounters GetCountersUpdate() {
202202
TBlobManagerCounters res = CountersUpdate;

ydb/core/tx/columnshard/blobs_action/bs/gc.cpp

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -22,7 +22,7 @@ bool TGCTask::DoOnCompleteTxAfterCleaning(NColumnShard::TColumnShard& /*self*/,
2222
return true;
2323
}
2424

25-
TGCTask::TGCTask(const TString& storageId, TGCListsByGroup&& listsByGroupId, const TGenStep& collectGenStepInFlight, std::deque<TUnifiedBlobId>&& keepsToErase,
25+
TGCTask::TGCTask(const TString& storageId, TGCListsByGroup&& listsByGroupId, const std::optional<TGenStep>& collectGenStepInFlight, std::deque<TUnifiedBlobId>&& keepsToErase,
2626
const std::shared_ptr<TBlobManager>& manager, TBlobsCategories&& blobsToRemove, const std::shared_ptr<TRemoveGCCounters>& counters,
2727
const ui64 tabletId, const ui64 currentGen)
2828
: TBase(storageId, std::move(blobsToRemove), counters)
@@ -53,8 +53,8 @@ std::unique_ptr<TEvBlobStorage::TEvCollectGarbage> TGCTask::BuildRequest(const u
5353
AFL_VERIFY(++it->second.RequestsCount < 10);
5454
auto result = std::make_unique<TEvBlobStorage::TEvCollectGarbage>(
5555
TabletId, CurrentGen, PerGenerationCounter.Val(),
56-
channelIdx, true,
57-
std::get<0>(CollectGenStepInFlight), std::get<1>(CollectGenStepInFlight),
56+
channelIdx, !!CollectGenStepInFlight,
57+
CollectGenStepInFlight ? std::get<0>(*CollectGenStepInFlight) : 0, CollectGenStepInFlight ? std::get<1>(*CollectGenStepInFlight) : 0,
5858
new TVector<TLogoBlobID>(it->second.KeepList.begin(), it->second.KeepList.end()),
5959
new TVector<TLogoBlobID>(it->second.DontKeepList.begin(), it->second.DontKeepList.end()),
6060
TInstant::Max(), true);

ydb/core/tx/columnshard/blobs_action/bs/gc.h

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -20,7 +20,7 @@ class TGCTask: public IBlobsGCAction {
2020
using TGCListsByGroup = THashMap<ui32, TGCLists>;
2121
private:
2222
TGCListsByGroup ListsByGroupId;
23-
TGenStep CollectGenStepInFlight;
23+
std::optional<TGenStep> CollectGenStepInFlight;
2424
const ui64 TabletId;
2525
const ui64 CurrentGen;
2626
std::deque<TUnifiedBlobId> KeepsToErase;
@@ -30,11 +30,11 @@ class TGCTask: public IBlobsGCAction {
3030
virtual void DoOnExecuteTxAfterCleaning(NColumnShard::TColumnShard& self, TBlobManagerDb& dbBlobs) override;
3131
virtual bool DoOnCompleteTxAfterCleaning(NColumnShard::TColumnShard& self, const std::shared_ptr<IBlobsGCAction>& taskAction) override;
3232
virtual bool DoIsEmpty() const override {
33-
return false;
33+
return !CollectGenStepInFlight && KeepsToErase.empty();
3434
}
3535

3636
public:
37-
TGCTask(const TString& storageId, TGCListsByGroup&& listsByGroupId, const TGenStep& collectGenStepInFlight, std::deque<TUnifiedBlobId>&& keepsToErase,
37+
TGCTask(const TString& storageId, TGCListsByGroup&& listsByGroupId, const std::optional<TGenStep>& collectGenStepInFlight, std::deque<TUnifiedBlobId>&& keepsToErase,
3838
const std::shared_ptr<TBlobManager>& manager, TBlobsCategories&& blobsToRemove, const std::shared_ptr<TRemoveGCCounters>& counters, const ui64 tabletId, const ui64 currentGen);
3939

4040
const TGCListsByGroup& GetListsByGroupId() const {

ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -7,7 +7,7 @@ bool TTxWrite::InsertOneBlob(TTransactionContext& txc, const NOlap::TWideSeriali
77
meta.SetNumRows(batch->GetRowsCount());
88
meta.SetRawBytes(batch->GetRawBytes());
99
meta.SetDirtyWriteTimeSeconds(batch.GetStartInstant().Seconds());
10-
meta.SetSpecialKeysRawData(batch->GetSpecialKeysSafe().SerializeToString());
10+
meta.SetSpecialKeysRawData(batch->GetSpecialKeysSafe());
1111

1212
const auto& blobRange = batch.GetRange();
1313
Y_ABORT_UNLESS(blobRange.GetBlobId().IsValid());

ydb/core/tx/columnshard/columnshard.cpp

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -212,7 +212,7 @@ void TColumnShard::UpdateInsertTableCounters() {
212212
SetCounter(COUNTER_COMMITTED_RECORDS, committed.Rows);
213213
SetCounter(COUNTER_COMMITTED_BYTES, committed.Bytes);
214214

215-
LOG_S_INFO("InsertTable. Prepared: " << prepared.Bytes << " in " << prepared.Rows
215+
LOG_S_TRACE("InsertTable. Prepared: " << prepared.Bytes << " in " << prepared.Rows
216216
<< " records, committed: " << committed.Bytes << " in " << committed.Rows
217217
<< " records at tablet " << TabletID());
218218
}

0 commit comments

Comments
 (0)