Skip to content

Commit 7a9a4c6

Browse files
authored
Merge pull request #18209 from nikvas0/datashard-evwrite-fix
[25-1] Merge datashard precharge (EvWrite fix)
2 parents 15e4573 + 9f997e3 commit 7a9a4c6

File tree

4 files changed

+132
-36
lines changed

4 files changed

+132
-36
lines changed

ydb/core/kqp/ut/query/kqp_query_ut.cpp

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
#include <fmt/format.h>
12
#include <ydb/core/kqp/ut/common/kqp_ut_common.h>
23

34
#include <ydb/core/tx/datashard/datashard_failpoints.h>
@@ -2637,6 +2638,43 @@ Y_UNIT_TEST_SUITE(KqpQuery) {
26372638
}
26382639
}
26392640

2641+
Y_UNIT_TEST_TWIN(ExecuteWriteQuery, UseSink) {
2642+
using namespace fmt::literals;
2643+
2644+
NKikimrConfig::TAppConfig appConfig;
2645+
appConfig.MutableTableServiceConfig()->SetEnableOltpSink(UseSink);
2646+
appConfig.MutableTableServiceConfig()->SetEnableCreateTableAs(true);
2647+
auto settings = TKikimrSettings()
2648+
.SetAppConfig(appConfig)
2649+
.SetWithSampleTables(true)
2650+
.SetEnableTempTables(true);
2651+
2652+
TKikimrRunner kikimr(settings);
2653+
auto client = kikimr.GetQueryClient();
2654+
2655+
{ // Just generate table
2656+
const auto sql = fmt::format(R"(
2657+
CREATE TABLE test_table (
2658+
PRIMARY KEY (id)
2659+
) AS SELECT
2660+
ROW_NUMBER() OVER w AS id, data
2661+
FROM
2662+
AS_TABLE(ListReplicate(<|data: '{data}'|>, 500000))
2663+
WINDOW
2664+
w AS (ORDER BY data))",
2665+
"data"_a = std::string(137, 'a')
2666+
);
2667+
const auto result = client.ExecuteQuery(sql, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
2668+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
2669+
}
2670+
2671+
Cerr << TInstant::Now() << " --------------- Start update ---------------\n";
2672+
2673+
const auto hangingResult = client.ExecuteQuery(R"(
2674+
UPDATE test_table SET data = "a"
2675+
)", NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
2676+
UNIT_ASSERT_VALUES_EQUAL_C(hangingResult.GetStatus(), EStatus::SUCCESS, hangingResult.GetIssues().ToString());
2677+
}
26402678
}
26412679

26422680
} // namespace NKqp

ydb/core/tx/datashard/datashard_user_db.cpp

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,16 @@ void TDataShardUserDb::EraseRow(
178178
Counters.EraseRowBytes += keyBytes + 8;
179179
}
180180

181+
bool TDataShardUserDb::PrechargeRow(
182+
const TTableId& tableId,
183+
const TArrayRef<const TRawTypeValue> key)
184+
{
185+
auto localTableId = Self.GetLocalTableId(tableId);
186+
Y_ENSURE(localTableId != 0, "Unexpected PrechargeRow for an unknown table");
187+
188+
return Db.Precharge(localTableId, key, key, {}, 0, Max<ui64>(), Max<ui64>());
189+
}
190+
181191
void TDataShardUserDb::IncreaseUpdateCounters(
182192
const TArrayRef<const TRawTypeValue> key,
183193
const TArrayRef<const NIceDb::TUpdateOp> ops)

ydb/core/tx/datashard/datashard_user_db.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,10 @@ class TDataShardUserDb final
176176
NMiniKQL::TEngineHostCounters& GetCounters();
177177
const NMiniKQL::TEngineHostCounters& GetCounters() const;
178178

179+
bool PrechargeRow(
180+
const TTableId& tableId,
181+
const TArrayRef<const TRawTypeValue> key);
182+
179183
private:
180184
static TSmallVec<TCell> ConvertTableKeys(const TArrayRef<const TRawTypeValue> key);
181185

ydb/core/tx/datashard/execute_write_unit.cpp

Lines changed: 80 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -82,8 +82,71 @@ class TExecuteWriteUnit : public TExecutionUnit {
8282
return false;
8383
}
8484

85-
EExecutionStatus OnTabletNotReadyException(TDataShardUserDb& userDb, TWriteOperation& writeOp, TTransactionContext& txc, const TActorContext& ctx) {
85+
void FillOps(const NTable::TScheme& scheme, const TUserTable& userTable, const NTable::TScheme::TTableInfo& tableInfo, const TValidatedWriteTxOperation& validatedOperation, ui32 rowIdx, TSmallVec<NTable::TUpdateOp>& ops) {
86+
const TSerializedCellMatrix& matrix = validatedOperation.GetMatrix();
87+
const auto& columnIds = validatedOperation.GetColumnIds();
88+
89+
ops.clear();
90+
Y_ENSURE(matrix.GetColCount() >= userTable.KeyColumnIds.size());
91+
ops.reserve(matrix.GetColCount() - userTable.KeyColumnIds.size());
92+
93+
for (ui16 valueColIdx = userTable.KeyColumnIds.size(); valueColIdx < matrix.GetColCount(); ++valueColIdx) {
94+
ui32 columnTag = columnIds[valueColIdx];
95+
const TCell& cell = matrix.GetCell(rowIdx, valueColIdx);
96+
97+
const NScheme::TTypeId vtypeId = scheme.GetColumnInfo(&tableInfo, columnTag)->PType.GetTypeId();
98+
ops.emplace_back(columnTag, NTable::ECellOp::Set, cell.IsNull() ? TRawTypeValue() : TRawTypeValue(cell.Data(), cell.Size(), vtypeId));
99+
}
100+
};
101+
102+
void FillKey(const NTable::TScheme& scheme, const TUserTable& userTable, const NTable::TScheme::TTableInfo& tableInfo, const TValidatedWriteTxOperation& validatedOperation, ui32 rowIdx, TSmallVec<TRawTypeValue>& key) {
103+
const TSerializedCellMatrix& matrix = validatedOperation.GetMatrix();
104+
105+
key.clear();
106+
key.reserve(userTable.KeyColumnIds.size());
107+
for (ui16 keyColIdx = 0; keyColIdx < userTable.KeyColumnIds.size(); ++keyColIdx) {
108+
const TCell& cell = matrix.GetCell(rowIdx, keyColIdx);
109+
ui32 keyCol = tableInfo.KeyColumns[keyColIdx];
110+
if (cell.IsNull()) {
111+
key.emplace_back();
112+
} else {
113+
NScheme::TTypeId vtypeId = scheme.GetColumnInfo(&tableInfo, keyCol)->PType.GetTypeId();
114+
key.emplace_back(cell.Data(), cell.Size(), vtypeId);
115+
}
116+
}
117+
};
118+
119+
EExecutionStatus OnTabletNotReadyException(TDataShardUserDb& userDb, TWriteOperation& writeOp, size_t operationIndexToPrecharge, TTransactionContext& txc, const TActorContext& ctx) {
86120
LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "Tablet " << DataShard.TabletID() << " is not ready for " << writeOp << " execution");
121+
122+
// Precharge
123+
if (operationIndexToPrecharge != SIZE_MAX) {
124+
const TValidatedWriteTx::TPtr& writeTx = writeOp.GetWriteTx();
125+
for (size_t operationIndex = operationIndexToPrecharge; operationIndex < writeTx->GetOperations().size(); ++operationIndex) {
126+
const TValidatedWriteTxOperation& validatedOperation = writeTx->GetOperations()[operationIndex];
127+
const ui64 tableId = validatedOperation.GetTableId().PathId.LocalPathId;
128+
const TTableId fullTableId(DataShard.GetPathOwnerId(), tableId);
129+
const TUserTable& userTable = *DataShard.GetUserTables().at(tableId);
130+
131+
const NTable::TScheme& scheme = txc.DB.GetScheme();
132+
const NTable::TScheme::TTableInfo& tableInfo = *scheme.GetTableInfo(userTable.LocalTid);
133+
134+
const TSerializedCellMatrix& matrix = validatedOperation.GetMatrix();
135+
const auto operationType = validatedOperation.GetOperationType();
136+
137+
TSmallVec<TRawTypeValue> key;
138+
139+
if (operationType == NKikimrDataEvents::TEvWrite::TOperation::OPERATION_INSERT ||
140+
operationType == NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPDATE ||
141+
userDb.NeedToReadBeforeWrite(fullTableId))
142+
{
143+
for (ui32 rowIdx = 0; rowIdx < matrix.GetRowCount(); ++rowIdx) {
144+
FillKey(scheme, userTable, tableInfo, validatedOperation, rowIdx, key);
145+
userDb.PrechargeRow(fullTableId, key);
146+
}
147+
}
148+
}
149+
}
87150

88151
DataShard.IncCounter(COUNTER_TX_TABLET_NOT_READY);
89152

@@ -107,51 +170,28 @@ class TExecuteWriteUnit : public TExecutionUnit {
107170
const TUserTable& userTable = *DataShard.GetUserTables().at(tableId);
108171

109172
const NTable::TScheme& scheme = txc.DB.GetScheme();
110-
const NTable::TScheme::TTableInfo* tableInfo = scheme.GetTableInfo(userTable.LocalTid);
111-
112-
TSmallVec<TRawTypeValue> key;
113-
TSmallVec<NTable::TUpdateOp> ops;
173+
const NTable::TScheme::TTableInfo& tableInfo = *scheme.GetTableInfo(userTable.LocalTid);
114174

115175
const TSerializedCellMatrix& matrix = validatedOperation.GetMatrix();
116176
const auto operationType = validatedOperation.GetOperationType();
117177

118-
auto fillOps = [&](ui32 rowIdx) {
119-
ops.clear();
120-
Y_ABORT_UNLESS(matrix.GetColCount() >= userTable.KeyColumnIds.size());
121-
ops.reserve(matrix.GetColCount() - userTable.KeyColumnIds.size());
122-
123-
for (ui16 valueColIdx = userTable.KeyColumnIds.size(); valueColIdx < matrix.GetColCount(); ++valueColIdx) {
124-
ui32 columnTag = validatedOperation.GetColumnIds()[valueColIdx];
125-
const TCell& cell = matrix.GetCell(rowIdx, valueColIdx);
178+
TSmallVec<TRawTypeValue> key;
179+
TSmallVec<NTable::TUpdateOp> ops;
126180

127-
const NScheme::TTypeId vtypeId = scheme.GetColumnInfo(tableInfo, columnTag)->PType.GetTypeId();
128-
ops.emplace_back(columnTag, NTable::ECellOp::Set, cell.IsNull() ? TRawTypeValue() : TRawTypeValue(cell.Data(), cell.Size(), vtypeId));
129-
}
130-
};
181+
// Main update cycle
131182

132183
for (ui32 rowIdx = 0; rowIdx < matrix.GetRowCount(); ++rowIdx)
133184
{
134-
key.clear();
135-
key.reserve(userTable.KeyColumnIds.size());
136-
for (ui16 keyColIdx = 0; keyColIdx < userTable.KeyColumnIds.size(); ++keyColIdx) {
137-
const TCell& cell = matrix.GetCell(rowIdx, keyColIdx);
138-
ui32 keyCol = tableInfo->KeyColumns[keyColIdx];
139-
if (cell.IsNull()) {
140-
key.emplace_back();
141-
} else {
142-
NScheme::TTypeId vtypeId = scheme.GetColumnInfo(tableInfo, keyCol)->PType.GetTypeId();
143-
key.emplace_back(cell.Data(), cell.Size(), vtypeId);
144-
}
145-
}
185+
FillKey(scheme, userTable, tableInfo, validatedOperation, rowIdx, key);
146186

147187
switch (operationType) {
148188
case NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPSERT: {
149-
fillOps(rowIdx);
189+
FillOps(scheme, userTable, tableInfo, validatedOperation, rowIdx, ops);
150190
userDb.UpsertRow(fullTableId, key, ops);
151191
break;
152192
}
153193
case NKikimrDataEvents::TEvWrite::TOperation::OPERATION_REPLACE: {
154-
fillOps(rowIdx);
194+
FillOps(scheme, userTable, tableInfo, validatedOperation, rowIdx, ops);
155195
userDb.ReplaceRow(fullTableId, key, ops);
156196
break;
157197
}
@@ -160,12 +200,12 @@ class TExecuteWriteUnit : public TExecutionUnit {
160200
break;
161201
}
162202
case NKikimrDataEvents::TEvWrite::TOperation::OPERATION_INSERT: {
163-
fillOps(rowIdx);
203+
FillOps(scheme, userTable, tableInfo, validatedOperation, rowIdx, ops);
164204
userDb.InsertRow(fullTableId, key, ops);
165205
break;
166206
}
167207
case NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPDATE: {
168-
fillOps(rowIdx);
208+
FillOps(scheme, userTable, tableInfo, validatedOperation, rowIdx, ops);
169209
userDb.UpdateRow(fullTableId, key, ops);
170210
break;
171211
}
@@ -175,6 +215,8 @@ class TExecuteWriteUnit : public TExecutionUnit {
175215
}
176216
}
177217

218+
// Counters
219+
178220
switch (operationType) {
179221
case NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPSERT:
180222
case NKikimrDataEvents::TEvWrite::TOperation::OPERATION_REPLACE:
@@ -214,6 +256,7 @@ class TExecuteWriteUnit : public TExecutionUnit {
214256
}
215257

216258
const TValidatedWriteTx::TPtr& writeTx = writeOp->GetWriteTx();
259+
size_t validatedOperationIndex = SIZE_MAX;
217260

218261
DataShard.ReleaseCache(*writeOp);
219262

@@ -357,12 +400,13 @@ class TExecuteWriteUnit : public TExecutionUnit {
357400

358401
KqpCommitLocks(tabletId, kqpLocks, sysLocks, writeVersion, userDb);
359402

360-
TValidatedWriteTx::TPtr& writeTx = writeOp->GetWriteTx();
361403
if (writeTx->HasOperations()) {
362-
for (const auto& validatedOperation : writeTx->GetOperations()) {
404+
for (validatedOperationIndex = 0; validatedOperationIndex < writeTx->GetOperations().size(); ++validatedOperationIndex) {
405+
const TValidatedWriteTxOperation& validatedOperation = writeTx->GetOperations()[validatedOperationIndex];
363406
DoUpdateToUserDb(userDb, validatedOperation, txc);
364407
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Executed write operation for " << *writeOp << " at " << DataShard.TabletID() << ", row count=" << validatedOperation.GetMatrix().GetRowCount());
365408
}
409+
validatedOperationIndex = SIZE_MAX;
366410
} else {
367411
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Skip empty write operation for " << *writeOp << " at " << DataShard.TabletID());
368412
}
@@ -448,7 +492,7 @@ class TExecuteWriteUnit : public TExecutionUnit {
448492
}
449493
return EExecutionStatus::Continue;
450494
} catch (const TNotReadyTabletException&) {
451-
return OnTabletNotReadyException(userDb, *writeOp, txc, ctx);
495+
return OnTabletNotReadyException(userDb, *writeOp, validatedOperationIndex, txc, ctx);
452496
} catch (const TLockedWriteLimitException&) {
453497
userDb.ResetCollectedChanges();
454498

0 commit comments

Comments
 (0)