Skip to content

Commit 69efbfc

Browse files
committed
Fix hasRead for sink (#17844)
1 parent faeaeb2 commit 69efbfc

File tree

1 file changed

+172
-12
lines changed

1 file changed

+172
-12
lines changed

src/client/topic/ut/topic_to_table_ut.cpp

Lines changed: 172 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,15 @@ class TFixture : public NUnitTest::TBaseFixture {
197197
TString MakeJsonDoc(const TVector<TTableRecord>& records);
198198

199199
void CreateTable(const TString& path);
200-
void WriteToTable(const TString& tablePath,
200+
void UpsertToTable(const TString& tablePath,
201+
const TVector<TTableRecord>& records,
202+
ISession& session,
203+
TTransactionBase* tx);
204+
void InsertToTable(const TString& tablePath,
205+
const TVector<TTableRecord>& records,
206+
ISession& session,
207+
TTransactionBase* tx);
208+
void DeleteFromTable(const TString& tablePath,
201209
const TVector<TTableRecord>& records,
202210
ISession& session,
203211
TTransactionBase* tx);
@@ -1795,7 +1803,7 @@ void TFixture::TestWriteToTopic24()
17951803
auto tx = session->BeginTx();
17961804

17971805
auto records = MakeTableRecords();
1798-
WriteToTable("table_A", records, *session, tx.get());
1806+
UpsertToTable("table_A", records, *session, tx.get());
17991807
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, MakeJsonDoc(records), tx.get());
18001808

18011809
session->CommitTx(*tx, EStatus::SUCCESS);
@@ -2790,7 +2798,7 @@ auto TFixture::MakeJsonDoc(const TVector<TTableRecord>& records) -> TString
27902798
return s;
27912799
}
27922800

2793-
void TFixture::WriteToTable(const TString& tablePath,
2801+
void TFixture::UpsertToTable(const TString& tablePath,
27942802
const TVector<TTableRecord>& records,
27952803
ISession& session,
27962804
TTransactionBase* tx)
@@ -2810,6 +2818,46 @@ void TFixture::WriteToTable(const TString& tablePath,
28102818
}
28112819
}
28122820

2821+
void TFixture::InsertToTable(const TString& tablePath,
2822+
const TVector<TTableRecord>& records,
2823+
ISession& session,
2824+
TTransactionBase* tx)
2825+
{
2826+
TString query = Sprintf("DECLARE $key AS Utf8;"
2827+
"DECLARE $value AS Utf8;"
2828+
"INSERT INTO `%s` (key, value) VALUES ($key, $value);",
2829+
tablePath.data());
2830+
2831+
for (const auto& r : records) {
2832+
auto params = TParamsBuilder()
2833+
.AddParam("$key").Utf8(r.Key).Build()
2834+
.AddParam("$value").Utf8(r.Value).Build()
2835+
.Build();
2836+
2837+
session.Execute(query, tx, false, params);
2838+
}
2839+
}
2840+
2841+
void TFixture::DeleteFromTable(const TString& tablePath,
2842+
const TVector<TTableRecord>& records,
2843+
ISession& session,
2844+
TTransactionBase* tx)
2845+
{
2846+
TString query = Sprintf("DECLARE $key AS Utf8;"
2847+
"DECLARE $value AS Utf8;"
2848+
"DELETE FROM `%s` ON (key, value) VALUES ($key, $value);",
2849+
tablePath.data());
2850+
2851+
for (const auto& r : records) {
2852+
auto params = TParamsBuilder()
2853+
.AddParam("$key").Utf8(r.Key).Build()
2854+
.AddParam("$value").Utf8(r.Value).Build()
2855+
.Build();
2856+
2857+
session.Execute(query, tx, false, params);
2858+
}
2859+
}
2860+
28132861
size_t TFixture::GetTableRecordsCount(const TString& tablePath)
28142862
{
28152863
TString query = Sprintf(R"(SELECT COUNT(*) FROM `%s`)",
@@ -3566,10 +3614,12 @@ class TFixtureSinks : public TFixture {
35663614
void TestSinksOltpWriteToTopicAndTable3();
35673615
void TestSinksOltpWriteToTopicAndTable4();
35683616
void TestSinksOltpWriteToTopicAndTable5();
3617+
void TestSinksOltpWriteToTopicAndTable6();
35693618

35703619
void TestSinksOlapWriteToTopicAndTable1();
35713620
void TestSinksOlapWriteToTopicAndTable2();
35723621
void TestSinksOlapWriteToTopicAndTable3();
3622+
void TestSinksOlapWriteToTopicAndTable4();
35733623
};
35743624

35753625
class TFixtureSinksTable : public TFixtureSinks {
@@ -3759,7 +3809,7 @@ void TFixtureSinks::TestSinksOltpWriteToTopicAndTable2()
37593809
auto tx = session->BeginTx();
37603810

37613811
auto records = MakeTableRecords();
3762-
WriteToTable("table_A", records, *session, tx.get());
3812+
UpsertToTable("table_A", records, *session, tx.get());
37633813

37643814
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, MakeJsonDoc(records), tx.get());
37653815

@@ -3786,6 +3836,7 @@ void TFixtureSinks::TestSinksOltpWriteToTopicAndTable2()
37863836
CheckTabletKeys("topic_B");
37873837
}
37883838

3839+
37893840
Y_UNIT_TEST_F(Sinks_Oltp_WriteToTopicAndTable_2_Table, TFixtureSinksTable)
37903841
{
37913842
TestSinksOltpWriteToTopicAndTable2();
@@ -3808,8 +3859,8 @@ void TFixtureSinks::TestSinksOltpWriteToTopicAndTable3()
38083859
auto tx = session->BeginTx();
38093860

38103861
auto records = MakeTableRecords();
3811-
WriteToTable("table_A", records, *session, tx.get());
3812-
WriteToTable("table_B", records, *session, tx.get());
3862+
UpsertToTable("table_A", records, *session, tx.get());
3863+
UpsertToTable("table_B", records, *session, tx.get());
38133864

38143865
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, MakeJsonDoc(records), tx.get());
38153866

@@ -3860,7 +3911,7 @@ void TFixtureSinks::TestSinksOltpWriteToTopicAndTable4()
38603911
session->Execute(R"(SELECT COUNT(*) FROM `table_A`)", tx1.get(), false);
38613912

38623913
auto records = MakeTableRecords();
3863-
WriteToTable("table_A", records, *session, tx2.get());
3914+
UpsertToTable("table_A", records, *session, tx2.get());
38643915

38653916
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, MakeJsonDoc(records), tx1.get());
38663917
WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID);
@@ -3894,7 +3945,7 @@ void TFixtureSinks::TestSinksOltpWriteToTopicAndTable5()
38943945
auto tx = session->BeginTx();
38953946

38963947
auto records = MakeTableRecords();
3897-
WriteToTable("table_A", records, *session, tx.get());
3948+
UpsertToTable("table_A", records, *session, tx.get());
38983949

38993950
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, MakeJsonDoc(records), tx.get());
39003951
WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID);
@@ -3918,6 +3969,56 @@ Y_UNIT_TEST_F(Sinks_Oltp_WriteToTopicAndTable_5_Query, TFixtureSinksQuery)
39183969
TestSinksOltpWriteToTopicAndTable5();
39193970
}
39203971

3972+
void TFixtureSinks::TestSinksOltpWriteToTopicAndTable6()
3973+
{
3974+
CreateTopic("topic_A");
3975+
CreateTopic("topic_B");
3976+
CreateRowTable("/Root/table_A");
3977+
3978+
auto session = CreateSession();
3979+
auto tx = session->BeginTx();
3980+
3981+
auto records = MakeTableRecords();
3982+
InsertToTable("table_A", records, *session, tx.get());
3983+
3984+
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, MakeJsonDoc(records), tx.get());
3985+
3986+
WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID, "message #1", tx.get());
3987+
WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID, "message #2", tx.get());
3988+
WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID, "message #3", tx.get());
3989+
3990+
DeleteFromTable("table_A", records, *session, tx.get());
3991+
3992+
session->CommitTx(*tx, EStatus::SUCCESS);
3993+
3994+
{
3995+
auto messages = Read_Exactly_N_Messages_From_Topic("topic_A", TEST_CONSUMER, 1);
3996+
UNIT_ASSERT_VALUES_EQUAL(messages.front(), MakeJsonDoc(records));
3997+
}
3998+
3999+
{
4000+
auto messages = Read_Exactly_N_Messages_From_Topic("topic_B", TEST_CONSUMER, 3);
4001+
UNIT_ASSERT_VALUES_EQUAL(messages.front(), "message #1");
4002+
UNIT_ASSERT_VALUES_EQUAL(messages.back(), "message #3");
4003+
}
4004+
4005+
UNIT_ASSERT_VALUES_EQUAL(GetTableRecordsCount("table_A"), 0);
4006+
4007+
CheckTabletKeys("topic_A");
4008+
CheckTabletKeys("topic_B");
4009+
}
4010+
4011+
4012+
Y_UNIT_TEST_F(Sinks_Oltp_WriteToTopicAndTable_6_Table, TFixtureSinksTable)
4013+
{
4014+
TestSinksOltpWriteToTopicAndTable6();
4015+
}
4016+
4017+
Y_UNIT_TEST_F(Sinks_Oltp_WriteToTopicAndTable_6_Query, TFixtureSinksQuery)
4018+
{
4019+
TestSinksOltpWriteToTopicAndTable6();
4020+
}
4021+
39214022
void TFixtureSinks::TestSinksOlapWriteToTopicAndTable1()
39224023
{
39234024
return; // https://github.com/ydb-platform/ydb/issues/17271
@@ -3928,7 +4029,7 @@ void TFixtureSinks::TestSinksOlapWriteToTopicAndTable1()
39284029
auto tx = session->BeginTx();
39294030

39304031
auto records = MakeTableRecords();
3931-
WriteToTable("table_A", records, *session, tx.get());
4032+
UpsertToTable("table_A", records, *session, tx.get());
39324033
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, MakeJsonDoc(records), tx.get());
39334034

39344035
session->CommitTx(*tx, EStatus::SUCCESS);
@@ -3965,8 +4066,8 @@ void TFixtureSinks::TestSinksOlapWriteToTopicAndTable2()
39654066

39664067
auto records = MakeTableRecords();
39674068

3968-
WriteToTable("table_A", records, *session, tx.get());
3969-
WriteToTable("table_B", records, *session, tx.get());
4069+
UpsertToTable("table_A", records, *session, tx.get());
4070+
UpsertToTable("table_B", records, *session, tx.get());
39704071

39714072
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, MakeJsonDoc(records), tx.get());
39724073

@@ -4014,7 +4115,7 @@ void TFixtureSinks::TestSinksOlapWriteToTopicAndTable3()
40144115
auto tx = session->BeginTx();
40154116

40164117
auto records = MakeTableRecords();
4017-
WriteToTable("table_A", records, *session, tx.get());
4118+
UpsertToTable("table_A", records, *session, tx.get());
40184119

40194120
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, MakeJsonDoc(records), tx.get());
40204121
WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID);
@@ -4038,6 +4139,65 @@ Y_UNIT_TEST_F(Sinks_Olap_WriteToTopicAndTable_3_Query, TFixtureSinksQuery)
40384139
TestSinksOlapWriteToTopicAndTable3();
40394140
}
40404141

4142+
void TFixtureSinks::TestSinksOlapWriteToTopicAndTable4()
4143+
{
4144+
return; // https://github.com/ydb-platform/ydb/issues/17271
4145+
CreateTopic("topic_A");
4146+
CreateTopic("topic_B");
4147+
4148+
CreateRowTable("/Root/table_A");
4149+
CreateColumnTable("/Root/table_B");
4150+
CreateColumnTable("/Root/table_C");
4151+
4152+
auto session = CreateSession();
4153+
auto tx = session->BeginTx();
4154+
4155+
auto records = MakeTableRecords();
4156+
4157+
InsertToTable("table_A", records, *session, tx.get());
4158+
InsertToTable("table_B", records, *session, tx.get());
4159+
UpsertToTable("table_C", records, *session, tx.get());
4160+
4161+
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, MakeJsonDoc(records), tx.get());
4162+
4163+
const size_t topicMsgCnt = 10;
4164+
for (size_t i = 1; i <= topicMsgCnt; ++i) {
4165+
WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID, "message #" + std::to_string(i), tx.get());
4166+
}
4167+
4168+
DeleteFromTable("table_B", records, *session, tx.get());
4169+
4170+
session->CommitTx(*tx, EStatus::SUCCESS);
4171+
4172+
{
4173+
auto messages = Read_Exactly_N_Messages_From_Topic("topic_A", TEST_CONSUMER, 1);
4174+
UNIT_ASSERT_VALUES_EQUAL(messages.front(), MakeJsonDoc(records));
4175+
}
4176+
4177+
{
4178+
auto messages = Read_Exactly_N_Messages_From_Topic("topic_B", TEST_CONSUMER, topicMsgCnt);
4179+
UNIT_ASSERT_VALUES_EQUAL(messages.front(), "message #1");
4180+
UNIT_ASSERT_VALUES_EQUAL(messages.back(), "message #" + std::to_string(topicMsgCnt));
4181+
}
4182+
4183+
UNIT_ASSERT_VALUES_EQUAL(GetTableRecordsCount("table_A"), records.size());
4184+
UNIT_ASSERT_VALUES_EQUAL(GetTableRecordsCount("table_B"), 0);
4185+
UNIT_ASSERT_VALUES_EQUAL(GetTableRecordsCount("table_C"), records.size());
4186+
4187+
CheckTabletKeys("topic_A");
4188+
CheckTabletKeys("topic_B");
4189+
}
4190+
4191+
Y_UNIT_TEST_F(Sinks_Olap_WriteToTopicAndTable_4_Table, TFixtureSinksTable)
4192+
{
4193+
TestSinksOlapWriteToTopicAndTable4();
4194+
}
4195+
4196+
Y_UNIT_TEST_F(Sinks_Olap_WriteToTopicAndTable_4_Query, TFixtureSinksQuery)
4197+
{
4198+
TestSinksOlapWriteToTopicAndTable4();
4199+
}
4200+
40414201
void TFixture::TestWriteRandomSizedMessagesInWideTransactions()
40424202
{
40434203
// The test verifies the simultaneous execution of several transactions. There is a topic

0 commit comments

Comments
 (0)