Skip to content

Commit 7b31943

Browse files
authored
Extract stream scan internal structs for further reuse (#9183)
1 parent 47ba04b commit 7b31943

File tree

6 files changed

+138
-81
lines changed

6 files changed

+138
-81
lines changed

ydb/core/tx/datashard/cdc_stream_scan.cpp

Lines changed: 6 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
#include "cdc_stream_scan.h"
22
#include "change_record_body_serializer.h"
33
#include "datashard_impl.h"
4+
#include "incr_restore_helpers.h"
5+
#include "stream_scan_common.h"
46

57
#include <ydb/core/protos/datashard_config.pb.h>
68
#include <ydb/core/protos/tx_datashard.pb.h>
@@ -183,17 +185,6 @@ class TDataShard::TTxCdcStreamScanProgress
183185
TVector<IDataShardChangeCollector::TChange> ChangeRecords;
184186
bool Reschedule = false;
185187

186-
static TVector<TRawTypeValue> MakeKey(TArrayRef<const TCell> cells, TUserTable::TCPtr table) {
187-
TVector<TRawTypeValue> key(Reserve(cells.size()));
188-
189-
Y_ABORT_UNLESS(cells.size() == table->KeyColumnTypes.size());
190-
for (TPos pos = 0; pos < cells.size(); ++pos) {
191-
key.emplace_back(cells.at(pos).AsRef(), table->KeyColumnTypes.at(pos));
192-
}
193-
194-
return key;
195-
}
196-
197188
static TVector<TUpdateOp> MakeUpdates(TArrayRef<const TCell> cells, TArrayRef<const TTag> tags, TUserTable::TCPtr table) {
198189
TVector<TUpdateOp> updates(Reserve(cells.size()));
199190

@@ -208,30 +199,6 @@ class TDataShard::TTxCdcStreamScanProgress
208199
return updates;
209200
}
210201

211-
static std::optional<TVector<TUpdateOp>> MakeRestoreUpdates(TArrayRef<const TCell> cells, TArrayRef<const TTag> tags, TUserTable::TCPtr table) {
212-
Y_ABORT_UNLESS(cells.size() >= 1);
213-
TVector<TUpdateOp> updates(::Reserve(cells.size() - 1));
214-
215-
bool foundSpecialColumn = false;
216-
Y_ABORT_UNLESS(cells.size() == tags.size());
217-
for (TPos pos = 0; pos < cells.size(); ++pos) {
218-
const auto tag = tags.at(pos);
219-
auto it = table->Columns.find(tag);
220-
Y_ABORT_UNLESS(it != table->Columns.end());
221-
if (it->second.Name == "__ydb_incrBackupImpl_deleted") {
222-
if (const auto& cell = cells.at(pos); !cell.IsNull() && cell.AsValue<bool>()) {
223-
return std::nullopt;
224-
}
225-
foundSpecialColumn = true;
226-
continue;
227-
}
228-
updates.emplace_back(tag, ECellOp::Set, TRawTypeValue(cells.at(pos).AsRef(), it->second.Type));
229-
}
230-
Y_ABORT_UNLESS(foundSpecialColumn);
231-
232-
return updates;
233-
}
234-
235202
static TRowState MakeRow(TArrayRef<const TCell> cells) {
236203
TRowState row(cells.size());
237204

@@ -309,7 +276,7 @@ class TDataShard::TTxCdcStreamScanProgress
309276
bool pageFault = false;
310277

311278
for (const auto& [k, v] : ev.Rows) {
312-
const auto key = MakeKey(k.GetCells(), table);
279+
const auto key = NStreamScan::MakeKey(k.GetCells(), table);
313280
const auto& keyTags = table->KeyColumnIds;
314281

315282
TRowState row(0);
@@ -332,7 +299,7 @@ class TDataShard::TTxCdcStreamScanProgress
332299
Serialize(body, ERowOp::Upsert, key, keyTags, MakeUpdates(v.GetCells(), valueTags, table));
333300
break;
334301
case NKikimrSchemeOp::ECdcStreamModeRestoreIncrBackup:
335-
if (auto updates = MakeRestoreUpdates(v.GetCells(), valueTags, table); updates) {
302+
if (auto updates = NIncrRestoreHelpers::MakeRestoreUpdates(v.GetCells(), valueTags, table); updates) {
336303
Serialize(body, ERowOp::Upsert, key, keyTags, *updates);
337304
} else {
338305
Serialize(body, ERowOp::Erase, key, keyTags, {});
@@ -425,50 +392,8 @@ class TCdcStreamScan: public IActorCallback, public IScan {
425392
ui64 TabletId;
426393
};
427394

428-
struct TLimits {
429-
ui32 BatchMaxBytes;
430-
ui32 BatchMinRows;
431-
ui32 BatchMaxRows;
432-
433-
TLimits(const NKikimrTxDataShard::TEvCdcStreamScanRequest::TLimits& proto)
434-
: BatchMaxBytes(proto.GetBatchMaxBytes())
435-
, BatchMinRows(proto.GetBatchMinRows())
436-
, BatchMaxRows(proto.GetBatchMaxRows())
437-
{
438-
}
439-
};
440-
441-
class TBuffer {
442-
public:
443-
void AddRow(TArrayRef<const TCell> key, TArrayRef<const TCell> value) {
444-
const auto& [k, v] = Data.emplace_back(
445-
TSerializedCellVec(key),
446-
TSerializedCellVec(value)
447-
);
448-
ByteSize += k.GetBuffer().size() + v.GetBuffer().size();
449-
}
450-
451-
auto&& Flush() {
452-
ByteSize = 0;
453-
return std::move(Data);
454-
}
455-
456-
ui64 Bytes() const {
457-
return ByteSize;
458-
}
459-
460-
ui64 Rows() const {
461-
return Data.size();
462-
}
463-
464-
explicit operator bool() const {
465-
return !Data.empty();
466-
}
467-
468-
private:
469-
TVector<std::pair<TSerializedCellVec, TSerializedCellVec>> Data; // key & value (if any)
470-
ui64 ByteSize = 0;
471-
};
395+
using TLimits = NStreamScan::TLimits;
396+
using TBuffer = NStreamScan::TBuffer;
472397

473398
STATEFN(StateWork) {
474399
switch (ev->GetTypeRewrite()) {
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
#include "incr_restore_helpers.h"
2+
3+
namespace NKikimr::NDataShard::NIncrRestoreHelpers {
4+
5+
std::optional<TVector<TUpdateOp>> MakeRestoreUpdates(TArrayRef<const TCell> cells, TArrayRef<const TTag> tags, TUserTable::TCPtr table) {
6+
Y_ABORT_UNLESS(cells.size() >= 1);
7+
TVector<TUpdateOp> updates(::Reserve(cells.size() - 1));
8+
9+
bool foundSpecialColumn = false;
10+
Y_ABORT_UNLESS(cells.size() == tags.size());
11+
for (TPos pos = 0; pos < cells.size(); ++pos) {
12+
const auto tag = tags.at(pos);
13+
auto it = table->Columns.find(tag);
14+
Y_ABORT_UNLESS(it != table->Columns.end());
15+
if (it->second.Name == "__ydb_incrBackupImpl_deleted") {
16+
if (const auto& cell = cells.at(pos); !cell.IsNull() && cell.AsValue<bool>()) {
17+
return std::nullopt;
18+
}
19+
foundSpecialColumn = true;
20+
continue;
21+
}
22+
updates.emplace_back(tag, ECellOp::Set, TRawTypeValue(cells.at(pos).AsRef(), it->second.Type));
23+
}
24+
Y_ABORT_UNLESS(foundSpecialColumn);
25+
26+
return updates;
27+
}
28+
29+
} // namespace NKikimr::NBackup::NImpl
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
#pragma once
2+
3+
#include <ydb/core/tablet_flat/flat_update_op.h>
4+
#include <ydb/core/tx/datashard/datashard_user_table.h>
5+
6+
#include <util/generic/vector.h>
7+
8+
#include <optional>
9+
10+
namespace NKikimr::NDataShard::NIncrRestoreHelpers {
11+
12+
using namespace NTable;
13+
14+
std::optional<TVector<TUpdateOp>> MakeRestoreUpdates(TArrayRef<const TCell> cells, TArrayRef<const TTag> tags, TUserTable::TCPtr table);
15+
16+
} // namespace NKikimr::NBackup::NImpl
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
#include "stream_scan_common.h"
2+
3+
#include <ydb/core/protos/tx_datashard.pb.h>
4+
5+
namespace NKikimr::NDataShard::NStreamScan {
6+
7+
using namespace NTable;
8+
9+
TLimits::TLimits(const NKikimrTxDataShard::TEvCdcStreamScanRequest_TLimits& proto)
10+
: BatchMaxBytes(proto.GetBatchMaxBytes())
11+
, BatchMinRows(proto.GetBatchMinRows())
12+
, BatchMaxRows(proto.GetBatchMaxRows())
13+
{
14+
}
15+
16+
TVector<TRawTypeValue> MakeKey(TArrayRef<const TCell> cells, TUserTable::TCPtr table) {
17+
TVector<TRawTypeValue> key(Reserve(cells.size()));
18+
19+
Y_ABORT_UNLESS(cells.size() == table->KeyColumnTypes.size());
20+
for (TPos pos = 0; pos < cells.size(); ++pos) {
21+
key.emplace_back(cells.at(pos).AsRef(), table->KeyColumnTypes.at(pos));
22+
}
23+
24+
return key;
25+
}
26+
27+
} // namespace NKikimr::NDataShard::NStreamScan
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
#pragma once
2+
3+
#include <ydb/core/tablet_flat/flat_update_op.h>
4+
#include <ydb/core/tx/datashard/datashard_user_table.h>
5+
6+
#include <util/generic/vector.h>
7+
8+
namespace NKikimrTxDataShard {
9+
10+
class TEvCdcStreamScanRequest_TLimits;
11+
12+
} // namespace NKikimrTxDataShard
13+
14+
namespace NKikimr::NDataShard::NStreamScan {
15+
16+
TVector<TRawTypeValue> MakeKey(TArrayRef<const TCell> cells, TUserTable::TCPtr table);
17+
18+
struct TLimits {
19+
ui32 BatchMaxBytes;
20+
ui32 BatchMinRows;
21+
ui32 BatchMaxRows;
22+
23+
TLimits(const NKikimrTxDataShard::TEvCdcStreamScanRequest_TLimits& proto);
24+
};
25+
26+
class TBuffer {
27+
public:
28+
inline void AddRow(TArrayRef<const TCell> key, TArrayRef<const TCell> value) {
29+
const auto& [k, v] = Data.emplace_back(
30+
TSerializedCellVec(key),
31+
TSerializedCellVec(value)
32+
);
33+
ByteSize += k.GetBuffer().size() + v.GetBuffer().size();
34+
}
35+
36+
inline auto&& Flush() {
37+
ByteSize = 0;
38+
return std::move(Data);
39+
}
40+
41+
inline ui64 Bytes() const {
42+
return ByteSize;
43+
}
44+
45+
inline ui64 Rows() const {
46+
return Data.size();
47+
}
48+
49+
inline explicit operator bool() const {
50+
return !Data.empty();
51+
}
52+
53+
private:
54+
TVector<std::pair<TSerializedCellVec, TSerializedCellVec>> Data; // key & value (if any)
55+
ui64 ByteSize = 0;
56+
};
57+
58+
} // namespace NKikimr::NDataShard::NStreamScan

ydb/core/tx/datashard/ya.make

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,7 @@ SRCS(
159159
finish_propose_unit.cpp
160160
finish_propose_write_unit.cpp
161161
follower_edge.cpp
162+
incr_restore_helpers.cpp
162163
initiate_build_index_unit.cpp
163164
key_conflicts.cpp
164165
key_conflicts.h
@@ -205,6 +206,7 @@ SRCS(
205206
store_scheme_tx_unit.cpp
206207
store_snapshot_tx_unit.cpp
207208
store_write_unit.cpp
209+
stream_scan_common.cpp
208210
upload_stats.cpp
209211
volatile_tx.cpp
210212
wait_for_plan_unit.cpp

0 commit comments

Comments
 (0)