|
1 |
| -#include "inflight_request_tracker.h" |
2 | 1 | #include "columnshard_impl.h"
|
3 | 2 | #include "columnshard_schema.h"
|
| 3 | +#include "inflight_request_tracker.h" |
| 4 | + |
4 | 5 | #include "data_sharing/common/transactions/tx_extension.h"
|
5 | 6 | #include "engines/column_engine.h"
|
6 | 7 | #include "engines/reader/plain_reader/constructor/read_metadata.h"
|
|
9 | 10 | namespace NKikimr::NColumnShard {
|
10 | 11 |
|
11 | 12 | void TInFlightReadsTracker::RemoveInFlightRequest(ui64 cookie, const NOlap::TVersionedIndex* /*index*/, const TInstant now) {
|
12 |
| - Y_ABORT_UNLESS(RequestsMeta.contains(cookie), "Unknown request cookie %" PRIu64, cookie); |
13 |
| - const auto& readMetaList = RequestsMeta[cookie]; |
| 13 | + auto it = RequestsMeta.find(cookie); |
| 14 | + AFL_VERIFY(it != RequestsMeta.end())("cookie", cookie); |
| 15 | + const auto& readMetaList = it->second; |
14 | 16 |
|
15 | 17 | for (const auto& readMetaBase : readMetaList) {
|
16 |
| - NOlap::NReader::NPlain::TReadMetadata::TConstPtr readMeta = std::dynamic_pointer_cast<const NOlap::NReader::NPlain::TReadMetadata>(readMetaBase); |
17 |
| - |
18 |
| - if (!readMeta) { |
19 |
| - continue; |
20 |
| - } |
21 | 18 | {
|
22 |
| - auto it = SnapshotsLive.find(readMeta->GetRequestSnapshot()); |
| 19 | + auto it = SnapshotsLive.find(readMetaBase->GetRequestSnapshot()); |
23 | 20 | AFL_VERIFY(it != SnapshotsLive.end());
|
24 | 21 | if (it->second.DelRequest(cookie, now)) {
|
25 | 22 | SnapshotsLive.erase(it);
|
26 | 23 | }
|
27 | 24 | }
|
28 | 25 |
|
29 |
| - auto insertStorage = StoragesManager->GetInsertOperator(); |
30 |
| - auto tracker = insertStorage->GetBlobsTracker(); |
31 |
| - for (const auto& committedBlob : readMeta->CommittedBlobs) { |
32 |
| - tracker->FreeBlob(committedBlob.GetBlobRange().GetBlobId()); |
| 26 | + if (NOlap::NReader::NPlain::TReadMetadata::TConstPtr readMeta = |
| 27 | + std::dynamic_pointer_cast<const NOlap::NReader::NPlain::TReadMetadata>(readMetaBase)) { |
| 28 | + auto insertStorage = StoragesManager->GetInsertOperator(); |
| 29 | + auto tracker = insertStorage->GetBlobsTracker(); |
| 30 | + for (const auto& committedBlob : readMeta->CommittedBlobs) { |
| 31 | + tracker->FreeBlob(committedBlob.GetBlobRange().GetBlobId()); |
| 32 | + } |
33 | 33 | }
|
34 | 34 | }
|
35 | 35 | Counters->OnSnapshotsInfo(SnapshotsLive.size(), GetSnapshotToClean());
|
@@ -85,8 +85,7 @@ class TTransactionSavePersistentSnapshots: public NOlap::NDataSharing::TExtended
|
85 | 85 | NColumnShard::TColumnShard* self, std::set<NOlap::TSnapshot>&& saveSnapshots, std::set<NOlap::TSnapshot>&& removeSnapshots)
|
86 | 86 | : TBase(self)
|
87 | 87 | , SaveSnapshots(std::move(saveSnapshots))
|
88 |
| - , RemoveSnapshots(std::move(removeSnapshots)) |
89 |
| - { |
| 88 | + , RemoveSnapshots(std::move(removeSnapshots)) { |
90 | 89 | AFL_VERIFY(SaveSnapshots.size() || RemoveSnapshots.size());
|
91 | 90 | }
|
92 | 91 | };
|
@@ -139,4 +138,20 @@ bool TInFlightReadsTracker::LoadFromDatabase(NTable::TDatabase& tableDB) {
|
139 | 138 | return true;
|
140 | 139 | }
|
141 | 140 |
|
| 141 | +NKikimr::TConclusion<ui64> TInFlightReadsTracker::AddInFlightRequest( |
| 142 | + NOlap::NReader::TReadMetadataBase::TConstPtr readMeta, const NOlap::TVersionedIndex* index) { |
| 143 | + const ui64 cookie = NextCookie++; |
| 144 | + auto it = SnapshotsLive.find(readMeta->GetRequestSnapshot()); |
| 145 | + if (it == SnapshotsLive.end()) { |
| 146 | + it = SnapshotsLive.emplace(readMeta->GetRequestSnapshot(), TSnapshotLiveInfo::BuildFromRequest(readMeta->GetRequestSnapshot())).first; |
| 147 | + Counters->OnSnapshotsInfo(SnapshotsLive.size(), GetSnapshotToClean()); |
| 148 | + } |
| 149 | + it->second.AddRequest(cookie); |
| 150 | + auto status = AddToInFlightRequest(cookie, readMeta, index); |
| 151 | + if (!status) { |
| 152 | + return status; |
| 153 | + } |
| 154 | + return cookie; |
142 | 155 | }
|
| 156 | + |
| 157 | +} // namespace NKikimr::NColumnShard |
0 commit comments