16
16
#include < library/cpp/threading/future/future.h>
17
17
#include < library/cpp/threading/future/async.h>
18
18
19
+ #include < util/stream/zlib.h>
20
+
19
21
#include < future>
20
22
21
23
namespace NYdb ::NTopic::NTests {
@@ -99,7 +101,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
99
101
NYdb::TDriverConfig cfg;
100
102
cfg.SetEndpoint (TStringBuilder () << " invalid:" << setup.GetServer ().GrpcPort );
101
103
cfg.SetDatabase (" /Invalid" );
102
- cfg.SetLog (CreateLogBackend (" cerr" , ELogPriority::TLOG_DEBUG));
104
+ cfg.SetLog (std::unique_ptr<TLogBackend>( CreateLogBackend (" cerr" , ELogPriority::TLOG_DEBUG). Release () ));
103
105
auto driver = NYdb::TDriver (cfg);
104
106
105
107
{
@@ -113,13 +115,13 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
113
115
auto writeSession = client.CreateWriteSession (writeSettings);
114
116
115
117
auto event = writeSession->GetEvent (true );
116
- UNIT_ASSERT (event. Defined () && std::holds_alternative<TSessionClosedEvent>(event.GetRef ()));
118
+ UNIT_ASSERT (event && std::holds_alternative<TSessionClosedEvent>(event.value ()));
117
119
}
118
120
119
121
{
120
122
auto settings = TTopicClientSettings ()
121
123
.Database ({" /Root" })
122
- .DiscoveryEndpoint ({ TStringBuilder () << " localhost:" << setup.GetServer ().GrpcPort } );
124
+ .DiscoveryEndpoint (" localhost:" + std::to_string ( setup.GetServer ().GrpcPort ) );
123
125
124
126
TTopicClient client (driver, settings);
125
127
@@ -130,7 +132,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
130
132
auto writeSession = client.CreateWriteSession (writeSettings);
131
133
132
134
auto event = writeSession->GetEvent (true );
133
- UNIT_ASSERT (event. Defined () && !std::holds_alternative<TSessionClosedEvent>(event.GetRef ()));
135
+ UNIT_ASSERT (event && !std::holds_alternative<TSessionClosedEvent>(event.value ()));
134
136
}
135
137
}
136
138
@@ -171,13 +173,13 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
171
173
auto readSession = client.CreateReadSession (readSettings);
172
174
173
175
auto event = readSession->GetEvent (true );
174
- UNIT_ASSERT (event.Defined ());
176
+ UNIT_ASSERT (event.has_value ());
175
177
176
178
auto & startPartitionSession = std::get<TReadSessionEvent::TStartPartitionSessionEvent>(*event);
177
179
startPartitionSession.Confirm ();
178
180
179
181
event = readSession->GetEvent (true );
180
- UNIT_ASSERT (event.Defined ());
182
+ UNIT_ASSERT (event.has_value ());
181
183
182
184
auto & dataReceived = std::get<TReadSessionEvent::TDataReceivedEvent>(*event);
183
185
dataReceived.Commit ();
@@ -234,16 +236,16 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
234
236
auto readSession = client.CreateReadSession (readSettings);
235
237
236
238
auto event = readSession->GetEvent (true );
237
- UNIT_ASSERT (event.Defined ());
239
+ UNIT_ASSERT (event.has_value ());
238
240
239
241
auto & startPartitionSession = std::get<TReadSessionEvent::TStartPartitionSessionEvent>(*event);
240
242
startPartitionSession.Confirm ();
241
243
242
244
UNIT_CHECK_GENERATED_EXCEPTION (readSession->GetEvent (true , 0 ), TContractViolation);
243
- UNIT_CHECK_GENERATED_EXCEPTION (readSession->GetEvents (true , Nothing () , 0 ), TContractViolation);
245
+ UNIT_CHECK_GENERATED_EXCEPTION (readSession->GetEvents (true , std::nullopt , 0 ), TContractViolation);
244
246
245
247
event = readSession->GetEvent (true , 1 );
246
- UNIT_ASSERT (event.Defined ());
248
+ UNIT_ASSERT (event.has_value ());
247
249
248
250
auto & dataReceived = std::get<TReadSessionEvent::TDataReceivedEvent>(*event);
249
251
dataReceived.Commit ();
@@ -335,7 +337,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
335
337
auto description = result.GetConsumerDescription ();
336
338
UNIT_ASSERT (description.GetPartitions ().size () == 1 );
337
339
auto stats = description.GetPartitions ().front ().GetPartitionConsumerStats ();
338
- UNIT_ASSERT (stats.Defined ());
340
+ UNIT_ASSERT (stats.has_value ());
339
341
UNIT_ASSERT (stats->GetCommittedOffset () == 50 );
340
342
}
341
343
@@ -695,7 +697,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
695
697
auto description = result.GetTopicDescription ();
696
698
UNIT_ASSERT (description.GetPartitions ().size () == 1 );
697
699
auto stats = description.GetPartitions ().front ().GetPartitionStats ();
698
- UNIT_ASSERT (stats.Defined ());
700
+ UNIT_ASSERT (stats.has_value ());
699
701
UNIT_ASSERT_VALUES_EQUAL (stats->GetEndOffset (), count);
700
702
701
703
}
@@ -779,7 +781,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
779
781
std::visit (TOverloaded {
780
782
[&](TReadSessionEvent::TDataReceivedEvent& event) {
781
783
for (auto & message: event.GetMessages ()) {
782
- TString sourceId = message.GetMessageGroupId ();
784
+ std::string sourceId = message.GetMessageGroupId ();
783
785
ui32 seqNo = message.GetSeqNo ();
784
786
UNIT_ASSERT_VALUES_EQUAL (readMessageCount + 1 , seqNo);
785
787
++readMessageCount;
@@ -829,11 +831,11 @@ Y_UNIT_TEST_SUITE(TSettingsValidation) {
829
831
830
832
auto client = setup.MakeClient ();
831
833
ui64 producerIndex = 0u ;
832
- auto runTest = [&](TString producer, TString msgGroup, const TMaybe <bool >& useDedup, bool useSeqNo, EExpectedTestResult result) ->bool
834
+ auto runTest = [&](TString producer, TString msgGroup, const std::optional <bool >& useDedup, bool useSeqNo, EExpectedTestResult result) ->bool
833
835
{
834
836
TWriteSessionSettings writeSettings;
835
837
writeSettings.Path (setup.GetTopicPath ()).Codec (NTopic::ECodec::RAW);
836
- TString useDedupStr = useDedup.Defined () ? ToString (*useDedup) : " <unset>" ;
838
+ TString useDedupStr = useDedup.has_value () ? ToString (*useDedup) : " <unset>" ;
837
839
if (producer) {
838
840
producer += ToString (producerIndex);
839
841
}
@@ -848,7 +850,7 @@ Y_UNIT_TEST_SUITE(TSettingsValidation) {
848
850
<< useDedupStr << " , manual SeqNo: " << useSeqNo << Endl;
849
851
850
852
try {
851
- if (useDedup.Defined ()) {
853
+ if (useDedup.has_value ()) {
852
854
writeSettings.DeduplicationEnabled (useDedup);
853
855
}
854
856
auto session = client.CreateWriteSession (writeSettings);
@@ -857,12 +859,12 @@ Y_UNIT_TEST_SUITE(TSettingsValidation) {
857
859
ui64 written = 0 ;
858
860
while (written < 10 ) {
859
861
auto event = session->GetEvent (true );
860
- if (std::holds_alternative<TSessionClosedEvent>(event.GetRef ())) {
862
+ if (std::holds_alternative<TSessionClosedEvent>(event.value ())) {
861
863
auto closed = std::get<TSessionClosedEvent>(*event);
862
864
Cerr << " Session failed with error: " << closed.DebugString () << Endl;
863
865
UNIT_ASSERT (result == EExpectedTestResult::FAIL_ON_RPC);
864
866
return false ;
865
- } else if (std::holds_alternative<TWriteSessionEvent::TReadyToAcceptEvent>(event.GetRef ())) {
867
+ } else if (std::holds_alternative<TWriteSessionEvent::TReadyToAcceptEvent>(event.value ())) {
866
868
token = std::move (std::get<TWriteSessionEvent::TReadyToAcceptEvent>(*event).ContinuationToken );
867
869
if (useSeqNo) {
868
870
session->Write (std::move (*token), " data" , seqNo++);
@@ -948,10 +950,10 @@ Y_UNIT_TEST_SUITE(TSettingsValidation) {
948
950
949
951
auto readSession = client.CreateReadSession (readSettings);
950
952
auto event = readSession->GetEvent (true );
951
- UNIT_ASSERT (event.Defined ());
953
+ UNIT_ASSERT (event.has_value ());
952
954
953
955
auto & closeEvent = std::get<NYdb::NTopic::TSessionClosedEvent>(*event);
954
- UNIT_ASSERT (closeEvent.DebugString ().Contains (" Too small max memory usage" ));
956
+ UNIT_ASSERT (closeEvent.DebugString ().contains (" Too small max memory usage" ));
955
957
}
956
958
957
959
} // Y_UNIT_TEST_SUITE(TSettingsValidation)
0 commit comments