Skip to content

Commit 6f755c8

Browse files
committed
Merge pull request #18209 from nikvas0/datashard-evwrite-fix
[25-1] Merge datashard precharge (EvWrite fix)
1 parent 003bbcc commit 6f755c8

File tree

4 files changed

+132
-36
lines changed

4 files changed

+132
-36
lines changed

ydb/core/kqp/ut/query/kqp_query_ut.cpp

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
#include <fmt/format.h>
12
#include <ydb/core/kqp/ut/common/kqp_ut_common.h>
23

34
#include <ydb/core/tx/datashard/datashard_failpoints.h>
@@ -2637,6 +2638,43 @@ Y_UNIT_TEST_SUITE(KqpQuery) {
26372638
}
26382639
}
26392640

2641+
Y_UNIT_TEST_TWIN(ExecuteWriteQuery, UseSink) {
2642+
using namespace fmt::literals;
2643+
2644+
NKikimrConfig::TAppConfig appConfig;
2645+
appConfig.MutableTableServiceConfig()->SetEnableOltpSink(UseSink);
2646+
appConfig.MutableTableServiceConfig()->SetEnableCreateTableAs(true);
2647+
auto settings = TKikimrSettings()
2648+
.SetAppConfig(appConfig)
2649+
.SetWithSampleTables(true)
2650+
.SetEnableTempTables(true);
2651+
2652+
TKikimrRunner kikimr(settings);
2653+
auto client = kikimr.GetQueryClient();
2654+
2655+
{ // Just generate table
2656+
const auto sql = fmt::format(R"(
2657+
CREATE TABLE test_table (
2658+
PRIMARY KEY (id)
2659+
) AS SELECT
2660+
ROW_NUMBER() OVER w AS id, data
2661+
FROM
2662+
AS_TABLE(ListReplicate(<|data: '{data}'|>, 500000))
2663+
WINDOW
2664+
w AS (ORDER BY data))",
2665+
"data"_a = std::string(137, 'a')
2666+
);
2667+
const auto result = client.ExecuteQuery(sql, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
2668+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
2669+
}
2670+
2671+
Cerr << TInstant::Now() << " --------------- Start update ---------------\n";
2672+
2673+
const auto hangingResult = client.ExecuteQuery(R"(
2674+
UPDATE test_table SET data = "a"
2675+
)", NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
2676+
UNIT_ASSERT_VALUES_EQUAL_C(hangingResult.GetStatus(), EStatus::SUCCESS, hangingResult.GetIssues().ToString());
2677+
}
26402678
}
26412679

26422680
} // namespace NKqp

ydb/core/tx/datashard/datashard_user_db.cpp

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,16 @@ void TDataShardUserDb::EraseRow(
178178
Counters.EraseRowBytes += keyBytes + 8;
179179
}
180180

181+
bool TDataShardUserDb::PrechargeRow(
182+
const TTableId& tableId,
183+
const TArrayRef<const TRawTypeValue> key)
184+
{
185+
auto localTableId = Self.GetLocalTableId(tableId);
186+
Y_ENSURE(localTableId != 0, "Unexpected PrechargeRow for an unknown table");
187+
188+
return Db.Precharge(localTableId, key, key, {}, 0, Max<ui64>(), Max<ui64>());
189+
}
190+
181191
void TDataShardUserDb::IncreaseUpdateCounters(
182192
const TArrayRef<const TRawTypeValue> key,
183193
const TArrayRef<const NIceDb::TUpdateOp> ops)

ydb/core/tx/datashard/datashard_user_db.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,10 @@ class TDataShardUserDb final
176176
NMiniKQL::TEngineHostCounters& GetCounters();
177177
const NMiniKQL::TEngineHostCounters& GetCounters() const;
178178

179+
bool PrechargeRow(
180+
const TTableId& tableId,
181+
const TArrayRef<const TRawTypeValue> key);
182+
179183
private:
180184
static TSmallVec<TCell> ConvertTableKeys(const TArrayRef<const TRawTypeValue> key);
181185

ydb/core/tx/datashard/execute_write_unit.cpp

Lines changed: 80 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -84,8 +84,71 @@ class TExecuteWriteUnit : public TExecutionUnit {
8484
return false;
8585
}
8686

87-
EExecutionStatus OnTabletNotReadyException(TDataShardUserDb& userDb, TWriteOperation& writeOp, TTransactionContext& txc, const TActorContext& ctx) {
87+
void FillOps(const NTable::TScheme& scheme, const TUserTable& userTable, const NTable::TScheme::TTableInfo& tableInfo, const TValidatedWriteTxOperation& validatedOperation, ui32 rowIdx, TSmallVec<NTable::TUpdateOp>& ops) {
88+
const TSerializedCellMatrix& matrix = validatedOperation.GetMatrix();
89+
const auto& columnIds = validatedOperation.GetColumnIds();
90+
91+
ops.clear();
92+
Y_ENSURE(matrix.GetColCount() >= userTable.KeyColumnIds.size());
93+
ops.reserve(matrix.GetColCount() - userTable.KeyColumnIds.size());
94+
95+
for (ui16 valueColIdx = userTable.KeyColumnIds.size(); valueColIdx < matrix.GetColCount(); ++valueColIdx) {
96+
ui32 columnTag = columnIds[valueColIdx];
97+
const TCell& cell = matrix.GetCell(rowIdx, valueColIdx);
98+
99+
const NScheme::TTypeId vtypeId = scheme.GetColumnInfo(&tableInfo, columnTag)->PType.GetTypeId();
100+
ops.emplace_back(columnTag, NTable::ECellOp::Set, cell.IsNull() ? TRawTypeValue() : TRawTypeValue(cell.Data(), cell.Size(), vtypeId));
101+
}
102+
};
103+
104+
void FillKey(const NTable::TScheme& scheme, const TUserTable& userTable, const NTable::TScheme::TTableInfo& tableInfo, const TValidatedWriteTxOperation& validatedOperation, ui32 rowIdx, TSmallVec<TRawTypeValue>& key) {
105+
const TSerializedCellMatrix& matrix = validatedOperation.GetMatrix();
106+
107+
key.clear();
108+
key.reserve(userTable.KeyColumnIds.size());
109+
for (ui16 keyColIdx = 0; keyColIdx < userTable.KeyColumnIds.size(); ++keyColIdx) {
110+
const TCell& cell = matrix.GetCell(rowIdx, keyColIdx);
111+
ui32 keyCol = tableInfo.KeyColumns[keyColIdx];
112+
if (cell.IsNull()) {
113+
key.emplace_back();
114+
} else {
115+
NScheme::TTypeId vtypeId = scheme.GetColumnInfo(&tableInfo, keyCol)->PType.GetTypeId();
116+
key.emplace_back(cell.Data(), cell.Size(), vtypeId);
117+
}
118+
}
119+
};
120+
121+
EExecutionStatus OnTabletNotReadyException(TDataShardUserDb& userDb, TWriteOperation& writeOp, size_t operationIndexToPrecharge, TTransactionContext& txc, const TActorContext& ctx) {
88122
LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "Tablet " << DataShard.TabletID() << " is not ready for " << writeOp << " execution");
123+
124+
// Precharge
125+
if (operationIndexToPrecharge != SIZE_MAX) {
126+
const TValidatedWriteTx::TPtr& writeTx = writeOp.GetWriteTx();
127+
for (size_t operationIndex = operationIndexToPrecharge; operationIndex < writeTx->GetOperations().size(); ++operationIndex) {
128+
const TValidatedWriteTxOperation& validatedOperation = writeTx->GetOperations()[operationIndex];
129+
const ui64 tableId = validatedOperation.GetTableId().PathId.LocalPathId;
130+
const TTableId fullTableId(DataShard.GetPathOwnerId(), tableId);
131+
const TUserTable& userTable = *DataShard.GetUserTables().at(tableId);
132+
133+
const NTable::TScheme& scheme = txc.DB.GetScheme();
134+
const NTable::TScheme::TTableInfo& tableInfo = *scheme.GetTableInfo(userTable.LocalTid);
135+
136+
const TSerializedCellMatrix& matrix = validatedOperation.GetMatrix();
137+
const auto operationType = validatedOperation.GetOperationType();
138+
139+
TSmallVec<TRawTypeValue> key;
140+
141+
if (operationType == NKikimrDataEvents::TEvWrite::TOperation::OPERATION_INSERT ||
142+
operationType == NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPDATE ||
143+
userDb.NeedToReadBeforeWrite(fullTableId))
144+
{
145+
for (ui32 rowIdx = 0; rowIdx < matrix.GetRowCount(); ++rowIdx) {
146+
FillKey(scheme, userTable, tableInfo, validatedOperation, rowIdx, key);
147+
userDb.PrechargeRow(fullTableId, key);
148+
}
149+
}
150+
}
151+
}
89152

90153
DataShard.IncCounter(COUNTER_TX_TABLET_NOT_READY);
91154

@@ -109,51 +172,28 @@ class TExecuteWriteUnit : public TExecutionUnit {
109172
const TUserTable& userTable = *DataShard.GetUserTables().at(tableId);
110173

111174
const NTable::TScheme& scheme = txc.DB.GetScheme();
112-
const NTable::TScheme::TTableInfo* tableInfo = scheme.GetTableInfo(userTable.LocalTid);
113-
114-
TSmallVec<TRawTypeValue> key;
115-
TSmallVec<NTable::TUpdateOp> ops;
175+
const NTable::TScheme::TTableInfo& tableInfo = *scheme.GetTableInfo(userTable.LocalTid);
116176

117177
const TSerializedCellMatrix& matrix = validatedOperation.GetMatrix();
118178
const auto operationType = validatedOperation.GetOperationType();
119179

120-
auto fillOps = [&](ui32 rowIdx) {
121-
ops.clear();
122-
Y_ABORT_UNLESS(matrix.GetColCount() >= userTable.KeyColumnIds.size());
123-
ops.reserve(matrix.GetColCount() - userTable.KeyColumnIds.size());
124-
125-
for (ui16 valueColIdx = userTable.KeyColumnIds.size(); valueColIdx < matrix.GetColCount(); ++valueColIdx) {
126-
ui32 columnTag = validatedOperation.GetColumnIds()[valueColIdx];
127-
const TCell& cell = matrix.GetCell(rowIdx, valueColIdx);
180+
TSmallVec<TRawTypeValue> key;
181+
TSmallVec<NTable::TUpdateOp> ops;
128182

129-
const NScheme::TTypeId vtypeId = scheme.GetColumnInfo(tableInfo, columnTag)->PType.GetTypeId();
130-
ops.emplace_back(columnTag, NTable::ECellOp::Set, cell.IsNull() ? TRawTypeValue() : TRawTypeValue(cell.Data(), cell.Size(), vtypeId));
131-
}
132-
};
183+
// Main update cycle
133184

134185
for (ui32 rowIdx = 0; rowIdx < matrix.GetRowCount(); ++rowIdx)
135186
{
136-
key.clear();
137-
key.reserve(userTable.KeyColumnIds.size());
138-
for (ui16 keyColIdx = 0; keyColIdx < userTable.KeyColumnIds.size(); ++keyColIdx) {
139-
const TCell& cell = matrix.GetCell(rowIdx, keyColIdx);
140-
ui32 keyCol = tableInfo->KeyColumns[keyColIdx];
141-
if (cell.IsNull()) {
142-
key.emplace_back();
143-
} else {
144-
NScheme::TTypeId vtypeId = scheme.GetColumnInfo(tableInfo, keyCol)->PType.GetTypeId();
145-
key.emplace_back(cell.Data(), cell.Size(), vtypeId);
146-
}
147-
}
187+
FillKey(scheme, userTable, tableInfo, validatedOperation, rowIdx, key);
148188

149189
switch (operationType) {
150190
case NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPSERT: {
151-
fillOps(rowIdx);
191+
FillOps(scheme, userTable, tableInfo, validatedOperation, rowIdx, ops);
152192
userDb.UpsertRow(fullTableId, key, ops);
153193
break;
154194
}
155195
case NKikimrDataEvents::TEvWrite::TOperation::OPERATION_REPLACE: {
156-
fillOps(rowIdx);
196+
FillOps(scheme, userTable, tableInfo, validatedOperation, rowIdx, ops);
157197
userDb.ReplaceRow(fullTableId, key, ops);
158198
break;
159199
}
@@ -162,12 +202,12 @@ class TExecuteWriteUnit : public TExecutionUnit {
162202
break;
163203
}
164204
case NKikimrDataEvents::TEvWrite::TOperation::OPERATION_INSERT: {
165-
fillOps(rowIdx);
205+
FillOps(scheme, userTable, tableInfo, validatedOperation, rowIdx, ops);
166206
userDb.InsertRow(fullTableId, key, ops);
167207
break;
168208
}
169209
case NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPDATE: {
170-
fillOps(rowIdx);
210+
FillOps(scheme, userTable, tableInfo, validatedOperation, rowIdx, ops);
171211
userDb.UpdateRow(fullTableId, key, ops);
172212
break;
173213
}
@@ -177,6 +217,8 @@ class TExecuteWriteUnit : public TExecutionUnit {
177217
}
178218
}
179219

220+
// Counters
221+
180222
switch (operationType) {
181223
case NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPSERT:
182224
case NKikimrDataEvents::TEvWrite::TOperation::OPERATION_REPLACE:
@@ -216,6 +258,7 @@ class TExecuteWriteUnit : public TExecutionUnit {
216258
}
217259

218260
const TValidatedWriteTx::TPtr& writeTx = writeOp->GetWriteTx();
261+
size_t validatedOperationIndex = SIZE_MAX;
219262

220263
DataShard.ReleaseCache(*writeOp);
221264

@@ -360,12 +403,13 @@ class TExecuteWriteUnit : public TExecutionUnit {
360403

361404
KqpCommitLocks(tabletId, kqpLocks, sysLocks, writeVersion, userDb);
362405

363-
TValidatedWriteTx::TPtr& writeTx = writeOp->GetWriteTx();
364406
if (writeTx->HasOperations()) {
365-
for (const auto& validatedOperation : writeTx->GetOperations()) {
407+
for (validatedOperationIndex = 0; validatedOperationIndex < writeTx->GetOperations().size(); ++validatedOperationIndex) {
408+
const TValidatedWriteTxOperation& validatedOperation = writeTx->GetOperations()[validatedOperationIndex];
366409
DoUpdateToUserDb(userDb, validatedOperation, txc);
367410
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Executed write operation for " << *writeOp << " at " << DataShard.TabletID() << ", row count=" << validatedOperation.GetMatrix().GetRowCount());
368411
}
412+
validatedOperationIndex = SIZE_MAX;
369413
} else {
370414
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Skip empty write operation for " << *writeOp << " at " << DataShard.TabletID());
371415
}
@@ -451,7 +495,7 @@ class TExecuteWriteUnit : public TExecutionUnit {
451495
}
452496
return EExecutionStatus::Continue;
453497
} catch (const TNotReadyTabletException&) {
454-
return OnTabletNotReadyException(userDb, *writeOp, txc, ctx);
498+
return OnTabletNotReadyException(userDb, *writeOp, validatedOperationIndex, txc, ctx);
455499
} catch (const TLockedWriteLimitException&) {
456500
userDb.ResetCollectedChanges();
457501

0 commit comments

Comments
 (0)