@@ -3970,6 +3970,190 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) {
3970
3970
" { items { uint32_value: 4 } items { uint32_value: 40 } }" );
3971
3971
}
3972
3972
3973
+ Y_UNIT_TEST (UncommittedWriteRestartDuringCommitThenBulkErase) {
3974
+ NKikimrConfig::TAppConfig app;
3975
+
3976
+ TPortManager pm;
3977
+ TServerSettings serverSettings (pm.GetPort (2134 ));
3978
+ serverSettings.SetDomainName (" Root" )
3979
+ .SetUseRealThreads (false )
3980
+ .SetDomainPlanResolution (100 )
3981
+ .SetAppConfig (app)
3982
+ // Bug was with non-volatile transactions
3983
+ .SetEnableDataShardVolatileTransactions (false );
3984
+
3985
+ Tests::TServer::TPtr server = new TServer (serverSettings);
3986
+ auto &runtime = *server->GetRuntime ();
3987
+ auto sender = runtime.AllocateEdgeActor ();
3988
+
3989
+ runtime.SetLogPriority (NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
3990
+ runtime.SetLogPriority (NKikimrServices::TX_PROXY, NLog::PRI_DEBUG);
3991
+ runtime.SetLogPriority (NKikimrServices::KQP_EXECUTER, NLog::PRI_TRACE);
3992
+ runtime.SetLogPriority (NKikimrServices::KQP_SESSION, NLog::PRI_TRACE);
3993
+
3994
+ InitRoot (server, sender);
3995
+
3996
+ TDisableDataShardLogBatching disableDataShardLogBatching;
3997
+ UNIT_ASSERT_VALUES_EQUAL (
3998
+ KqpSchemeExec (runtime, R"(
3999
+ CREATE TABLE `/Root/table` (key Uint32, value Uint32, PRIMARY KEY (key))
4000
+ WITH (PARTITION_AT_KEYS = (5));
4001
+ )" ),
4002
+ " SUCCESS" );
4003
+
4004
+ // Insert some initial data
4005
+ ExecSQL (server, sender, " UPSERT INTO `/Root/table` (key, value) VALUES (1, 10), (5, 50);" );
4006
+
4007
+ const auto shards = GetTableShards (server, sender, " /Root/table" );
4008
+ const auto tableId = ResolveTableId (server, sender, " /Root/table" );
4009
+ UNIT_ASSERT_VALUES_EQUAL (shards.size (), 2u );
4010
+
4011
+ TString sessionId, txId;
4012
+
4013
+ // Start inserting a couple of rows into the table
4014
+ Cerr << " ... sending initial upsert" << Endl;
4015
+ UNIT_ASSERT_VALUES_EQUAL (
4016
+ KqpSimpleBegin (runtime, sessionId, txId, R"(
4017
+ SELECT key, value FROM `/Root/table` WHERE key = 1;
4018
+ UPSERT INTO `/Root/table` (key, value) VALUES (2, 20), (6, 60);
4019
+ )" ),
4020
+ " { items { uint32_value: 1 } items { uint32_value: 10 } }" );
4021
+
4022
+ // We want to block readsets next
4023
+ std::vector<std::unique_ptr<IEventHandle>> readSets;
4024
+ auto blockReadSets = runtime.AddObserver <TEvTxProcessing::TEvReadSet>([&](TEvTxProcessing::TEvReadSet::TPtr& ev) {
4025
+ readSets.emplace_back (ev.Release ());
4026
+ });
4027
+
4028
+ // Start committing an additional read/write
4029
+ // Note: select on the table flushes accumulated changes first
4030
+ Cerr << " ... sending commit request" << Endl;
4031
+ auto commitFuture = SendRequest (runtime, MakeSimpleRequestRPC (R"(
4032
+ SELECT key, value FROM `/Root/table` ORDER BY key;
4033
+ )" , sessionId, txId, /* commitTx */ true ));
4034
+
4035
+ WaitFor (runtime, [&]{ return readSets.size () >= 2 ; }, " readset exchange" );
4036
+ UNIT_ASSERT_VALUES_EQUAL (readSets.size (), 2u );
4037
+
4038
+ // We want to make sure we block the first progress message when shards reboot
4039
+ std::vector<TActorId> shardActors (shards.size ());
4040
+ UNIT_ASSERT_VALUES_EQUAL (shardActors.size (), 2u );
4041
+ std::vector<std::unique_ptr<IEventHandle>> blockedProgress;
4042
+ auto blockProgressQueue = runtime.AddObserver ([&](TAutoPtr<IEventHandle>& ev) noexcept {
4043
+ switch (ev->GetTypeRewrite ()) {
4044
+ case TEvTablet::TEvBoot::EventType: {
4045
+ auto * msg = ev->Get <TEvTablet::TEvBoot>();
4046
+ Cerr << " ... observed TEvBoot for " << msg->TabletID << " at " << ev->GetRecipientRewrite () << Endl;
4047
+ auto it = std::find (shards.begin (), shards.end (), msg->TabletID );
4048
+ if (it != shards.end ()) {
4049
+ shardActors.at (it - shards.begin ()) = ev->GetRecipientRewrite ();
4050
+ }
4051
+ break ;
4052
+ }
4053
+ case EventSpaceBegin (TKikimrEvents::ES_PRIVATE) + 0 /* EvProgressTransaction */ : {
4054
+ auto it = std::find (shardActors.begin (), shardActors.end (), ev->GetRecipientRewrite ());
4055
+ if (it != shardActors.end ()) {
4056
+ ui64 shardId = shards.at (it - shardActors.begin ());
4057
+ Cerr << " ... blocking TEvProgressTranasction at " << ev->GetRecipientRewrite () << " shard " << shardId << Endl;
4058
+ blockedProgress.emplace_back (ev.Release ());
4059
+ return ;
4060
+ }
4061
+ break ;
4062
+ }
4063
+ }
4064
+ });
4065
+
4066
+ // Clear old readsets and reboot both shards with TEvPoison
4067
+ // This way shards don't have a chance to reply causing an UNDETERMINED error
4068
+ readSets.clear ();
4069
+ for (ui64 shardId : shards) {
4070
+ Cerr << " ... sending TEvPoison to " << shardId << Endl;
4071
+ ForwardToTablet (runtime, shardId, sender, new TEvents::TEvPoison);
4072
+ }
4073
+
4074
+ // Note: we cannot wait for the commit result, since KQP is blocked trying to abort
4075
+
4076
+ // Sleep a little to make sure everything settles
4077
+ Cerr << " ... sleeping for 1 second" << Endl;
4078
+ runtime.SimulateSleep (TDuration::Seconds (1 ));
4079
+
4080
+ UNIT_ASSERT_VALUES_EQUAL (readSets.size (), 2u );
4081
+ UNIT_ASSERT_VALUES_EQUAL (blockedProgress.size (), 2u );
4082
+
4083
+ // Send an erase rows request before the progress queue resumes
4084
+ {
4085
+ Cerr << " ... sending TEvEraseRowsRequest to shard 1 for key 1" << Endl;
4086
+ auto req = std::make_unique<TEvDataShard::TEvEraseRowsRequest>();
4087
+ req->Record .SetTableId (tableId.PathId .LocalPathId );
4088
+ req->Record .SetSchemaVersion (tableId.SchemaVersion );
4089
+ req->Record .AddKeyColumnIds (1 );
4090
+ ui32 key = 1 ;
4091
+ TCell keyCell = TCell::Make (key);
4092
+ req->Record .AddKeyColumns (TSerializedCellVec::Serialize (TArrayRef<const TCell>(&keyCell, 1 )));
4093
+ runtime.Send (new IEventHandle (shardActors.at (0 ), sender, req.release ()), 0 , true );
4094
+ // Give shard 1 a chance to process this request incorrectly
4095
+ Cerr << " ... sleeping for 1 second" << Endl;
4096
+ runtime.SimulateSleep (TDuration::Seconds (1 ));
4097
+ }
4098
+
4099
+ // Unblock progress queue and resend blocked messages
4100
+ Cerr << " ... resending progress queue" << Endl;
4101
+ blockProgressQueue.Remove ();
4102
+ for (auto & ev : blockedProgress) {
4103
+ runtime.Send (ev.release (), 0 , true );
4104
+ }
4105
+ blockedProgress.clear ();
4106
+
4107
+ // This insert must run after the currently committing transaction, so it must fail: either read happens before
4108
+ // the commit and is broken later by the commit, or the read finds a duplicate row and insert fails. Due to a
4109
+ // bug the commit lock might already be broken, causing conflicts not to work properly, and allowing the insert
4110
+ // to overwrite key = 2.
4111
+ Cerr << " ... sending an insert" << Endl;
4112
+ auto insertFuture = KqpSimpleSend (runtime, R"(
4113
+ INSERT INTO `/Root/table` (key, value) VALUES (2, 22);
4114
+ )" );
4115
+
4116
+ // Sleep a little to make sure everything settles
4117
+ Cerr << " ... sleeping for 1 second" << Endl;
4118
+ runtime.SimulateSleep (TDuration::Seconds (1 ));
4119
+
4120
+ // Unblock readsets letting transaction to complete
4121
+ Cerr << " ... resending readsets" << Endl;
4122
+ blockReadSets.Remove ();
4123
+ for (auto & ev : readSets) {
4124
+ runtime.Send (ev.release (), 0 , true );
4125
+ }
4126
+ readSets.clear ();
4127
+
4128
+ // Sleep a little to make sure everything settles
4129
+ Cerr << " ... sleeping for 1 second" << Endl;
4130
+ runtime.SimulateSleep (TDuration::Seconds (1 ));
4131
+
4132
+ // We expect erase to succeed by this point
4133
+ Cerr << " ... checking the erase result" << Endl;
4134
+ {
4135
+ auto ev = runtime.GrabEdgeEventRethrow <TEvDataShard::TEvEraseRowsResponse>(sender);
4136
+ UNIT_ASSERT_VALUES_EQUAL (ev->Get ()->Record .GetStatus (), NKikimrTxDataShard::TEvEraseRowsResponse::OK);
4137
+ }
4138
+
4139
+ // We expect commit to fail with an UNDETERMINED error
4140
+ Cerr << " ... checking the commit result" << Endl;
4141
+ UNIT_ASSERT_VALUES_EQUAL (
4142
+ FormatResult (AwaitResponse (runtime, std::move (commitFuture))),
4143
+ " ERROR: UNDETERMINED" );
4144
+
4145
+ // Now make a read query, we must not observe any partial commits
4146
+ Cerr << " ... checking final table state" << Endl;
4147
+ UNIT_ASSERT_VALUES_EQUAL (
4148
+ KqpSimpleExec (runtime, R"(
4149
+ SELECT key, value FROM `/Root/table`
4150
+ ORDER BY key;
4151
+ )" ),
4152
+ " { items { uint32_value: 2 } items { uint32_value: 20 } }, "
4153
+ " { items { uint32_value: 5 } items { uint32_value: 50 } }, "
4154
+ " { items { uint32_value: 6 } items { uint32_value: 60 } }" );
4155
+ }
4156
+
3973
4157
/* *
3974
4158
* This observer forces newly created nodes to start on particular nodes
3975
4159
*/
0 commit comments