@@ -3894,7 +3894,8 @@ Y_UNIT_TEST_SUITE(Cdc) {
3894
3894
MustNotLoseSchemaSnapshot (true );
3895
3895
}
3896
3896
3897
- Y_UNIT_TEST (ShouldBreakLocksOnConcurrentSchemeTx) {
3897
+ template <typename TPrepareFunc, typename TTestFunc>
3898
+ void ShouldBreakLocksOnConcurrentSchemeTx (TPrepareFunc prepare, TTestFunc test, Ydb::StatusIds::StatusCode finalCode = Ydb::StatusIds::ABORTED) {
3898
3899
TPortManager portManager;
3899
3900
TServer::TPtr server = new TServer (TServerSettings (portManager.GetPort (2134 ), {}, DefaultPQConfig ())
3900
3901
.SetUseRealThreads (false )
@@ -3911,6 +3912,8 @@ Y_UNIT_TEST_SUITE(Cdc) {
3911
3912
WaitTxNotification (server, edgeActor, AsyncAlterAddStream (server, " /Root" , " Table" ,
3912
3913
Updates (NKikimrSchemeOp::ECdcStreamFormatJson)));
3913
3914
3915
+ prepare (server, edgeActor);
3916
+
3914
3917
ExecSQL (server, edgeActor, " UPSERT INTO `/Root/Table` (key, value) VALUES (1, 10);" );
3915
3918
3916
3919
TString sessionId;
@@ -3921,17 +3924,89 @@ Y_UNIT_TEST_SUITE(Cdc) {
3921
3924
KqpSimpleContinue (runtime, sessionId, txId, " SELECT key, value FROM `/Root/Table`;" ),
3922
3925
" { items { uint32_value: 1 } items { uint32_value: 11 } }" );
3923
3926
3924
- WaitTxNotification (server, edgeActor, AsyncAlterAddExtraColumn (server, " /Root " , " Table " ) );
3927
+ test (server, edgeActor);
3925
3928
3926
3929
UNIT_ASSERT_VALUES_EQUAL (
3927
3930
KqpSimpleCommit (runtime, sessionId, txId, " SELECT 1;" ),
3928
- " ERROR: ABORTED " );
3931
+ Sprintf ( " ERROR: %s " , Ydb::StatusIds::StatusCode_Name (finalCode). c_str ()) );
3929
3932
3930
3933
WaitForContent (server, edgeActor, " /Root/Table/Stream" , {
3931
3934
R"( {"update":{"value":10},"key":[1]})" ,
3932
3935
});
3933
3936
}
3934
3937
3938
+ void Nop (TServer::TPtr, const TActorId&) {
3939
+ }
3940
+
3941
+ Y_UNIT_TEST (ShouldBreakLocksOnConcurrentAlterTable) {
3942
+ ShouldBreakLocksOnConcurrentSchemeTx (&Nop, [](TServer::TPtr server, const TActorId& edgeActor) {
3943
+ WaitTxNotification (server, edgeActor, AsyncAlterAddExtraColumn (server, " /Root" , " Table" ));
3944
+ });
3945
+ }
3946
+
3947
+ Y_UNIT_TEST (ShouldBreakLocksOnConcurrentMoveTable) {
3948
+ // nop: "Cannot move table with cdc streams"
3949
+ }
3950
+
3951
+ void AddIndex (TServer::TPtr server, const TActorId& edgeActor) {
3952
+ WaitTxNotification (server, edgeActor, AsyncAlterAddIndex (server, " /Root" , " /Root/Table" ,
3953
+ TShardedTableOptions::TIndex{" Index" , {" value" }}));
3954
+ }
3955
+
3956
+ Y_UNIT_TEST (ShouldBreakLocksOnConcurrentAddIndex) {
3957
+ ShouldBreakLocksOnConcurrentSchemeTx (&Nop, &AddIndex);
3958
+ }
3959
+
3960
+ Y_UNIT_TEST (ShouldBreakLocksOnConcurrentMoveIndex) {
3961
+ ShouldBreakLocksOnConcurrentSchemeTx (&AddIndex, [](TServer::TPtr server, const TActorId& edgeActor) {
3962
+ WaitTxNotification (server, edgeActor, AsyncMoveIndex (server, " /Root/Table" , " Index" , " MovedIndex" ));
3963
+ });
3964
+ }
3965
+
3966
+ Y_UNIT_TEST (ShouldBreakLocksOnConcurrentDropIndex) {
3967
+ ShouldBreakLocksOnConcurrentSchemeTx (&AddIndex, [](TServer::TPtr server, const TActorId& edgeActor) {
3968
+ WaitTxNotification (server, edgeActor, AsyncAlterDropIndex (server, " /Root" , " Table" , " Index" ));
3969
+ }, Ydb::StatusIds::UNAVAILABLE);
3970
+ }
3971
+
3972
+ Y_UNIT_TEST (ShouldBreakLocksOnConcurrentCancelBuildIndex) {
3973
+ ui64 buildIndexId = 0 ;
3974
+ auto addIndexWithBlock = [&buildIndexId](TServer::TPtr server, const TActorId&) {
3975
+ auto & runtime = *server->GetRuntime ();
3976
+ TBlockEvents<TEvDataShard::TEvBuildIndexProgressResponse> blockProgress (runtime);
3977
+ buildIndexId = AsyncAlterAddIndex (server, " /Root" , " /Root/Table" , TShardedTableOptions::TIndex{" Index" , {" value" }});
3978
+ runtime.WaitFor (" Progress" , [&]{ return blockProgress.size (); });
3979
+ blockProgress.Stop ();
3980
+ };
3981
+ auto cancelBuildIndex = [&buildIndexId](TServer::TPtr server, const TActorId& edgeActor) {
3982
+ UNIT_ASSERT (buildIndexId != 0 );
3983
+ CancelAddIndex (server, " /Root" , buildIndexId);
3984
+ WaitTxNotification (server, edgeActor, buildIndexId);
3985
+ };
3986
+ ShouldBreakLocksOnConcurrentSchemeTx (addIndexWithBlock, cancelBuildIndex, Ydb::StatusIds::UNAVAILABLE);
3987
+ }
3988
+
3989
+ void AddStream (TServer::TPtr server, const TActorId& edgeActor) {
3990
+ WaitTxNotification (server, edgeActor, AsyncAlterAddStream (server, " /Root" , " Table" ,
3991
+ Updates (NKikimrSchemeOp::ECdcStreamFormatJson, " Stream2" )));
3992
+ }
3993
+
3994
+ Y_UNIT_TEST (ShouldBreakLocksOnConcurrentAddStream) {
3995
+ ShouldBreakLocksOnConcurrentSchemeTx (&Nop, &AddStream);
3996
+ }
3997
+
3998
+ Y_UNIT_TEST (ShouldBreakLocksOnConcurrentAlterStream) {
3999
+ ShouldBreakLocksOnConcurrentSchemeTx (&AddStream, [](TServer::TPtr server, const TActorId& edgeActor) {
4000
+ WaitTxNotification (server, edgeActor, AsyncAlterDisableStream (server, " /Root" , " Table" , " Stream2" ));
4001
+ });
4002
+ }
4003
+
4004
+ Y_UNIT_TEST (ShouldBreakLocksOnConcurrentDropStream) {
4005
+ ShouldBreakLocksOnConcurrentSchemeTx (&AddStream, [](TServer::TPtr server, const TActorId& edgeActor) {
4006
+ WaitTxNotification (server, edgeActor, AsyncAlterDropStream (server, " /Root" , " Table" , " Stream2" ));
4007
+ });
4008
+ }
4009
+
3935
4010
Y_UNIT_TEST (ResolvedTimestampsContinueAfterMerge) {
3936
4011
TPortManager portManager;
3937
4012
TServer::TPtr server = new TServer (TServerSettings (portManager.GetPort (2134 ), {}, DefaultPQConfig ())
0 commit comments