Skip to content

Commit 21037c5

Browse files
committed
Extract build index helpers (#17622)
1 parent 806f683 commit 21037c5

File tree

8 files changed

+871
-1086
lines changed

8 files changed

+871
-1086
lines changed
Lines changed: 243 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,243 @@
1+
#pragma once
2+
3+
#include <ydb/core/tx/datashard/datashard_impl.h>
4+
#include <ydb/core/tx/datashard/scan_common.h>
5+
#include <ydb/library/actors/core/log.h>
6+
7+
namespace NKikimr::NDataShard {
8+
9+
#define LOG_T(stream) LOG_TRACE_S (*TlsActivationContext, NKikimrServices::BUILD_INDEX, stream)
10+
#define LOG_D(stream) LOG_DEBUG_S (*TlsActivationContext, NKikimrServices::BUILD_INDEX, stream)
11+
#define LOG_I(stream) LOG_INFO_S (*TlsActivationContext, NKikimrServices::BUILD_INDEX, stream)
12+
#define LOG_W(stream) LOG_WARN_S (*TlsActivationContext, NKikimrServices::BUILD_INDEX, stream)
13+
#define LOG_N(stream) LOG_NOTICE_S(*TlsActivationContext, NKikimrServices::BUILD_INDEX, stream)
14+
#define LOG_E(stream) LOG_ERROR_S (*TlsActivationContext, NKikimrServices::BUILD_INDEX, stream)
15+
16+
class TBatchRowsUploader
17+
{
18+
struct TDestination {
19+
TBufferData Buffer;
20+
TString Table;
21+
std::shared_ptr<NTxProxy::TUploadTypes> Types;
22+
23+
operator bool() const {
24+
return !Buffer.IsEmpty();
25+
}
26+
};
27+
28+
public:
29+
TBatchRowsUploader(const TIndexBuildScanSettings& scanSettings)
30+
: ScanSettings(scanSettings)
31+
{}
32+
33+
TBufferData* AddDestination(TString table, std::shared_ptr<NTxProxy::TUploadTypes> types) {
34+
auto& dst = Destinations[table];
35+
dst.Table = std::move(table);
36+
dst.Types = std::move(types);
37+
return &dst.Buffer;
38+
}
39+
40+
void Handle(TEvTxUserProxy::TEvUploadRowsResponse::TPtr& ev) {
41+
Y_ENSURE(UploaderId == ev->Sender, "Mismatch"
42+
<< " Uploader: " << UploaderId.ToString()
43+
<< " Sender: " << ev->Sender.ToString());
44+
Y_ENSURE(Uploading);
45+
46+
UploaderId = {};
47+
48+
UploadStatus.StatusCode = ev->Get()->Status;
49+
UploadStatus.Issues = ev->Get()->Issues;
50+
if (!UploadStatus.IsSuccess()) {
51+
return;
52+
}
53+
54+
UploadRows += Uploading.Buffer.GetRows();
55+
UploadBytes += Uploading.Buffer.GetBytes();
56+
Uploading.Buffer.Clear();
57+
RetryCount = 0;
58+
59+
for (auto& [_, dst] : Destinations) {
60+
if (TryUpload(dst, true /* by limit */)) {
61+
break;
62+
}
63+
}
64+
}
65+
66+
bool ShouldWaitUpload()
67+
{
68+
bool hasReachedLimit = false;
69+
for (auto& [_, dst] : Destinations) {
70+
if (HasReachedLimits(dst.Buffer, ScanSettings)) {
71+
hasReachedLimit = true;
72+
break;
73+
}
74+
}
75+
if (!hasReachedLimit) {
76+
return false;
77+
}
78+
79+
if (Uploading) {
80+
return true;
81+
}
82+
for (auto& [_, dst] : Destinations) {
83+
if (TryUpload(dst, true /* by limit */)) {
84+
break;
85+
}
86+
}
87+
88+
hasReachedLimit = false;
89+
for (auto& [_, dst] : Destinations) {
90+
if (HasReachedLimits(dst.Buffer, ScanSettings)) {
91+
hasReachedLimit = true;
92+
break;
93+
}
94+
}
95+
return hasReachedLimit;
96+
}
97+
98+
std::optional<TDuration> GetRetryAfter() const {
99+
if (RetryCount < ScanSettings.GetMaxBatchRetries() && UploadStatus.IsRetriable()) {
100+
return GetRetryWakeupTimeoutBackoff(RetryCount);
101+
}
102+
return {};
103+
}
104+
105+
void RetryUpload()
106+
{
107+
if (!Uploading) {
108+
return;
109+
}
110+
111+
++RetryCount;
112+
StartUploadRowsInternal();
113+
}
114+
115+
bool CanFinish() {
116+
if (Uploading) {
117+
return false;
118+
}
119+
120+
for (auto& [_, dst] : Destinations) {
121+
if (TryUpload(dst, false /* not by limit */)) {
122+
return false;
123+
}
124+
}
125+
126+
return true;
127+
}
128+
129+
template<typename TResponse>
130+
void Finish(TResponse& response, NTable::EAbort abort) {
131+
if (UploaderId) {
132+
TlsActivationContext->Send(new IEventHandle(UploaderId, TActorId(), new TEvents::TEvPoison));
133+
UploaderId = {};
134+
}
135+
136+
response.SetUploadRows(UploadRows);
137+
response.SetUploadBytes(UploadBytes);
138+
if (abort != NTable::EAbort::None) {
139+
response.SetStatus(NKikimrIndexBuilder::EBuildStatus::ABORTED);
140+
} else if (UploadStatus.IsNone() || UploadStatus.IsSuccess()) {
141+
response.SetStatus(NKikimrIndexBuilder::EBuildStatus::DONE);
142+
if (UploadStatus.IsNone()) {
143+
UploadStatus.Issues.AddIssue(NYql::TIssue("Shard or requested range is empty"));
144+
}
145+
NYql::IssuesToMessage(UploadStatus.Issues, response.MutableIssues());
146+
} else {
147+
response.SetStatus(NKikimrIndexBuilder::EBuildStatus::BUILD_ERROR);
148+
NYql::IssuesToMessage(UploadStatus.Issues, response.MutableIssues());
149+
}
150+
}
151+
152+
const TUploadStatus& GetUploadStatus() const {
153+
return UploadStatus;
154+
}
155+
156+
bool IsSuccess() const {
157+
return UploadStatus.IsSuccess();
158+
}
159+
160+
void SetOwner(const TActorId& owner) {
161+
Owner = owner;
162+
}
163+
164+
TString Debug() const {
165+
TStringBuilder result;
166+
167+
if (Uploading) {
168+
result << "UploadTable: " << Uploading.Table << " UploadBuf size: " << Uploading.Buffer.Size() << " RetryCount: " << RetryCount;
169+
}
170+
171+
return result;
172+
}
173+
174+
private:
175+
bool TryUpload(TDestination& destination, bool byLimit) {
176+
if (Y_UNLIKELY(Uploading)) {
177+
// already uploading something
178+
return true;
179+
}
180+
181+
if (!destination.Buffer.IsEmpty() && (!byLimit || HasReachedLimits(destination.Buffer, ScanSettings))) {
182+
Uploading.Table = destination.Table;
183+
Uploading.Types = destination.Types;
184+
destination.Buffer.FlushTo(Uploading.Buffer);
185+
StartUploadRowsInternal();
186+
return true;
187+
}
188+
189+
return false;
190+
}
191+
192+
void StartUploadRowsInternal() {
193+
LOG_D("TBatchRowsUploader StartUploadRowsInternal " << Debug());
194+
195+
Y_ASSERT(Uploading);
196+
Y_ASSERT(!Uploading.Buffer.IsEmpty());
197+
Y_ASSERT(!UploaderId);
198+
Y_ASSERT(Owner);
199+
auto actor = NTxProxy::CreateUploadRowsInternal(
200+
Owner, Uploading.Table, Uploading.Types, Uploading.Buffer.GetRowsData(),
201+
NTxProxy::EUploadRowsMode::WriteToTableShadow, true /*writeToPrivateTable*/);
202+
203+
UploaderId = TlsActivationContext->Register(actor);
204+
}
205+
206+
private:
207+
const TIndexBuildScanSettings ScanSettings;
208+
TActorId Owner;
209+
210+
TMap<TString, TDestination> Destinations;
211+
TActorId UploaderId = {};
212+
TDestination Uploading;
213+
TUploadStatus UploadStatus = {};
214+
ui64 UploadRows = 0;
215+
ui64 UploadBytes = 0;
216+
ui32 RetryCount = 0;
217+
};
218+
219+
inline void StartScan(TDataShard* dataShard, TAutoPtr<NTable::IScan>&& scan, ui64 id,
220+
TScanRecord::TSeqNo seqNo, TRowVersion rowVersion, ui32 tableId)
221+
{
222+
auto& scanManager = dataShard->GetScanManager();
223+
224+
if (const auto* recCard = scanManager.Get(id)) {
225+
if (recCard->SeqNo == seqNo) {
226+
// do no start one more scan
227+
return;
228+
}
229+
230+
for (auto scanId : recCard->ScanIds) {
231+
dataShard->CancelScan(tableId, scanId);
232+
}
233+
scanManager.Drop(id);
234+
}
235+
236+
TScanOptions scanOpts;
237+
scanOpts.SetSnapshotRowVersion(rowVersion);
238+
scanOpts.SetResourceBroker("build_index", 10);
239+
const auto scanId = dataShard->QueueScan(tableId, std::move(scan), 0, scanOpts);
240+
scanManager.Set(id, seqNo).push_back(scanId);
241+
}
242+
243+
}

0 commit comments

Comments
 (0)