@@ -197,7 +197,15 @@ class TFixture : public NUnitTest::TBaseFixture {
197
197
TString MakeJsonDoc (const TVector<TTableRecord>& records);
198
198
199
199
void CreateTable (const TString& path);
200
- void WriteToTable (const TString& tablePath,
200
+ void UpsertToTable (const TString& tablePath,
201
+ const TVector<TTableRecord>& records,
202
+ ISession& session,
203
+ TTransactionBase* tx);
204
+ void InsertToTable (const TString& tablePath,
205
+ const TVector<TTableRecord>& records,
206
+ ISession& session,
207
+ TTransactionBase* tx);
208
+ void DeleteFromTable (const TString& tablePath,
201
209
const TVector<TTableRecord>& records,
202
210
ISession& session,
203
211
TTransactionBase* tx);
@@ -1795,7 +1803,7 @@ void TFixture::TestWriteToTopic24()
1795
1803
auto tx = session->BeginTx ();
1796
1804
1797
1805
auto records = MakeTableRecords ();
1798
- WriteToTable (" table_A" , records, *session, tx.get ());
1806
+ UpsertToTable (" table_A" , records, *session, tx.get ());
1799
1807
WriteToTopic (" topic_A" , TEST_MESSAGE_GROUP_ID, MakeJsonDoc (records), tx.get ());
1800
1808
1801
1809
session->CommitTx (*tx, EStatus::SUCCESS);
@@ -2790,7 +2798,7 @@ auto TFixture::MakeJsonDoc(const TVector<TTableRecord>& records) -> TString
2790
2798
return s;
2791
2799
}
2792
2800
2793
- void TFixture::WriteToTable (const TString& tablePath,
2801
+ void TFixture::UpsertToTable (const TString& tablePath,
2794
2802
const TVector<TTableRecord>& records,
2795
2803
ISession& session,
2796
2804
TTransactionBase* tx)
@@ -2810,6 +2818,46 @@ void TFixture::WriteToTable(const TString& tablePath,
2810
2818
}
2811
2819
}
2812
2820
2821
+ void TFixture::InsertToTable (const TString& tablePath,
2822
+ const TVector<TTableRecord>& records,
2823
+ ISession& session,
2824
+ TTransactionBase* tx)
2825
+ {
2826
+ TString query = Sprintf (" DECLARE $key AS Utf8;"
2827
+ " DECLARE $value AS Utf8;"
2828
+ " INSERT INTO `%s` (key, value) VALUES ($key, $value);" ,
2829
+ tablePath.data ());
2830
+
2831
+ for (const auto & r : records) {
2832
+ auto params = TParamsBuilder ()
2833
+ .AddParam (" $key" ).Utf8 (r.Key ).Build ()
2834
+ .AddParam (" $value" ).Utf8 (r.Value ).Build ()
2835
+ .Build ();
2836
+
2837
+ session.Execute (query, tx, false , params);
2838
+ }
2839
+ }
2840
+
2841
+ void TFixture::DeleteFromTable (const TString& tablePath,
2842
+ const TVector<TTableRecord>& records,
2843
+ ISession& session,
2844
+ TTransactionBase* tx)
2845
+ {
2846
+ TString query = Sprintf (" DECLARE $key AS Utf8;"
2847
+ " DECLARE $value AS Utf8;"
2848
+ " DELETE FROM `%s` ON (key, value) VALUES ($key, $value);" ,
2849
+ tablePath.data ());
2850
+
2851
+ for (const auto & r : records) {
2852
+ auto params = TParamsBuilder ()
2853
+ .AddParam (" $key" ).Utf8 (r.Key ).Build ()
2854
+ .AddParam (" $value" ).Utf8 (r.Value ).Build ()
2855
+ .Build ();
2856
+
2857
+ session.Execute (query, tx, false , params);
2858
+ }
2859
+ }
2860
+
2813
2861
size_t TFixture::GetTableRecordsCount (const TString& tablePath)
2814
2862
{
2815
2863
TString query = Sprintf (R"( SELECT COUNT(*) FROM `%s`)" ,
@@ -3566,10 +3614,12 @@ class TFixtureSinks : public TFixture {
3566
3614
void TestSinksOltpWriteToTopicAndTable3 ();
3567
3615
void TestSinksOltpWriteToTopicAndTable4 ();
3568
3616
void TestSinksOltpWriteToTopicAndTable5 ();
3617
+ void TestSinksOltpWriteToTopicAndTable6 ();
3569
3618
3570
3619
void TestSinksOlapWriteToTopicAndTable1 ();
3571
3620
void TestSinksOlapWriteToTopicAndTable2 ();
3572
3621
void TestSinksOlapWriteToTopicAndTable3 ();
3622
+ void TestSinksOlapWriteToTopicAndTable4 ();
3573
3623
};
3574
3624
3575
3625
class TFixtureSinksTable : public TFixtureSinks {
@@ -3759,7 +3809,7 @@ void TFixtureSinks::TestSinksOltpWriteToTopicAndTable2()
3759
3809
auto tx = session->BeginTx ();
3760
3810
3761
3811
auto records = MakeTableRecords ();
3762
- WriteToTable (" table_A" , records, *session, tx.get ());
3812
+ UpsertToTable (" table_A" , records, *session, tx.get ());
3763
3813
3764
3814
WriteToTopic (" topic_A" , TEST_MESSAGE_GROUP_ID, MakeJsonDoc (records), tx.get ());
3765
3815
@@ -3786,6 +3836,7 @@ void TFixtureSinks::TestSinksOltpWriteToTopicAndTable2()
3786
3836
CheckTabletKeys (" topic_B" );
3787
3837
}
3788
3838
3839
+
3789
3840
Y_UNIT_TEST_F (Sinks_Oltp_WriteToTopicAndTable_2_Table, TFixtureSinksTable)
3790
3841
{
3791
3842
TestSinksOltpWriteToTopicAndTable2 ();
@@ -3808,8 +3859,8 @@ void TFixtureSinks::TestSinksOltpWriteToTopicAndTable3()
3808
3859
auto tx = session->BeginTx ();
3809
3860
3810
3861
auto records = MakeTableRecords ();
3811
- WriteToTable (" table_A" , records, *session, tx.get ());
3812
- WriteToTable (" table_B" , records, *session, tx.get ());
3862
+ UpsertToTable (" table_A" , records, *session, tx.get ());
3863
+ UpsertToTable (" table_B" , records, *session, tx.get ());
3813
3864
3814
3865
WriteToTopic (" topic_A" , TEST_MESSAGE_GROUP_ID, MakeJsonDoc (records), tx.get ());
3815
3866
@@ -3860,7 +3911,7 @@ void TFixtureSinks::TestSinksOltpWriteToTopicAndTable4()
3860
3911
session->Execute (R"( SELECT COUNT(*) FROM `table_A`)" , tx1.get (), false );
3861
3912
3862
3913
auto records = MakeTableRecords ();
3863
- WriteToTable (" table_A" , records, *session, tx2.get ());
3914
+ UpsertToTable (" table_A" , records, *session, tx2.get ());
3864
3915
3865
3916
WriteToTopic (" topic_A" , TEST_MESSAGE_GROUP_ID, MakeJsonDoc (records), tx1.get ());
3866
3917
WaitForAcks (" topic_A" , TEST_MESSAGE_GROUP_ID);
@@ -3894,7 +3945,7 @@ void TFixtureSinks::TestSinksOltpWriteToTopicAndTable5()
3894
3945
auto tx = session->BeginTx ();
3895
3946
3896
3947
auto records = MakeTableRecords ();
3897
- WriteToTable (" table_A" , records, *session, tx.get ());
3948
+ UpsertToTable (" table_A" , records, *session, tx.get ());
3898
3949
3899
3950
WriteToTopic (" topic_A" , TEST_MESSAGE_GROUP_ID, MakeJsonDoc (records), tx.get ());
3900
3951
WaitForAcks (" topic_A" , TEST_MESSAGE_GROUP_ID);
@@ -3918,6 +3969,56 @@ Y_UNIT_TEST_F(Sinks_Oltp_WriteToTopicAndTable_5_Query, TFixtureSinksQuery)
3918
3969
TestSinksOltpWriteToTopicAndTable5 ();
3919
3970
}
3920
3971
3972
+ void TFixtureSinks::TestSinksOltpWriteToTopicAndTable6 ()
3973
+ {
3974
+ CreateTopic (" topic_A" );
3975
+ CreateTopic (" topic_B" );
3976
+ CreateRowTable (" /Root/table_A" );
3977
+
3978
+ auto session = CreateSession ();
3979
+ auto tx = session->BeginTx ();
3980
+
3981
+ auto records = MakeTableRecords ();
3982
+ InsertToTable (" table_A" , records, *session, tx.get ());
3983
+
3984
+ WriteToTopic (" topic_A" , TEST_MESSAGE_GROUP_ID, MakeJsonDoc (records), tx.get ());
3985
+
3986
+ WriteToTopic (" topic_B" , TEST_MESSAGE_GROUP_ID, " message #1" , tx.get ());
3987
+ WriteToTopic (" topic_B" , TEST_MESSAGE_GROUP_ID, " message #2" , tx.get ());
3988
+ WriteToTopic (" topic_B" , TEST_MESSAGE_GROUP_ID, " message #3" , tx.get ());
3989
+
3990
+ DeleteFromTable (" table_A" , records, *session, tx.get ());
3991
+
3992
+ session->CommitTx (*tx, EStatus::SUCCESS);
3993
+
3994
+ {
3995
+ auto messages = Read_Exactly_N_Messages_From_Topic (" topic_A" , TEST_CONSUMER, 1 );
3996
+ UNIT_ASSERT_VALUES_EQUAL (messages.front (), MakeJsonDoc (records));
3997
+ }
3998
+
3999
+ {
4000
+ auto messages = Read_Exactly_N_Messages_From_Topic (" topic_B" , TEST_CONSUMER, 3 );
4001
+ UNIT_ASSERT_VALUES_EQUAL (messages.front (), " message #1" );
4002
+ UNIT_ASSERT_VALUES_EQUAL (messages.back (), " message #3" );
4003
+ }
4004
+
4005
+ UNIT_ASSERT_VALUES_EQUAL (GetTableRecordsCount (" table_A" ), 0 );
4006
+
4007
+ CheckTabletKeys (" topic_A" );
4008
+ CheckTabletKeys (" topic_B" );
4009
+ }
4010
+
4011
+
4012
+ Y_UNIT_TEST_F (Sinks_Oltp_WriteToTopicAndTable_6_Table, TFixtureSinksTable)
4013
+ {
4014
+ TestSinksOltpWriteToTopicAndTable6 ();
4015
+ }
4016
+
4017
+ Y_UNIT_TEST_F (Sinks_Oltp_WriteToTopicAndTable_6_Query, TFixtureSinksQuery)
4018
+ {
4019
+ TestSinksOltpWriteToTopicAndTable6 ();
4020
+ }
4021
+
3921
4022
void TFixtureSinks::TestSinksOlapWriteToTopicAndTable1 ()
3922
4023
{
3923
4024
return ; // https://github.com/ydb-platform/ydb/issues/17271
@@ -3928,7 +4029,7 @@ void TFixtureSinks::TestSinksOlapWriteToTopicAndTable1()
3928
4029
auto tx = session->BeginTx ();
3929
4030
3930
4031
auto records = MakeTableRecords ();
3931
- WriteToTable (" table_A" , records, *session, tx.get ());
4032
+ UpsertToTable (" table_A" , records, *session, tx.get ());
3932
4033
WriteToTopic (" topic_A" , TEST_MESSAGE_GROUP_ID, MakeJsonDoc (records), tx.get ());
3933
4034
3934
4035
session->CommitTx (*tx, EStatus::SUCCESS);
@@ -3965,8 +4066,8 @@ void TFixtureSinks::TestSinksOlapWriteToTopicAndTable2()
3965
4066
3966
4067
auto records = MakeTableRecords ();
3967
4068
3968
- WriteToTable (" table_A" , records, *session, tx.get ());
3969
- WriteToTable (" table_B" , records, *session, tx.get ());
4069
+ UpsertToTable (" table_A" , records, *session, tx.get ());
4070
+ UpsertToTable (" table_B" , records, *session, tx.get ());
3970
4071
3971
4072
WriteToTopic (" topic_A" , TEST_MESSAGE_GROUP_ID, MakeJsonDoc (records), tx.get ());
3972
4073
@@ -4014,7 +4115,7 @@ void TFixtureSinks::TestSinksOlapWriteToTopicAndTable3()
4014
4115
auto tx = session->BeginTx ();
4015
4116
4016
4117
auto records = MakeTableRecords ();
4017
- WriteToTable (" table_A" , records, *session, tx.get ());
4118
+ UpsertToTable (" table_A" , records, *session, tx.get ());
4018
4119
4019
4120
WriteToTopic (" topic_A" , TEST_MESSAGE_GROUP_ID, MakeJsonDoc (records), tx.get ());
4020
4121
WaitForAcks (" topic_A" , TEST_MESSAGE_GROUP_ID);
@@ -4038,6 +4139,65 @@ Y_UNIT_TEST_F(Sinks_Olap_WriteToTopicAndTable_3_Query, TFixtureSinksQuery)
4038
4139
TestSinksOlapWriteToTopicAndTable3 ();
4039
4140
}
4040
4141
4142
+ void TFixtureSinks::TestSinksOlapWriteToTopicAndTable4 ()
4143
+ {
4144
+ return ; // https://github.com/ydb-platform/ydb/issues/17271
4145
+ CreateTopic (" topic_A" );
4146
+ CreateTopic (" topic_B" );
4147
+
4148
+ CreateRowTable (" /Root/table_A" );
4149
+ CreateColumnTable (" /Root/table_B" );
4150
+ CreateColumnTable (" /Root/table_C" );
4151
+
4152
+ auto session = CreateSession ();
4153
+ auto tx = session->BeginTx ();
4154
+
4155
+ auto records = MakeTableRecords ();
4156
+
4157
+ InsertToTable (" table_A" , records, *session, tx.get ());
4158
+ InsertToTable (" table_B" , records, *session, tx.get ());
4159
+ UpsertToTable (" table_C" , records, *session, tx.get ());
4160
+
4161
+ WriteToTopic (" topic_A" , TEST_MESSAGE_GROUP_ID, MakeJsonDoc (records), tx.get ());
4162
+
4163
+ const size_t topicMsgCnt = 10 ;
4164
+ for (size_t i = 1 ; i <= topicMsgCnt; ++i) {
4165
+ WriteToTopic (" topic_B" , TEST_MESSAGE_GROUP_ID, " message #" + std::to_string (i), tx.get ());
4166
+ }
4167
+
4168
+ DeleteFromTable (" table_B" , records, *session, tx.get ());
4169
+
4170
+ session->CommitTx (*tx, EStatus::SUCCESS);
4171
+
4172
+ {
4173
+ auto messages = Read_Exactly_N_Messages_From_Topic (" topic_A" , TEST_CONSUMER, 1 );
4174
+ UNIT_ASSERT_VALUES_EQUAL (messages.front (), MakeJsonDoc (records));
4175
+ }
4176
+
4177
+ {
4178
+ auto messages = Read_Exactly_N_Messages_From_Topic (" topic_B" , TEST_CONSUMER, topicMsgCnt);
4179
+ UNIT_ASSERT_VALUES_EQUAL (messages.front (), " message #1" );
4180
+ UNIT_ASSERT_VALUES_EQUAL (messages.back (), " message #" + std::to_string (topicMsgCnt));
4181
+ }
4182
+
4183
+ UNIT_ASSERT_VALUES_EQUAL (GetTableRecordsCount (" table_A" ), records.size ());
4184
+ UNIT_ASSERT_VALUES_EQUAL (GetTableRecordsCount (" table_B" ), 0 );
4185
+ UNIT_ASSERT_VALUES_EQUAL (GetTableRecordsCount (" table_C" ), records.size ());
4186
+
4187
+ CheckTabletKeys (" topic_A" );
4188
+ CheckTabletKeys (" topic_B" );
4189
+ }
4190
+
4191
+ Y_UNIT_TEST_F (Sinks_Olap_WriteToTopicAndTable_4_Table, TFixtureSinksTable)
4192
+ {
4193
+ TestSinksOlapWriteToTopicAndTable4 ();
4194
+ }
4195
+
4196
+ Y_UNIT_TEST_F (Sinks_Olap_WriteToTopicAndTable_4_Query, TFixtureSinksQuery)
4197
+ {
4198
+ TestSinksOlapWriteToTopicAndTable4 ();
4199
+ }
4200
+
4041
4201
void TFixture::TestWriteRandomSizedMessagesInWideTransactions ()
4042
4202
{
4043
4203
// The test verifies the simultaneous execution of several transactions. There is a topic
0 commit comments