Skip to content

Commit 8a5b353

Browse files
azevaykinnikvas0
authored andcommitted
Precharge in EvWrite (#17721)
1 parent 9566518 commit 8a5b353

File tree

4 files changed

+127
-32
lines changed

4 files changed

+127
-32
lines changed

ydb/core/kqp/ut/query/kqp_query_ut.cpp

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
#include <fmt/format.h>
12
#include <ydb/core/kqp/ut/common/kqp_ut_common.h>
23

34
#include <ydb/core/tx/datashard/datashard_failpoints.h>
@@ -2637,6 +2638,35 @@ Y_UNIT_TEST_SUITE(KqpQuery) {
26372638
}
26382639
}
26392640

2641+
Y_UNIT_TEST(ExecuteWriteQuery) {
2642+
using namespace fmt::literals;
2643+
2644+
TKikimrRunner kikimr;
2645+
auto client = kikimr.GetQueryClient();
2646+
2647+
{ // Just generate table
2648+
const auto sql = fmt::format(R"(
2649+
CREATE TABLE test_table (
2650+
PRIMARY KEY (id)
2651+
) AS SELECT
2652+
ROW_NUMBER() OVER w AS id, data
2653+
FROM
2654+
AS_TABLE(ListReplicate(<|data: '{data}'|>, 500000))
2655+
WINDOW
2656+
w AS (ORDER BY data))",
2657+
"data"_a = std::string(137, 'a')
2658+
);
2659+
const auto result = client.ExecuteQuery(sql, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
2660+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
2661+
}
2662+
2663+
Cerr << TInstant::Now() << " --------------- Start update ---------------\n";
2664+
2665+
const auto hangingResult = client.ExecuteQuery(R"(
2666+
UPDATE test_table SET data = "a"
2667+
)", NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
2668+
UNIT_ASSERT_VALUES_EQUAL_C(hangingResult.GetStatus(), EStatus::SUCCESS, hangingResult.GetIssues().ToString());
2669+
}
26402670
}
26412671

26422672
} // namespace NKqp

ydb/core/tx/datashard/datashard_user_db.cpp

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,16 @@ void TDataShardUserDb::EraseRow(
178178
Counters.EraseRowBytes += keyBytes + 8;
179179
}
180180

181+
bool TDataShardUserDb::PrechargeRow(
182+
const TTableId& tableId,
183+
const TArrayRef<const TRawTypeValue> key)
184+
{
185+
auto localTableId = Self.GetLocalTableId(tableId);
186+
Y_ENSURE(localTableId != 0, "Unexpected PrechargeRow for an unknown table");
187+
188+
return Db.Precharge(localTableId, key, key, {}, 0, Max<ui64>(), Max<ui64>());
189+
}
190+
181191
void TDataShardUserDb::IncreaseUpdateCounters(
182192
const TArrayRef<const TRawTypeValue> key,
183193
const TArrayRef<const NIceDb::TUpdateOp> ops)

ydb/core/tx/datashard/datashard_user_db.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,10 @@ class TDataShardUserDb final
176176
NMiniKQL::TEngineHostCounters& GetCounters();
177177
const NMiniKQL::TEngineHostCounters& GetCounters() const;
178178

179+
bool PrechargeRow(
180+
const TTableId& tableId,
181+
const TArrayRef<const TRawTypeValue> key);
182+
179183
private:
180184
static TSmallVec<TCell> ConvertTableKeys(const TArrayRef<const TRawTypeValue> key);
181185

ydb/core/tx/datashard/execute_write_unit.cpp

Lines changed: 83 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -82,8 +82,71 @@ class TExecuteWriteUnit : public TExecutionUnit {
8282
return false;
8383
}
8484

85-
EExecutionStatus OnTabletNotReadyException(TDataShardUserDb& userDb, TWriteOperation& writeOp, TTransactionContext& txc, const TActorContext& ctx) {
85+
void FillOps(const NTable::TScheme& scheme, const TUserTable& userTable, const NTable::TScheme::TTableInfo& tableInfo, const TValidatedWriteTxOperation& validatedOperation, ui32 rowIdx, TSmallVec<NTable::TUpdateOp>& ops) {
86+
const TSerializedCellMatrix& matrix = validatedOperation.GetMatrix();
87+
const auto& columnIds = validatedOperation.GetColumnIds();
88+
89+
ops.clear();
90+
Y_ENSURE(matrix.GetColCount() >= userTable.KeyColumnIds.size());
91+
ops.reserve(matrix.GetColCount() - userTable.KeyColumnIds.size());
92+
93+
for (ui16 valueColIdx = userTable.KeyColumnIds.size(); valueColIdx < matrix.GetColCount(); ++valueColIdx) {
94+
ui32 columnTag = columnIds[valueColIdx];
95+
const TCell& cell = matrix.GetCell(rowIdx, valueColIdx);
96+
97+
const NScheme::TTypeId vtypeId = scheme.GetColumnInfo(&tableInfo, columnTag)->PType.GetTypeId();
98+
ops.emplace_back(columnTag, NTable::ECellOp::Set, cell.IsNull() ? TRawTypeValue() : TRawTypeValue(cell.Data(), cell.Size(), vtypeId));
99+
}
100+
};
101+
102+
void FillKey(const NTable::TScheme& scheme, const TUserTable& userTable, const NTable::TScheme::TTableInfo& tableInfo, const TValidatedWriteTxOperation& validatedOperation, ui32 rowIdx, TSmallVec<TRawTypeValue>& key) {
103+
const TSerializedCellMatrix& matrix = validatedOperation.GetMatrix();
104+
105+
key.clear();
106+
key.reserve(userTable.KeyColumnIds.size());
107+
for (ui16 keyColIdx = 0; keyColIdx < userTable.KeyColumnIds.size(); ++keyColIdx) {
108+
const TCell& cell = matrix.GetCell(rowIdx, keyColIdx);
109+
ui32 keyCol = tableInfo.KeyColumns[keyColIdx];
110+
if (cell.IsNull()) {
111+
key.emplace_back();
112+
} else {
113+
NScheme::TTypeId vtypeId = scheme.GetColumnInfo(&tableInfo, keyCol)->PType.GetTypeId();
114+
key.emplace_back(cell.Data(), cell.Size(), vtypeId);
115+
}
116+
}
117+
};
118+
119+
EExecutionStatus OnTabletNotReadyException(TDataShardUserDb& userDb, TWriteOperation& writeOp, size_t operationIndexToPrecharge, TTransactionContext& txc, const TActorContext& ctx) {
86120
LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "Tablet " << DataShard.TabletID() << " is not ready for " << writeOp << " execution");
121+
122+
// Precharge
123+
if (operationIndexToPrecharge != SIZE_MAX) {
124+
const TValidatedWriteTx::TPtr& writeTx = writeOp.GetWriteTx();
125+
for (size_t operationIndex = operationIndexToPrecharge; operationIndex < writeTx->GetOperations().size(); ++operationIndex) {
126+
const TValidatedWriteTxOperation& validatedOperation = writeTx->GetOperations()[operationIndex];
127+
const ui64 tableId = validatedOperation.GetTableId().PathId.LocalPathId;
128+
const TTableId fullTableId(DataShard.GetPathOwnerId(), tableId);
129+
const TUserTable& userTable = *DataShard.GetUserTables().at(tableId);
130+
131+
const NTable::TScheme& scheme = txc.DB.GetScheme();
132+
const NTable::TScheme::TTableInfo& tableInfo = *scheme.GetTableInfo(userTable.LocalTid);
133+
134+
const TSerializedCellMatrix& matrix = validatedOperation.GetMatrix();
135+
const auto operationType = validatedOperation.GetOperationType();
136+
137+
TSmallVec<TRawTypeValue> key;
138+
139+
if (operationType == NKikimrDataEvents::TEvWrite::TOperation::OPERATION_INSERT ||
140+
operationType == NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPDATE ||
141+
userDb.NeedToReadBeforeWrite(fullTableId))
142+
{
143+
for (ui32 rowIdx = 0; rowIdx < matrix.GetRowCount(); ++rowIdx) {
144+
FillKey(scheme, userTable, tableInfo, validatedOperation, rowIdx, key);
145+
userDb.PrechargeRow(fullTableId, key);
146+
}
147+
}
148+
}
149+
}
87150

88151
DataShard.IncCounter(COUNTER_TX_TABLET_NOT_READY);
89152

@@ -107,51 +170,35 @@ class TExecuteWriteUnit : public TExecutionUnit {
107170
const TUserTable& userTable = *DataShard.GetUserTables().at(tableId);
108171

109172
const NTable::TScheme& scheme = txc.DB.GetScheme();
110-
const NTable::TScheme::TTableInfo* tableInfo = scheme.GetTableInfo(userTable.LocalTid);
111-
112-
TSmallVec<TRawTypeValue> key;
113-
TSmallVec<NTable::TUpdateOp> ops;
173+
const NTable::TScheme::TTableInfo& tableInfo = *scheme.GetTableInfo(userTable.LocalTid);
114174

115175
const TSerializedCellMatrix& matrix = validatedOperation.GetMatrix();
116176
const auto operationType = validatedOperation.GetOperationType();
117177

178+
<<<<<<< HEAD
118179
auto fillOps = [&](ui32 rowIdx) {
119180
ops.clear();
120181
Y_ABORT_UNLESS(matrix.GetColCount() >= userTable.KeyColumnIds.size());
121182
ops.reserve(matrix.GetColCount() - userTable.KeyColumnIds.size());
183+
=======
184+
TSmallVec<TRawTypeValue> key;
185+
TSmallVec<NTable::TUpdateOp> ops;
186+
>>>>>>> 27683edc32f (Precharge in EvWrite (#17721))
122187

123-
for (ui16 valueColIdx = userTable.KeyColumnIds.size(); valueColIdx < matrix.GetColCount(); ++valueColIdx) {
124-
ui32 columnTag = validatedOperation.GetColumnIds()[valueColIdx];
125-
const TCell& cell = matrix.GetCell(rowIdx, valueColIdx);
126-
127-
const NScheme::TTypeId vtypeId = scheme.GetColumnInfo(tableInfo, columnTag)->PType.GetTypeId();
128-
ops.emplace_back(columnTag, NTable::ECellOp::Set, cell.IsNull() ? TRawTypeValue() : TRawTypeValue(cell.Data(), cell.Size(), vtypeId));
129-
}
130-
};
188+
// Main update cycle
131189

132190
for (ui32 rowIdx = 0; rowIdx < matrix.GetRowCount(); ++rowIdx)
133191
{
134-
key.clear();
135-
key.reserve(userTable.KeyColumnIds.size());
136-
for (ui16 keyColIdx = 0; keyColIdx < userTable.KeyColumnIds.size(); ++keyColIdx) {
137-
const TCell& cell = matrix.GetCell(rowIdx, keyColIdx);
138-
ui32 keyCol = tableInfo->KeyColumns[keyColIdx];
139-
if (cell.IsNull()) {
140-
key.emplace_back();
141-
} else {
142-
NScheme::TTypeId vtypeId = scheme.GetColumnInfo(tableInfo, keyCol)->PType.GetTypeId();
143-
key.emplace_back(cell.Data(), cell.Size(), vtypeId);
144-
}
145-
}
192+
FillKey(scheme, userTable, tableInfo, validatedOperation, rowIdx, key);
146193

147194
switch (operationType) {
148195
case NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPSERT: {
149-
fillOps(rowIdx);
196+
FillOps(scheme, userTable, tableInfo, validatedOperation, rowIdx, ops);
150197
userDb.UpsertRow(fullTableId, key, ops);
151198
break;
152199
}
153200
case NKikimrDataEvents::TEvWrite::TOperation::OPERATION_REPLACE: {
154-
fillOps(rowIdx);
201+
FillOps(scheme, userTable, tableInfo, validatedOperation, rowIdx, ops);
155202
userDb.ReplaceRow(fullTableId, key, ops);
156203
break;
157204
}
@@ -160,12 +207,12 @@ class TExecuteWriteUnit : public TExecutionUnit {
160207
break;
161208
}
162209
case NKikimrDataEvents::TEvWrite::TOperation::OPERATION_INSERT: {
163-
fillOps(rowIdx);
210+
FillOps(scheme, userTable, tableInfo, validatedOperation, rowIdx, ops);
164211
userDb.InsertRow(fullTableId, key, ops);
165212
break;
166213
}
167214
case NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPDATE: {
168-
fillOps(rowIdx);
215+
FillOps(scheme, userTable, tableInfo, validatedOperation, rowIdx, ops);
169216
userDb.UpdateRow(fullTableId, key, ops);
170217
break;
171218
}
@@ -175,6 +222,8 @@ class TExecuteWriteUnit : public TExecutionUnit {
175222
}
176223
}
177224

225+
// Counters
226+
178227
switch (operationType) {
179228
case NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPSERT:
180229
case NKikimrDataEvents::TEvWrite::TOperation::OPERATION_REPLACE:
@@ -214,6 +263,7 @@ class TExecuteWriteUnit : public TExecutionUnit {
214263
}
215264

216265
const TValidatedWriteTx::TPtr& writeTx = writeOp->GetWriteTx();
266+
size_t validatedOperationIndex = SIZE_MAX;
217267

218268
DataShard.ReleaseCache(*writeOp);
219269

@@ -357,12 +407,13 @@ class TExecuteWriteUnit : public TExecutionUnit {
357407

358408
KqpCommitLocks(tabletId, kqpLocks, sysLocks, writeVersion, userDb);
359409

360-
TValidatedWriteTx::TPtr& writeTx = writeOp->GetWriteTx();
361410
if (writeTx->HasOperations()) {
362-
for (const auto& validatedOperation : writeTx->GetOperations()) {
411+
for (validatedOperationIndex = 0; validatedOperationIndex < writeTx->GetOperations().size(); ++validatedOperationIndex) {
412+
const TValidatedWriteTxOperation& validatedOperation = writeTx->GetOperations()[validatedOperationIndex];
363413
DoUpdateToUserDb(userDb, validatedOperation, txc);
364414
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Executed write operation for " << *writeOp << " at " << DataShard.TabletID() << ", row count=" << validatedOperation.GetMatrix().GetRowCount());
365415
}
416+
validatedOperationIndex = SIZE_MAX;
366417
} else {
367418
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Skip empty write operation for " << *writeOp << " at " << DataShard.TabletID());
368419
}
@@ -448,7 +499,7 @@ class TExecuteWriteUnit : public TExecutionUnit {
448499
}
449500
return EExecutionStatus::Continue;
450501
} catch (const TNotReadyTabletException&) {
451-
return OnTabletNotReadyException(userDb, *writeOp, txc, ctx);
502+
return OnTabletNotReadyException(userDb, *writeOp, validatedOperationIndex, txc, ctx);
452503
} catch (const TLockedWriteLimitException&) {
453504
userDb.ResetCollectedChanges();
454505

0 commit comments

Comments
 (0)