Skip to content

Commit c4b9ff6

Browse files
committed
fix
1 parent 231559c commit c4b9ff6

File tree

2 files changed

+47
-137
lines changed

2 files changed

+47
-137
lines changed

ydb/public/sdk/cpp/src/client/topic/ut/topic_to_table_ut.cpp

Lines changed: 46 additions & 136 deletions
Original file line numberDiff line numberDiff line change
@@ -171,12 +171,10 @@ class TFixture : public NUnitTest::TBaseFixture {
171171
void CreateTable(const TString& path);
172172
void UpsertToTable(const TString& tablePath,
173173
const TVector<TTableRecord>& records,
174-
ISession& session,
175-
TTransactionBase* tx);
174+
NTable::TTransaction* tx);
176175
void InsertToTable(const TString& tablePath,
177176
const TVector<TTableRecord>& records,
178-
ISession& session,
179-
TTransactionBase* tx);
177+
NTable::TTransaction* tx);
180178
void DeleteFromTable(const TString& tablePath,
181179
const TVector<TTableRecord>& records,
182180
NTable::TTransaction* tx);
@@ -1370,8 +1368,8 @@ void TFixture::TestWriteToTopic24()
13701368
NTable::TTransaction tx = BeginTx(tableSession);
13711369

13721370
auto records = MakeTableRecords();
1373-
UpsertToTable("table_A", records, tx.get());
1374-
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, MakeJsonDoc(records), tx.get());
1371+
UpsertToTable("table_A", records, &tx);
1372+
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, MakeJsonDoc(records), &tx);
13751373

13761374
CommitTx(tx, EStatus::SUCCESS);
13771375

@@ -2222,13 +2220,13 @@ auto TFixture::MakeJsonDoc(const TVector<TTableRecord>& records) -> TString
22222220

22232221
void TFixture::UpsertToTable(const TString& tablePath,
22242222
const TVector<TTableRecord>& records,
2225-
ISession& session,
22262223
NTable::TTransaction* tx)
22272224
{
22282225
TString query = Sprintf("DECLARE $key AS Utf8;"
22292226
"DECLARE $value AS Utf8;"
22302227
"UPSERT INTO `%s` (key, value) VALUES ($key, $value);",
22312228
tablePath.data());
2229+
NTable::TSession session = tx->GetSession();
22322230

22332231
for (const auto& r : records) {
22342232
auto params = session.GetParamsBuilder()
@@ -2244,41 +2242,47 @@ void TFixture::UpsertToTable(const TString& tablePath,
22442242

22452243
void TFixture::InsertToTable(const TString& tablePath,
22462244
const TVector<TTableRecord>& records,
2247-
ISession& session,
2248-
TTransactionBase* tx)
2245+
NTable::TTransaction* tx)
22492246
{
22502247
TString query = Sprintf("DECLARE $key AS Utf8;"
22512248
"DECLARE $value AS Utf8;"
22522249
"INSERT INTO `%s` (key, value) VALUES ($key, $value);",
22532250
tablePath.data());
2251+
NTable::TSession session = tx->GetSession();
22542252

22552253
for (const auto& r : records) {
22562254
auto params = TParamsBuilder()
22572255
.AddParam("$key").Utf8(r.Key).Build()
22582256
.AddParam("$value").Utf8(r.Value).Build()
22592257
.Build();
22602258

2261-
session.Execute(query, tx, false, params);
2259+
auto result = session.ExecuteDataQuery(query,
2260+
NYdb::NTable::TTxControl::Tx(*tx),
2261+
params).GetValueSync();
2262+
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
22622263
}
22632264
}
22642265

22652266
void TFixture::DeleteFromTable(const TString& tablePath,
22662267
const TVector<TTableRecord>& records,
2267-
ISession& session,
2268-
TTransactionBase* tx)
2268+
NTable::TTransaction* tx)
22692269
{
22702270
TString query = Sprintf("DECLARE $key AS Utf8;"
22712271
"DECLARE $value AS Utf8;"
22722272
"DELETE FROM `%s` ON (key, value) VALUES ($key, $value);",
22732273
tablePath.data());
2274+
NTable::TSession session = tx->GetSession();
22742275

22752276
for (const auto& r : records) {
22762277
auto params = TParamsBuilder()
22772278
.AddParam("$key").Utf8(r.Key).Build()
22782279
.AddParam("$value").Utf8(r.Value).Build()
22792280
.Build();
22802281

2281-
session.Execute(query, tx, false, params);
2282+
auto result = session.ExecuteDataQuery(query,
2283+
NYdb::NTable::TTxControl::Tx(*tx),
2284+
params).GetValueSync();
2285+
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
22822286
}
22832287
}
22842288

@@ -2794,30 +2798,19 @@ class TFixtureSinks : public TFixture {
27942798

27952799
void TestSinksOltpWriteToTopic5();
27962800

2797-
void TestSinksOltpWriteToTopicAndTable2();
27982801
void TestSinksOltpWriteToTopicAndTable3();
27992802
void TestSinksOltpWriteToTopicAndTable4();
2800-
void TestSinksOltpWriteToTopicAndTable5();
28012803
void TestSinksOltpWriteToTopicAndTable6();
28022804

28032805
void TestSinksOlapWriteToTopicAndTable1();
28042806
void TestSinksOlapWriteToTopicAndTable2();
2805-
void TestSinksOlapWriteToTopicAndTable3();
28062807
void TestSinksOlapWriteToTopicAndTable4();
28072808
};
28082809

28092810
class TFixtureSinksTable : public TFixtureSinks {
2810-
protected:
2811-
EClientType GetClientType() const override {
2812-
return EClientType::Table;
2813-
}
28142811
};
28152812

28162813
class TFixtureSinksQuery : public TFixtureSinks {
2817-
protected:
2818-
EClientType GetClientType() const override {
2819-
return EClientType::Query;
2820-
}
28212814
};
28222815

28232816
void TFixtureSinks::CreateRowTable(const TString& path)
@@ -2935,7 +2928,7 @@ Y_UNIT_TEST_F(Sinks_Oltp_WriteToTopicAndTable_2, TFixtureSinks)
29352928
NTable::TTransaction tx = BeginTx(tableSession);
29362929

29372930
auto records = MakeTableRecords();
2938-
UpsertToTable("table_A", records, *session, tx.get());
2931+
UpsertToTable("table_A", records, &tx);
29392932

29402933
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, MakeJsonDoc(records), &tx);
29412934

@@ -2963,17 +2956,7 @@ Y_UNIT_TEST_F(Sinks_Oltp_WriteToTopicAndTable_2, TFixtureSinks)
29632956
}
29642957

29652958

2966-
Y_UNIT_TEST_F(Sinks_Oltp_WriteToTopicAndTable_2_Table, TFixtureSinksTable)
2967-
{
2968-
TestSinksOltpWriteToTopicAndTable2();
2969-
}
2970-
2971-
Y_UNIT_TEST_F(Sinks_Oltp_WriteToTopicAndTable_2_Query, TFixtureSinksQuery)
2972-
{
2973-
TestSinksOltpWriteToTopicAndTable2();
2974-
}
2975-
2976-
void TFixtureSinks::TestSinksOltpWriteToTopicAndTable3()
2959+
Y_UNIT_TEST_F(Sinks_Oltp_WriteToTopicAndTable_3, TFixtureSinks)
29772960
{
29782961
CreateTopic("topic_A");
29792962
CreateTopic("topic_B");
@@ -2985,8 +2968,8 @@ void TFixtureSinks::TestSinksOltpWriteToTopicAndTable3()
29852968
NTable::TTransaction tx = BeginTx(tableSession);
29862969

29872970
auto records = MakeTableRecords();
2988-
UpsertToTable("table_A", records, *session, tx.get());
2989-
UpsertToTable("table_B", records, *session, tx.get());
2971+
UpsertToTable("table_A", records, &tx);
2972+
UpsertToTable("table_B", records, &tx);
29902973

29912974
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, MakeJsonDoc(records), &tx);
29922975

@@ -3027,7 +3010,7 @@ Y_UNIT_TEST_F(Sinks_Oltp_WriteToTopicAndTable_4, TFixtureSinks)
30273010
ExecuteDataQuery(tableSession, R"(SELECT COUNT(*) FROM `table_A`)", NTable::TTxControl::Tx(tx1));
30283011

30293012
auto records = MakeTableRecords();
3030-
UpsertToTable("table_A", records, *session, tx2.get());
3013+
UpsertToTable("table_A", records, &tx2);
30313014

30323015
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, MakeJsonDoc(records), &tx1);
30333016
WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID);
@@ -3051,7 +3034,7 @@ Y_UNIT_TEST_F(Sinks_Oltp_WriteToTopicAndTable_5, TFixtureSinks)
30513034
NTable::TTransaction tx = BeginTx(tableSession);
30523035

30533036
auto records = MakeTableRecords();
3054-
UpsertToTable("table_A", records, *session, tx.get());
3037+
UpsertToTable("table_A", records, &tx);
30553038

30563039
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, MakeJsonDoc(records), &tx);
30573040
WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID);
@@ -3065,37 +3048,27 @@ Y_UNIT_TEST_F(Sinks_Oltp_WriteToTopicAndTable_5, TFixtureSinks)
30653048
CheckTabletKeys("topic_A");
30663049
}
30673050

3068-
Y_UNIT_TEST_F(Sinks_Oltp_WriteToTopicAndTable_5_Table, TFixtureSinksTable)
3069-
{
3070-
TestSinksOltpWriteToTopicAndTable5();
3071-
}
3072-
3073-
Y_UNIT_TEST_F(Sinks_Oltp_WriteToTopicAndTable_5_Query, TFixtureSinksQuery)
3074-
{
3075-
TestSinksOltpWriteToTopicAndTable5();
3076-
}
3077-
30783051
void TFixtureSinks::TestSinksOltpWriteToTopicAndTable6()
30793052
{
30803053
CreateTopic("topic_A");
30813054
CreateTopic("topic_B");
30823055
CreateRowTable("/Root/table_A");
30833056

3084-
auto session = CreateSession();
3085-
auto tx = session->BeginTx();
3057+
NTable::TSession tableSession = CreateTableSession();
3058+
NTable::TTransaction tx = BeginTx(tableSession);
30863059

30873060
auto records = MakeTableRecords();
3088-
InsertToTable("table_A", records, *session, tx.get());
3061+
InsertToTable("table_A", records, &tx);
30893062

3090-
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, MakeJsonDoc(records), tx.get());
3063+
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, MakeJsonDoc(records), &tx);
30913064

3092-
WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID, "message #1", tx.get());
3093-
WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID, "message #2", tx.get());
3094-
WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID, "message #3", tx.get());
3065+
WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID, "message #1", &tx);
3066+
WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID, "message #2", &tx);
3067+
WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID, "message #3", &tx);
30953068

3096-
DeleteFromTable("table_A", records, *session, tx.get());
3069+
DeleteFromTable("table_A", records, &tx);
30973070

3098-
session->CommitTx(*tx, EStatus::SUCCESS);
3071+
CommitTx(tx, EStatus::SUCCESS);
30993072

31003073
{
31013074
auto messages = Read_Exactly_N_Messages_From_Topic("topic_A", TEST_CONSUMER, 1);
@@ -3135,8 +3108,8 @@ void TFixtureSinks::TestSinksOlapWriteToTopicAndTable1()
31353108
NTable::TTransaction tx = BeginTx(tableSession);
31363109

31373110
auto records = MakeTableRecords();
3138-
UpsertToTable("table_A", records, *session, tx.get());
3139-
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, MakeJsonDoc(records), tx.get());
3111+
UpsertToTable("table_A", records, &tx);
3112+
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, MakeJsonDoc(records), &tx);
31403113

31413114
CommitTx(tx, EStatus::SUCCESS);
31423115

@@ -3162,8 +3135,8 @@ Y_UNIT_TEST_F(Sinks_Olap_WriteToTopicAndTable_2, TFixtureSinks)
31623135

31633136
auto records = MakeTableRecords();
31643137

3165-
UpsertToTable("table_A", records, *session, tx.get());
3166-
UpsertToTable("table_B", records, *session, tx.get());
3138+
UpsertToTable("table_A", records, &tx);
3139+
UpsertToTable("table_B", records, &tx);
31673140

31683141
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, MakeJsonDoc(records), &tx);
31693142

@@ -3201,7 +3174,7 @@ Y_UNIT_TEST_F(Sinks_Olap_WriteToTopicAndTable_3, TFixtureSinks)
32013174
NTable::TTransaction tx = BeginTx(tableSession);
32023175

32033176
auto records = MakeTableRecords();
3204-
UpsertToTable("table_A", records, *session, tx.get());
3177+
UpsertToTable("table_A", records, &tx);
32053178

32063179
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, MakeJsonDoc(records), &tx);
32073180
WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID);
@@ -3215,16 +3188,6 @@ Y_UNIT_TEST_F(Sinks_Olap_WriteToTopicAndTable_3, TFixtureSinks)
32153188
CheckTabletKeys("topic_A");
32163189
}
32173190

3218-
Y_UNIT_TEST_F(Write_Random_Sized_Messages_In_Wide_Transactions, TFixture)
3219-
{
3220-
// Consumes a lot of memory. Temporarily disabled
3221-
return;
3222-
3223-
Y_UNIT_TEST_F(Sinks_Olap_WriteToTopicAndTable_3_Query, TFixtureSinksQuery)
3224-
{
3225-
TestSinksOlapWriteToTopicAndTable3();
3226-
}
3227-
32283191
void TFixtureSinks::TestSinksOlapWriteToTopicAndTable4()
32293192
{
32303193
return; // https://github.com/ydb-platform/ydb/issues/17271
@@ -3235,25 +3198,25 @@ void TFixtureSinks::TestSinksOlapWriteToTopicAndTable4()
32353198
CreateColumnTable("/Root/table_B");
32363199
CreateColumnTable("/Root/table_C");
32373200

3238-
auto session = CreateSession();
3239-
auto tx = session->BeginTx();
3201+
NTable::TSession tableSession = CreateTableSession();
3202+
NTable::TTransaction tx = BeginTx(tableSession);
32403203

32413204
auto records = MakeTableRecords();
32423205

3243-
InsertToTable("table_A", records, *session, tx.get());
3244-
InsertToTable("table_B", records, *session, tx.get());
3245-
UpsertToTable("table_C", records, *session, tx.get());
3206+
InsertToTable("table_A", records, &tx);
3207+
InsertToTable("table_B", records, &tx);
3208+
UpsertToTable("table_C", records, &tx);
32463209

3247-
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, MakeJsonDoc(records), tx.get());
3210+
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, MakeJsonDoc(records), &tx);
32483211

32493212
const size_t topicMsgCnt = 10;
32503213
for (size_t i = 1; i <= topicMsgCnt; ++i) {
3251-
WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID, "message #" + std::to_string(i), tx.get());
3214+
WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID, "message #" + std::to_string(i), &tx);
32523215
}
32533216

3254-
DeleteFromTable("table_B", records, *session, tx.get());
3217+
DeleteFromTable("table_B", records, &tx);
32553218

3256-
session->CommitTx(*tx, EStatus::SUCCESS);
3219+
CommitTx(tx, EStatus::SUCCESS);
32573220

32583221
{
32593222
auto messages = Read_Exactly_N_Messages_From_Topic("topic_A", TEST_CONSUMER, 1);
@@ -3284,59 +3247,6 @@ Y_UNIT_TEST_F(Sinks_Olap_WriteToTopicAndTable_4_Query, TFixtureSinksQuery)
32843247
TestSinksOlapWriteToTopicAndTable4();
32853248
}
32863249

3287-
void TFixture::TestWriteRandomSizedMessagesInWideTransactions()
3288-
{
3289-
// The test verifies the simultaneous execution of several transactions. There is a topic
3290-
// with PARTITIONS_COUNT partitions. In each transaction, the test writes to all the partitions.
3291-
// The size of the messages is random. Such that both large blobs in the body and small ones in
3292-
// the head of the partition are obtained. Message sizes are multiples of 500 KB. This way we
3293-
// will make sure that when committing transactions, the division into blocks is taken into account.
3294-
3295-
const size_t PARTITIONS_COUNT = 20;
3296-
const size_t TXS_COUNT = 100;
3297-
3298-
CreateTopic("topic_A", TEST_CONSUMER, PARTITIONS_COUNT);
3299-
3300-
std::vector<NTable::TSession> sessions;
3301-
std::vector<NTable::TTransaction> transactions;
3302-
3303-
// We open TXS_COUNT transactions and write messages to the topic.
3304-
for (size_t i = 0; i < TXS_COUNT; ++i) {
3305-
sessions.push_back(CreateTableSession());
3306-
auto& session = sessions.back();
3307-
3308-
transactions.push_back(BeginTx(session));
3309-
auto& tx = transactions.back();
3310-
3311-
for (size_t j = 0; j < PARTITIONS_COUNT; ++j) {
3312-
TString sourceId = TEST_MESSAGE_GROUP_ID;
3313-
sourceId += "_";
3314-
sourceId += ToString(i);
3315-
sourceId += "_";
3316-
sourceId += ToString(j);
3317-
3318-
size_t count = RandomNumber<size_t>(20) + 3;
3319-
WriteToTopic("topic_A", sourceId, TString(512 * 1000 * count, 'x'), &tx, j);
3320-
3321-
WaitForAcks("topic_A", sourceId);
3322-
}
3323-
}
3324-
3325-
// We are doing an asynchronous commit of transactions. They will be executed simultaneously.
3326-
std::vector<NTable::TAsyncCommitTransactionResult> futures;
3327-
3328-
for (size_t i = 0; i < TXS_COUNT; ++i) {
3329-
futures.push_back(transactions[i].Commit());
3330-
}
3331-
3332-
// All transactions must be completed successfully.
3333-
for (size_t i = 0; i < TXS_COUNT; ++i) {
3334-
futures[i].Wait();
3335-
const auto& result = futures[i].GetValueSync();
3336-
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
3337-
}
3338-
}
3339-
33403250
Y_UNIT_TEST_F(The_Configuration_Is_Changing_As_We_Write_To_The_Topic, TFixture)
33413251
{
33423252
// To test that you can change the topic configuration while writing to the partition

ydb/tests/functional/api/test_insert.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
# -*- coding: utf-8 -*-
22
import logging
33

4-
from hamcrest import assert_that, raises, equal_to, any_of
4+
from hamcrest import assert_that, raises, equal_to
55

66
from ydb.tests.library.harness.kikimr_runner import KiKiMR
77
from ydb.tests.oss.ydb_sdk_import import ydb

0 commit comments

Comments
 (0)