Skip to content

Commit 7a0b0c7

Browse files
authored
Fix immediate EvWrite with TxId=0 (#14565)
1 parent 262bf50 commit 7a0b0c7

File tree

6 files changed

+115
-2
lines changed

6 files changed

+115
-2
lines changed

ydb/core/tx/datashard/datashard_user_db.cpp

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -334,6 +334,9 @@ void TDataShardUserDb::CommitChanges(const TTableId& tableId, ui64 lockId, const
334334
auto* info = Self.GetVolatileTxManager().FindByCommitTxId(txId);
335335
if (info && info->State != EVolatileTxState::Aborting) {
336336
if (VolatileDependencies.insert(txId).second && !VolatileTxId) {
337+
if (GlobalTxId == 0) {
338+
throw TNeedGlobalTxId();
339+
}
337340
SetVolatileTxId(GlobalTxId);
338341
}
339342
}
@@ -564,6 +567,9 @@ void TDataShardUserDb::CheckWriteConflicts(const TTableId& tableId, TArrayRef<co
564567
} else if (auto* cached = Self.GetConflictsCache().GetTableCache(localTableId).FindUncommittedWrites(keyCells)) {
565568
for (ui64 txId : *cached) {
566569
BreakWriteConflict(txId);
570+
if (NeedGlobalTxId) {
571+
throw TNeedGlobalTxId();
572+
}
567573
}
568574
return;
569575
} else {
@@ -581,6 +587,10 @@ void TDataShardUserDb::CheckWriteConflicts(const TTableId& tableId, TArrayRef<co
581587
nullptr, txObserver
582588
);
583589

590+
if (NeedGlobalTxId) {
591+
throw TNeedGlobalTxId();
592+
}
593+
584594
if (res.Ready == NTable::EReady::Page) {
585595
if (mustFindConflicts || LockTxId) {
586596
// We must gather all conflicts
@@ -590,6 +600,9 @@ void TDataShardUserDb::CheckWriteConflicts(const TTableId& tableId, TArrayRef<co
590600
// Upgrade to volatile ordered commit and ignore the page fault
591601
if (!VolatileCommitOrdered) {
592602
if (!VolatileTxId) {
603+
if (GlobalTxId == 0) {
604+
throw TNeedGlobalTxId();
605+
}
593606
SetVolatileTxId(GlobalTxId);
594607
}
595608
VolatileCommitOrdered = true;
@@ -634,6 +647,10 @@ void TDataShardUserDb::BreakWriteConflict(ui64 txId) {
634647
// it into a real volatile transaction, it works as usual in
635648
// every sense, only persistent commit order is affected by
636649
// a dependency below.
650+
if (GlobalTxId == 0) {
651+
NeedGlobalTxId = true;
652+
return;
653+
}
637654
SetVolatileTxId(GlobalTxId);
638655
}
639656
VolatileDependencies.insert(info->TxId);

ydb/core/tx/datashard/datashard_user_db.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,7 @@ class TDataShardUserDb final
208208
absl::flat_hash_set<ui64> CommittedLockChanges;
209209
absl::flat_hash_map<TPathId, TIntrusivePtr<NTable::TDynamicTransactionMap>> TxMaps;
210210
absl::flat_hash_map<TPathId, NTable::ITransactionObserverPtr> TxObservers;
211+
bool NeedGlobalTxId = false;
211212

212213
absl::flat_hash_set<ui64> VolatileCommitTxIds;
213214
YDB_ACCESSOR_DEF(absl::flat_hash_set<ui64>, VolatileDependencies);

ydb/core/tx/datashard/datashard_ut_write.cpp

Lines changed: 80 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,10 @@ using namespace NSchemeShard;
1515
using namespace Tests;
1616

1717
Y_UNIT_TEST_SUITE(DataShardWrite) {
18+
19+
constexpr i32 operator""_i32(unsigned long long val) { return static_cast<i32>(val); }
20+
constexpr ui32 operator""_ui32(unsigned long long val) { return static_cast<ui32>(val); }
21+
1822
const TString expectedTableState = "key = 0, value = 1\nkey = 2, value = 3\nkey = 4, value = 5\n";
1923

2024
std::tuple<TTestActorRuntime&, Tests::TServer::TPtr, TActorId> TestCreateServer(std::optional<TServerSettings> serverSettings = {}) {
@@ -1687,5 +1691,80 @@ Y_UNIT_TEST_SUITE(DataShardWrite) {
16871691
}
16881692
}
16891693

1690-
} // Y_UNIT_TEST_SUITE
1694+
Y_UNIT_TEST(DelayedVolatileTxAndEvWrite) {
1695+
TPortManager pm;
1696+
TServerSettings serverSettings(pm.GetPort(2134));
1697+
serverSettings.SetDomainName("Root")
1698+
.SetUseRealThreads(false)
1699+
.SetEnableDataShardVolatileTransactions(true);
1700+
1701+
auto [runtime, server, sender] = TestCreateServer(serverSettings);
1702+
1703+
TDisableDataShardLogBatching disableDataShardLogBatching;
1704+
1705+
UNIT_ASSERT_VALUES_EQUAL(
1706+
KqpSchemeExec(runtime, R"(
1707+
CREATE TABLE `/Root/table` (key int, a int, b int, c int, PRIMARY KEY (key))
1708+
WITH (PARTITION_AT_KEYS = (10));
1709+
)"),
1710+
"SUCCESS");
1711+
1712+
const auto shards = GetTableShards(server, sender, "/Root/table");
1713+
UNIT_ASSERT_VALUES_EQUAL(shards.size(), 2u);
1714+
const auto tableId = ResolveTableId(server, sender, "/Root/table");
1715+
1716+
auto [tablesMap, ownerId_] = GetTablesByPathId(server, shards.at(0));
1717+
1718+
// Start blocking readsets
1719+
TBlockEvents<TEvTxProcessing::TEvReadSet> blockedReadSets(runtime);
1720+
1721+
// Prepare a distributed upsert
1722+
Cerr << "... starting a distributed upsert" << Endl;
1723+
auto upsertFuture = KqpSimpleSend(runtime, R"(
1724+
UPSERT INTO `/Root/table` (key, a, b, c) VALUES (1, 2, 2, 2), (11, 12, 12, 12);
1725+
)");
1726+
runtime.WaitFor("blocked readsets", [&]{ return blockedReadSets.size() >= 4; });
1727+
1728+
// 1. Make an upsert to (key, b)
1729+
{
1730+
Cerr << "... making a write to " << shards.at(0) << Endl;
1731+
auto req = MakeWriteRequest(
1732+
std::nullopt,
1733+
NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE,
1734+
NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPSERT,
1735+
tableId,
1736+
{ 1_ui32, 3_ui32 },
1737+
{ TCell::Make(1_i32), TCell::Make(3_i32) });
1738+
Write(runtime, sender, shards.at(0), std::move(req));
1739+
}
1740+
1741+
// 1. Make an upsert to (key, c)
1742+
{
1743+
Cerr << "... making a write to " << shards.at(0) << Endl;
1744+
auto req = MakeWriteRequest(
1745+
std::nullopt,
1746+
NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE,
1747+
NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPSERT,
1748+
tableId,
1749+
{ 1_ui32, 4_ui32 },
1750+
{ TCell::Make(1_i32), TCell::Make(4_i32) });
1751+
Write(runtime, sender, shards.at(0), std::move(req));
1752+
}
1753+
1754+
// Unblock readsets
1755+
blockedReadSets.Stop().Unblock();
1756+
1757+
runtime.SimulateSleep(TDuration::MilliSeconds(1));
1758+
1759+
// Make a validating read, the volatile tx changes must not be lost
1760+
Cerr << "... validating table" << Endl;
1761+
UNIT_ASSERT_VALUES_EQUAL(
1762+
KqpSimpleExec(runtime, R"(
1763+
SELECT key, a, b, c FROM `/Root/table` ORDER BY key;
1764+
)"),
1765+
"{ items { int32_value: 1 } items { int32_value: 2 } items { int32_value: 3 } items { int32_value: 4 } }, "
1766+
"{ items { int32_value: 11 } items { int32_value: 12 } items { int32_value: 12 } items { int32_value: 12 } }");
1767+
}
1768+
1769+
} // Y_UNIT_TEST_SUITE(DataShardWrite)
16911770
} // namespace NKikimr

ydb/core/tx/datashard/execute_write_unit.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -392,7 +392,7 @@ class TExecuteWriteUnit : public TExecutionUnit {
392392
if (commitTxIds) {
393393
TVector<ui64> participants(awaitingDecisions.begin(), awaitingDecisions.end());
394394
DataShard.GetVolatileTxManager().PersistAddVolatileTx(
395-
txId,
395+
userDb.GetVolatileTxId(),
396396
writeVersion,
397397
commitTxIds,
398398
userDb.GetVolatileDependencies(),

ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2156,6 +2156,21 @@ std::unique_ptr<NEvents::TDataEvents::TEvWrite> MakeWriteRequest(std::optional<u
21562156
return evWrite;
21572157
}
21582158

2159+
std::unique_ptr<NEvents::TDataEvents::TEvWrite> MakeWriteRequest(std::optional<ui64> txId, NKikimrDataEvents::TEvWrite::ETxMode txMode, NKikimrDataEvents::TEvWrite_TOperation::EOperationType operationType, const TTableId& tableId, const std::vector<ui32>& columnIds, const std::vector<TCell>& cells) {
2160+
UNIT_ASSERT((cells.size() % columnIds.size()) == 0);
2161+
2162+
TSerializedCellMatrix matrix(cells, cells.size() / columnIds.size(), columnIds.size());
2163+
TString blobData = matrix.ReleaseBuffer();
2164+
2165+
std::unique_ptr<NKikimr::NEvents::TDataEvents::TEvWrite> evWrite = txId
2166+
? std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(*txId, txMode)
2167+
: std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(txMode);
2168+
ui64 payloadIndex = NKikimr::NEvWrite::TPayloadWriter<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(std::move(blobData));
2169+
evWrite->AddOperation(operationType, tableId, columnIds, payloadIndex, NKikimrDataEvents::FORMAT_CELLVEC);
2170+
2171+
return evWrite;
2172+
}
2173+
21592174
std::unique_ptr<NEvents::TDataEvents::TEvWrite> MakeWriteRequestOneKeyValue(std::optional<ui64> txId, NKikimrDataEvents::TEvWrite::ETxMode txMode, NKikimrDataEvents::TEvWrite_TOperation::EOperationType operationType, const TTableId& tableId, const TVector<TShardedTableOptions::TColumn>& columns, ui64 key, ui64 value) {
21602175
UNIT_ASSERT_VALUES_EQUAL(columns.size(), 2);
21612176

ydb/core/tx/datashard/ut_common/datashard_ut_common.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -789,6 +789,7 @@ void ExecSQL(Tests::TServer::TPtr server,
789789
TRowVersion AcquireReadSnapshot(TTestActorRuntime& runtime, const TString& databaseName, ui32 nodeIndex = 0);
790790

791791
std::unique_ptr<NEvents::TDataEvents::TEvWrite> MakeWriteRequest(std::optional<ui64> txId, NKikimrDataEvents::TEvWrite::ETxMode txMode, NKikimrDataEvents::TEvWrite_TOperation::EOperationType operationType, const TTableId& tableId, const TVector<TShardedTableOptions::TColumn>& columns, ui32 rowCount, ui64 seed = 0);
792+
std::unique_ptr<NEvents::TDataEvents::TEvWrite> MakeWriteRequest(std::optional<ui64> txId, NKikimrDataEvents::TEvWrite::ETxMode txMode, NKikimrDataEvents::TEvWrite_TOperation::EOperationType operationType, const TTableId& tableId, const std::vector<ui32>& columnIds, const std::vector<TCell>& cells);
792793
std::unique_ptr<NEvents::TDataEvents::TEvWrite> MakeWriteRequestOneKeyValue(std::optional<ui64> txId, NKikimrDataEvents::TEvWrite::ETxMode txMode, NKikimrDataEvents::TEvWrite_TOperation::EOperationType operationType, const TTableId& tableId, const TVector<TShardedTableOptions::TColumn>& columns, ui64 key, ui64 value);
793794

794795
NKikimrDataEvents::TEvWriteResult Write(TTestActorRuntime& runtime, TActorId sender, ui64 shardId, std::unique_ptr<NEvents::TDataEvents::TEvWrite>&& request, NKikimrDataEvents::TEvWriteResult::EStatus expectedStatus = NKikimrDataEvents::TEvWriteResult::STATUS_UNSPECIFIED);

0 commit comments

Comments
 (0)