Skip to content

Commit 47cd6ea

Browse files
committed
Fix lost commit without write (EvWrite) (#18634)
1 parent 45af15c commit 47cd6ea

File tree

2 files changed

+84
-1
lines changed

2 files changed

+84
-1
lines changed

ydb/core/kqp/common/kqp_tx_manager.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ class TKqpTransactionManager : public IKqpTransactionManager {
5151
if (action & EAction::WRITE) {
5252
ReadOnly = false;
5353
}
54+
++ActionsCount;
5455
}
5556

5657
void AddTopic(ui64 topicId, const TString& path) override {
@@ -310,7 +311,8 @@ class TKqpTransactionManager : public IKqpTransactionManager {
310311
}
311312

312313
bool NeedCommit() const override {
313-
const bool dontNeedCommit = IsEmpty() || IsReadOnly() && (IsSingleShard() || HasSnapshot());
314+
AFL_ENSURE(ActionsCount != 1 || IsSingleShard()); // ActionsCount == 1 then IsSingleShard()
315+
const bool dontNeedCommit = IsEmpty() || IsReadOnly() && ((ActionsCount == 1) || HasSnapshot());
314316
return !dontNeedCommit;
315317
}
316318

@@ -515,6 +517,7 @@ class TKqpTransactionManager : public IKqpTransactionManager {
515517
THashSet<ui64> ShardsIds;
516518
THashMap<ui64, TShardInfo> ShardsInfo;
517519
std::unordered_set<TString> TablePathes;
520+
ui64 ActionsCount = 0;
518521

519522
THashSet<ui32> ParticipantNodes;
520523

ydb/core/kqp/ut/tx/kqp_locks_tricky_ut.cpp

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
#include <ydb/core/kqp/common/kqp.h>
77
#include <ydb/core/kqp/gateway/kqp_metadata_loader.h>
88
#include <ydb/core/kqp/host/kqp_host_impl.h>
9+
#include <ydb/core/tx/data_events/events.h>
910

1011
#include <ydb-cpp-sdk/client/proto/accessor.h>
1112
#include <ydb-cpp-sdk/client/table/table.h>
@@ -246,5 +247,84 @@ Y_UNIT_TEST_SUITE(KqpLocksTricky) {
246247
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
247248
}
248249
}
250+
251+
Y_UNIT_TEST(TestNoWrite) {
252+
NKikimrConfig::TAppConfig appConfig;
253+
appConfig.MutableTableServiceConfig()->SetEnableOltpSink(true);
254+
255+
auto setting = NKikimrKqp::TKqpSetting();
256+
TKikimrSettings settings;
257+
settings.SetAppConfig(appConfig);
258+
settings.SetUseRealThreads(false);
259+
TKikimrRunner kikimr(settings);
260+
auto db = kikimr.GetTableClient();
261+
auto session = kikimr.RunCall([&] { return db.CreateSession().GetValueSync().GetSession(); });
262+
auto deleteSession = kikimr.RunCall([&] { return db.CreateSession().GetValueSync().GetSession(); });
263+
264+
auto& runtime = *kikimr.GetTestServer().GetRuntime();
265+
266+
kikimr.RunCall([&]{ CreateSampleTablesWithIndex(session, false /* no need in table data */); return true; });
267+
268+
{
269+
NYdb::NTable::TExecDataQuerySettings execSettings;
270+
execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic);
271+
272+
const TString query(Q1_(R"(
273+
SELECT * FROM `/Root/KeyValue` WHERE Key = 3u;
274+
275+
UPDATE `/Root/KeyValue` SET Value = "Test" WHERE Key = 3u AND Value = "Not exists";
276+
)"));
277+
278+
std::vector<std::unique_ptr<IEventHandle>> writes;
279+
bool blockWrites = true;
280+
281+
auto grab = [&](TAutoPtr<IEventHandle> &ev) -> auto {
282+
if (blockWrites && ev->GetTypeRewrite() == NKikimr::NEvents::TDataEvents::TEvWrite::EventType) {
283+
auto* evWrite = ev->Get<NKikimr::NEvents::TDataEvents::TEvWrite>();
284+
UNIT_ASSERT(evWrite->Record.OperationsSize() == 0);
285+
UNIT_ASSERT(evWrite->Record.GetLocks().GetLocks().size() != 0);
286+
writes.emplace_back(ev.Release());
287+
return TTestActorRuntime::EEventAction::DROP;
288+
}
289+
290+
return TTestActorRuntime::EEventAction::PROCESS;
291+
};
292+
293+
TDispatchOptions opts;
294+
opts.FinalEvents.emplace_back([&writes](IEventHandle&) {
295+
return writes.size() > 0;
296+
});
297+
298+
runtime.SetObserverFunc(grab);
299+
300+
auto future = kikimr.RunInThreadPool([&]{
301+
auto txc = TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx();
302+
return session.ExecuteDataQuery(query, txc, execSettings).ExtractValueSync();
303+
});
304+
305+
runtime.DispatchEvents(opts);
306+
UNIT_ASSERT(writes.size() > 0);
307+
308+
blockWrites = false;
309+
310+
const TString deleteQuery(Q1_(R"(
311+
DELETE FROM `/Root/KeyValue` WHERE Key = 3u;
312+
)"));
313+
314+
auto deleteResult = kikimr.RunCall([&]{
315+
auto txc = TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx();
316+
return deleteSession.ExecuteDataQuery(deleteQuery, txc, execSettings).ExtractValueSync();
317+
});
318+
319+
UNIT_ASSERT(deleteResult.IsSuccess());
320+
321+
for(auto& ev: writes) {
322+
runtime.Send(ev.release());
323+
}
324+
325+
auto result = runtime.WaitFuture(future);
326+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::ABORTED, result.GetIssues().ToString());
327+
}
328+
}
249329
}
250330
}

0 commit comments

Comments
 (0)