1
- #include " ut_utils/managed_executor.h"
2
1
#include " ut_utils/topic_sdk_test_setup.h"
2
+
3
+ #include < tests/integration/topic/utils/managed_executor.h>
4
+
3
5
#include < src/client/persqueue_public/ut/ut_utils/ut_utils.h>
4
6
5
7
#include < ydb-cpp-sdk/client/topic/client.h>
18
20
19
21
#include < util/stream/zlib.h>
20
22
21
- #include < future>
22
23
24
+ using namespace std ::chrono_literals;
23
25
24
26
static const bool EnableDirectRead = !std::string{std::getenv (" PQ_EXPERIMENTAL_DIRECT_READ" ) ? std::getenv (" PQ_EXPERIMENTAL_DIRECT_READ" ) : " " }.empty();
25
27
26
28
27
- namespace NYdb ::NTopic::NTests {
29
+ namespace NYdb ::inline V3:: NTopic::NTests {
28
30
29
- void WriteAndReadToEndWithRestarts (TReadSessionSettings readSettings, TWriteSessionSettings writeSettings, const std::string& message, ui32 count, TTopicSdkTestSetup& setup, TIntrusivePtr<TManagedExecutor> decompressor) {
31
+ void WriteAndReadToEndWithRestarts (TReadSessionSettings readSettings, TWriteSessionSettings writeSettings, const std::string& message, std:: uint32_t count, TTopicSdkTestSetup& setup, TIntrusivePtr<TManagedExecutor> decompressor) {
30
32
auto client = setup.MakeClient ();
31
33
auto session = client.CreateSimpleBlockingWriteSession (writeSettings);
32
34
33
- for (ui32 i = 1 ; i <= count; ++i) {
35
+ for (std:: uint32_t i = 1 ; i <= count; ++i) {
34
36
bool res = session->Write (message);
35
37
UNIT_ASSERT (res);
36
38
}
@@ -44,7 +46,7 @@ void WriteAndReadToEndWithRestarts(TReadSessionSettings readSettings, TWriteSess
44
46
45
47
auto WaitTasks = [&](auto f, size_t c) {
46
48
while (f () < c) {
47
- Sleep ( TDuration::MilliSeconds ( 100 ) );
49
+ std::this_thread::sleep_for (100ms );
48
50
};
49
51
};
50
52
auto WaitPlannedTasks = [&](auto e, size_t count) {
@@ -69,7 +71,7 @@ void WriteAndReadToEndWithRestarts(TReadSessionSettings readSettings, TWriteSess
69
71
size_t completed = e->GetExecutedCount ();
70
72
71
73
setup.GetServer ().KillTopicPqrbTablet (setup.GetTopicPath ());
72
- Sleep ( TDuration::MilliSeconds ( 100 ) );
74
+ std::this_thread::sleep_for (100ms );
73
75
74
76
e->StartFuncs (tasks);
75
77
WaitExecutedTasks (e, completed + n);
@@ -90,7 +92,7 @@ void WriteAndReadToEndWithRestarts(TReadSessionSettings readSettings, TWriteSess
90
92
91
93
ReadSession = topicClient.CreateReadSession (readSettings);
92
94
93
- ui32 i = 0 ;
95
+ std:: uint32_t i = 0 ;
94
96
while (AtomicGet (lastOffset) + 1 < count) {
95
97
RunTasks (decompressor, {i++});
96
98
}
@@ -109,7 +111,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
109
111
auto decompressor = CreateThreadPoolManagedExecutor (1 );
110
112
111
113
TReadSessionSettings readSettings;
112
- TTopicReadSettings topic = TEST_TOPIC ;
114
+ TTopicReadSettings topic = setup. GetTopicPath () ;
113
115
topic.AppendPartitionIds (0 );
114
116
readSettings
115
117
.WithoutConsumer ()
@@ -121,13 +123,13 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
121
123
122
124
TWriteSessionSettings writeSettings;
123
125
writeSettings
124
- .Path (TEST_TOPIC )
126
+ .Path (setup. GetTopicPath () )
125
127
.MessageGroupId (TEST_MESSAGE_GROUP_ID)
126
128
.Codec (NTopic::ECodec::RAW)
127
129
.CompressionExecutor (compressor);
128
130
129
131
130
- ui32 count = 700 ;
132
+ std:: uint32_t count = 700 ;
131
133
std::string message (2'000 , ' x' );
132
134
133
135
WriteAndReadToEndWithRestarts (readSettings, writeSettings, message, count, setup, decompressor);
@@ -140,21 +142,21 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
140
142
141
143
TReadSessionSettings readSettings;
142
144
readSettings
143
- .ConsumerName (TEST_CONSUMER )
145
+ .ConsumerName (setup. GetConsumerName () )
144
146
.MaxMemoryUsageBytes (1_MB)
145
147
.DecompressionExecutor (decompressor)
146
- .AppendTopics (TEST_TOPIC )
148
+ .AppendTopics (setup. GetTopicPath () )
147
149
// .DirectRead(EnableDirectRead)
148
150
;
149
151
150
152
TWriteSessionSettings writeSettings;
151
153
writeSettings
152
- .Path (TEST_TOPIC ).MessageGroupId (TEST_MESSAGE_GROUP_ID)
153
- .Codec (NTopic:: ECodec::RAW)
154
+ .Path (setup. GetTopicPath () ).MessageGroupId (TEST_MESSAGE_GROUP_ID)
155
+ .Codec (ECodec::RAW)
154
156
.CompressionExecutor (compressor);
155
157
156
158
157
- ui32 count = 700 ;
159
+ std:: uint32_t count = 700 ;
158
160
std::string message (2'000 , ' x' );
159
161
160
162
WriteAndReadToEndWithRestarts (readSettings, writeSettings, message, count, setup, decompressor);
@@ -164,25 +166,25 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
164
166
165
167
TTopicSdkTestSetup setup (TEST_CASE_NAME);
166
168
167
- NTopic:: TWriteSessionSettings writeSettings;
169
+ TWriteSessionSettings writeSettings;
168
170
writeSettings.Path (setup.GetTopicPath ()).MessageGroupId (TEST_MESSAGE_GROUP_ID);
169
171
writeSettings.Path (setup.GetTopicPath ()).ProducerId (TEST_MESSAGE_GROUP_ID);
170
- writeSettings.Codec (NTopic:: ECodec::RAW);
171
- NTopic:: IExecutor::TPtr executor = new NTopic:: TSyncExecutor ();
172
+ writeSettings.Codec (ECodec::RAW);
173
+ IExecutor::TPtr executor = new TSyncExecutor ();
172
174
writeSettings.CompressionExecutor (executor);
173
175
174
- ui64 count = 100u ;
176
+ std:: uint64_t count = 100u ;
175
177
176
178
auto client = setup.MakeClient ();
177
179
auto session = client.CreateSimpleBlockingWriteSession (writeSettings);
178
180
179
- TString messageBase = " message----" ;
181
+ std::string messageBase = " message----" ;
180
182
181
183
for (auto i = 0u ; i < count; i++) {
182
184
auto res = session->Write (messageBase);
183
185
UNIT_ASSERT (res);
184
186
if (i % 10 == 0 ) {
185
- setup.GetServer ().KillTopicPqTablets (setup.GetTopicPath ());
187
+ setup.GetServer ().KillTopicPqTablets (setup.GetFullTopicPath ());
186
188
}
187
189
}
188
190
session->Close ();
0 commit comments