1
- #include < ydb/public/ sdk/cpp/ client/ydb_federated_topic /federated_topic.h>
2
- #include < ydb/public/sdk/cpp/ client/ydb_federated_topic /impl/federated_write_session.h>
1
+ #include < ydb-cpp- sdk/client/federated_topic /federated_topic.h>
2
+ #include < src/ client/federated_topic /impl/federated_write_session.h>
3
3
4
- #include < ydb/public/sdk/cpp/ client/ydb_topic /ut/ut_utils/managed_executor.h>
4
+ #include < src/ client/topic /ut/ut_utils/managed_executor.h>
5
5
6
- #include < ydb/public/sdk/cpp/ client/ydb_persqueue_public /persqueue.h>
6
+ #include < src/ client/persqueue_public /persqueue.h>
7
7
8
- #include < ydb/public/sdk/cpp/ client/ydb_topic /impl/common.h>
9
- #include < ydb/public/sdk/cpp/ client/ydb_topic /common/executor_impl.h>
10
- #include < ydb/public/sdk/cpp/ client/ydb_persqueue_public /include/write_session.h>
8
+ #include < src/ client/topic /impl/common.h>
9
+ #include < src/ client/topic /common/executor_impl.h>
10
+ #include < src/ client/persqueue_public /include/write_session.h>
11
11
12
- #include < ydb/public/sdk/cpp/ client/ydb_persqueue_public /ut/ut_utils/ut_utils.h>
13
- #include < ydb/public/sdk/cpp/ client/ydb_federated_topic /ut/fds_mock/fds_mock.h>
12
+ #include < src/ client/persqueue_public /ut/ut_utils/ut_utils.h>
13
+ #include < src/ client/federated_topic /ut/fds_mock/fds_mock.h>
14
14
15
15
#include < library/cpp/testing/unittest/registar.h>
16
16
#include < library/cpp/testing/unittest/tests_data.h>
@@ -43,7 +43,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
43
43
NYdb::TDriverConfig cfg;
44
44
cfg.SetEndpoint (TStringBuilder () << " localhost:" << newServicePort);
45
45
cfg.SetDatabase (" /Root" );
46
- cfg.SetLog (CreateLogBackend (" cerr" , ELogPriority::TLOG_DEBUG));
46
+ cfg.SetLog (std::unique_ptr<TLogBackend>( CreateLogBackend (" cerr" , ELogPriority::TLOG_DEBUG). Release () ));
47
47
NYdb::TDriver driver (cfg);
48
48
NYdb::NFederatedTopic::TFederatedTopicClient topicClient (driver);
49
49
@@ -58,7 +58,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
58
58
Cerr << " Session was created" << Endl;
59
59
60
60
ReadSession->WaitEvent ().Wait (TDuration::Seconds (1 ));
61
- TMaybe <NYdb::NFederatedTopic::TReadSessionEvent::TEvent> event = ReadSession->GetEvent (false );
61
+ std::optional <NYdb::NFederatedTopic::TReadSessionEvent::TEvent> event = ReadSession->GetEvent (false );
62
62
Y_ASSERT (!event);
63
63
64
64
auto fdsRequest = fdsMock.GetNextPendingRequest ();
@@ -99,7 +99,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
99
99
for (size_t i = 0 ; i < partitionsCount; ++i) {
100
100
ReadSession->WaitEvent ().Wait ();
101
101
// Get event
102
- TMaybe <NYdb::NFederatedTopic::TReadSessionEvent::TEvent> event = ReadSession->GetEvent (true /* block - will block if no event received yet*/ );
102
+ std::optional <NYdb::NFederatedTopic::TReadSessionEvent::TEvent> event = ReadSession->GetEvent (true /* block - will block if no event received yet*/ );
103
103
Cerr << " Got new read session event: " << DebugString (*event) << Endl;
104
104
105
105
auto * startPartitionSessionEvent = std::get_if<NYdb::NFederatedTopic::TReadSessionEvent::TStartPartitionSessionEvent>(&*event);
@@ -126,7 +126,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
126
126
NYdb::TDriverConfig cfg;
127
127
cfg.SetEndpoint (TStringBuilder () << " localhost:" << newServicePort);
128
128
cfg.SetDatabase (" /Root" );
129
- cfg.SetLog (CreateLogBackend (" cerr" , ELogPriority::TLOG_DEBUG));
129
+ cfg.SetLog (std::unique_ptr<TLogBackend>( CreateLogBackend (" cerr" , ELogPriority::TLOG_DEBUG). Release () ));
130
130
NYdb::TDriver driver (cfg);
131
131
NYdb::NFederatedTopic::TFederatedTopicClient topicClient (driver);
132
132
@@ -170,7 +170,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
170
170
NYdb::TDriverConfig cfg;
171
171
cfg.SetEndpoint (TStringBuilder () << " localhost:" << newServicePort);
172
172
cfg.SetDatabase (" /Root" );
173
- cfg.SetLog (CreateLogBackend (" cerr" , ELogPriority::TLOG_DEBUG));
173
+ cfg.SetLog (std::unique_ptr<TLogBackend>( CreateLogBackend (" cerr" , ELogPriority::TLOG_DEBUG). Release () ));
174
174
NYdb::TDriver driver (cfg);
175
175
auto clientSettings = TFederatedTopicClientSettings ()
176
176
.RetryPolicy (NTopic::IRetryPolicy::GetFixedIntervalPolicy (
@@ -215,7 +215,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
215
215
NYdb::TDriverConfig cfg;
216
216
cfg.SetEndpoint (TStringBuilder () << " localhost:" << newServicePort);
217
217
cfg.SetDatabase (" /Root" );
218
- cfg.SetLog (CreateLogBackend (" cerr" , ELogPriority::TLOG_DEBUG));
218
+ cfg.SetLog (std::unique_ptr<TLogBackend>( CreateLogBackend (" cerr" , ELogPriority::TLOG_DEBUG). Release () ));
219
219
NYdb::TDriver driver (cfg);
220
220
auto clientSettings = TFederatedTopicClientSettings ()
221
221
.RetryPolicy (NTopic::IRetryPolicy::GetFixedIntervalPolicy (
@@ -324,7 +324,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
324
324
NYdb::TDriverConfig cfg;
325
325
cfg.SetEndpoint (TStringBuilder () << " localhost:" << newServicePort);
326
326
cfg.SetDatabase (" /Root" );
327
- cfg.SetLog (CreateLogBackend (" cerr" , ELogPriority::TLOG_DEBUG));
327
+ cfg.SetLog (std::unique_ptr<TLogBackend>( CreateLogBackend (" cerr" , ELogPriority::TLOG_DEBUG). Release () ));
328
328
NYdb::TDriver driver (cfg);
329
329
auto clientSettings = TFederatedTopicClientSettings ()
330
330
.RetryPolicy (NTopic::IRetryPolicy::GetNoRetryPolicy ());
@@ -343,14 +343,14 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
343
343
344
344
ReadSession->WaitEvent ().Wait (TDuration::Seconds (1 ));
345
345
auto event = ReadSession->GetEvent (false );
346
- UNIT_ASSERT (!event.Defined ());
346
+ UNIT_ASSERT (!event.has_value ());
347
347
348
348
auto fdsRequest = fdsMock.WaitNextPendingRequest ();
349
349
fdsRequest.Result .SetValue ({{}, grpc::Status (grpc::StatusCode::UNAVAILABLE, " mock 'unavailable'" )});
350
350
351
351
ReadSession->WaitEvent ().Wait ();
352
352
event = ReadSession->GetEvent (false );
353
- UNIT_ASSERT (event.Defined ());
353
+ UNIT_ASSERT (event.has_value ());
354
354
Cerr << " >>> Got event: " << DebugString (*event) << Endl;
355
355
UNIT_ASSERT (std::holds_alternative<NTopic::TSessionClosedEvent>(*event));
356
356
@@ -359,13 +359,13 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
359
359
360
360
ReadSession2->WaitEvent ().Wait (TDuration::Seconds (1 ));
361
361
event = ReadSession2->GetEvent (false );
362
- UNIT_ASSERT (!event.Defined ());
362
+ UNIT_ASSERT (!event.has_value ());
363
363
364
364
fdsRequest = fdsMock.WaitNextPendingRequest ();
365
365
fdsRequest.Result .SetValue (fdsMock.ComposeOkResultAvailableDatabases ());
366
366
367
367
event = ReadSession2->GetEvent (true );
368
- UNIT_ASSERT (event.Defined ());
368
+ UNIT_ASSERT (event.has_value ());
369
369
Cerr << " >>> Got event: " << DebugString (*event) << Endl;
370
370
UNIT_ASSERT (std::holds_alternative<TReadSessionEvent::TStartPartitionSessionEvent>(*event));
371
371
@@ -393,7 +393,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
393
393
Cerr << " Session was created" << Endl;
394
394
395
395
ReadSession->WaitEvent ().Wait (TDuration::Seconds (1 ));
396
- TMaybe <NYdb::NFederatedTopic::TReadSessionEvent::TEvent> event = ReadSession->GetEvent (false );
396
+ std::optional <NYdb::NFederatedTopic::TReadSessionEvent::TEvent> event = ReadSession->GetEvent (false );
397
397
Y_ASSERT (event);
398
398
Cerr << " Got new read session event: " << DebugString (*event) << Endl;
399
399
@@ -418,7 +418,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
418
418
NYdb::TDriverConfig cfg;
419
419
cfg.SetEndpoint (TStringBuilder () << " localhost:" << newServicePort);
420
420
cfg.SetDatabase (" /Root" );
421
- cfg.SetLog (CreateLogBackend (" cerr" , ELogPriority::TLOG_DEBUG));
421
+ cfg.SetLog (std::unique_ptr<TLogBackend>( CreateLogBackend (" cerr" , ELogPriority::TLOG_DEBUG). Release () ));
422
422
NYdb::TDriver driver (cfg);
423
423
auto clientSettings = TFederatedTopicClientSettings ();
424
424
NYdb::NFederatedTopic::TFederatedTopicClient topicClient (driver, clientSettings);
@@ -434,7 +434,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
434
434
Cerr << " Session was created" << Endl;
435
435
436
436
ReadSession->WaitEvent ().Wait (TDuration::Seconds (1 ));
437
- TMaybe <NYdb::NFederatedTopic::TReadSessionEvent::TEvent> event = ReadSession->GetEvent (false );
437
+ std::optional <NYdb::NFederatedTopic::TReadSessionEvent::TEvent> event = ReadSession->GetEvent (false );
438
438
Y_ASSERT (!event);
439
439
440
440
{
@@ -449,7 +449,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
449
449
}
450
450
451
451
ReadSession->WaitEvent ().Wait ();
452
- TMaybe <NYdb::NFederatedTopic::TReadSessionEvent::TEvent> event2 = ReadSession->GetEvent (true );
452
+ std::optional <NYdb::NFederatedTopic::TReadSessionEvent::TEvent> event2 = ReadSession->GetEvent (true );
453
453
Cerr << " Got new read session event: " << DebugString (*event2) << Endl;
454
454
455
455
auto * sessionEvent = std::get_if<NYdb::NFederatedTopic::TSessionClosedEvent>(&*event2);
@@ -475,7 +475,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
475
475
NYdb::TDriverConfig cfg;
476
476
cfg.SetEndpoint (TStringBuilder () << " localhost:" << newServicePort);
477
477
cfg.SetDatabase (" /Root" );
478
- cfg.SetLog (CreateLogBackend (" cerr" , ELogPriority::TLOG_DEBUG));
478
+ cfg.SetLog (std::unique_ptr<TLogBackend>( CreateLogBackend (" cerr" , ELogPriority::TLOG_DEBUG). Release () ));
479
479
NYdb::TDriver driver (cfg);
480
480
auto clientSettings = TFederatedTopicClientSettings ()
481
481
.RetryPolicy (NTopic::IRetryPolicy::GetFixedIntervalPolicy (
@@ -568,7 +568,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
568
568
NYdb::TDriverConfig cfg;
569
569
cfg.SetEndpoint (TStringBuilder () << " localhost:" << newServicePort);
570
570
cfg.SetDatabase (" /Root" );
571
- cfg.SetLog (CreateLogBackend (" cerr" , ELogPriority::TLOG_DEBUG));
571
+ cfg.SetLog (std::unique_ptr<TLogBackend>( CreateLogBackend (" cerr" , ELogPriority::TLOG_DEBUG). Release () ));
572
572
NYdb::TDriver driver (cfg);
573
573
auto clientSettings = TFederatedTopicClientSettings ()
574
574
.RetryPolicy (NTopic::IRetryPolicy::GetFixedIntervalPolicy (
@@ -613,10 +613,10 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
613
613
auto & message = messages[i];
614
614
UNIT_ASSERT (message.GetFederatedPartitionSession ()->GetReadSourceDatabaseName () == " dc1" );
615
615
UNIT_ASSERT (message.GetFederatedPartitionSession ()->GetTopicPath () == setup->GetTestTopic ());
616
- UNIT_ASSERT (message.GetData ().EndsWith (message.GetFederatedPartitionSession ()->GetTopicOriginDatabaseName ()));
616
+ UNIT_ASSERT (message.GetData ().ends_with (message.GetFederatedPartitionSession ()->GetTopicOriginDatabaseName ()));
617
617
618
618
UNIT_ASSERT (!sentSet.empty ());
619
- UNIT_ASSERT_C (sentSet.erase (message.GetData ()), " no such element is sentSet: " + message.GetData ());
619
+ UNIT_ASSERT_C (sentSet.erase (TString{ message.GetData ()} ), " no such element is sentSet: " + message.GetData ());
620
620
totalReceived++;
621
621
}
622
622
if (totalReceived == 3 * sentMessages.size ()) {
@@ -719,7 +719,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
719
719
NYdb::TDriverConfig cfg;
720
720
cfg.SetEndpoint (TStringBuilder () << " localhost:" << newServicePort);
721
721
cfg.SetDatabase (" /Root" );
722
- cfg.SetLog (CreateLogBackend (" cerr" , ELogPriority::TLOG_DEBUG));
722
+ cfg.SetLog (std::unique_ptr<TLogBackend>( CreateLogBackend (" cerr" , ELogPriority::TLOG_DEBUG). Release () ));
723
723
NYdb::TDriver driver (cfg);
724
724
NYdb::NFederatedTopic::TFederatedTopicClient topicClient (driver);
725
725
@@ -787,7 +787,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
787
787
NYdb::TDriverConfig cfg;
788
788
cfg.SetEndpoint (TStringBuilder () << " localhost:" << newServicePort);
789
789
cfg.SetDatabase (" /Root" );
790
- cfg.SetLog (CreateLogBackend (" cerr" , ELogPriority::TLOG_DEBUG));
790
+ cfg.SetLog (std::unique_ptr<TLogBackend>( CreateLogBackend (" cerr" , ELogPriority::TLOG_DEBUG). Release () ));
791
791
NYdb::TDriver driver (cfg);
792
792
NYdb::NFederatedTopic::TFederatedTopicClient topicClient (driver);
793
793
@@ -821,7 +821,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
821
821
NYdb::TDriverConfig cfg;
822
822
cfg.SetEndpoint (TStringBuilder () << " localhost:" << newServicePort);
823
823
cfg.SetDatabase (" /Root" );
824
- cfg.SetLog (CreateLogBackend (" cerr" , ELogPriority::TLOG_DEBUG));
824
+ cfg.SetLog (std::unique_ptr<TLogBackend>( CreateLogBackend (" cerr" , ELogPriority::TLOG_DEBUG). Release () ));
825
825
NYdb::TDriver driver (cfg);
826
826
NYdb::NFederatedTopic::TFederatedTopicClient topicClient (driver);
827
827
@@ -878,7 +878,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
878
878
NYdb::TDriverConfig cfg;
879
879
cfg.SetEndpoint (TStringBuilder () << " localhost:" << newServicePort);
880
880
cfg.SetDatabase (" /Root" );
881
- cfg.SetLog (CreateLogBackend (" cerr" , ELogPriority::TLOG_DEBUG));
881
+ cfg.SetLog (std::unique_ptr<TLogBackend>( CreateLogBackend (" cerr" , ELogPriority::TLOG_DEBUG). Release () ));
882
882
NYdb::TDriver driver (cfg);
883
883
TFederatedTopicClientSettings clientSettings;
884
884
clientSettings.RetryPolicy (NPersQueue::IRetryPolicy::GetNoRetryPolicy ());
@@ -934,7 +934,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
934
934
NYdb::TDriverConfig cfg;
935
935
cfg.SetEndpoint (TStringBuilder () << " localhost:" << newServicePort);
936
936
cfg.SetDatabase (" /Root" );
937
- cfg.SetLog (CreateLogBackend (" cerr" , ELogPriority::TLOG_DEBUG));
937
+ cfg.SetLog (std::unique_ptr<TLogBackend>( CreateLogBackend (" cerr" , ELogPriority::TLOG_DEBUG). Release () ));
938
938
NYdb::TDriver driver (cfg);
939
939
NYdb::NFederatedTopic::TFederatedTopicClient topicClient (driver);
940
940
@@ -1017,7 +1017,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
1017
1017
NYdb::TDriverConfig cfg;
1018
1018
cfg.SetEndpoint (TStringBuilder () << " localhost:" << newServicePort);
1019
1019
cfg.SetDatabase (" /Root" );
1020
- cfg.SetLog (CreateLogBackend (" cerr" , ELogPriority::TLOG_DEBUG));
1020
+ cfg.SetLog (std::unique_ptr<TLogBackend>( CreateLogBackend (" cerr" , ELogPriority::TLOG_DEBUG). Release () ));
1021
1021
NYdb::TDriver driver (cfg);
1022
1022
TFederatedTopicClientSettings clientSettings;
1023
1023
NYdb::NFederatedTopic::TFederatedTopicClient topicClient (driver, clientSettings);
@@ -1036,13 +1036,13 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
1036
1036
1037
1037
{
1038
1038
auto e = WriteSession->GetEvent (true );
1039
- UNIT_ASSERT (e.Defined ());
1039
+ UNIT_ASSERT (e.has_value ());
1040
1040
Cerr << " >>> Got event: " << DebugString (*e) << Endl;
1041
1041
UNIT_ASSERT (std::holds_alternative<NYdb::NTopic::TWriteSessionEvent::TReadyToAcceptEvent>(*e));
1042
1042
}
1043
1043
{
1044
1044
auto e = WriteSession->GetEvent (true );
1045
- UNIT_ASSERT (e.Defined ());
1045
+ UNIT_ASSERT (e.has_value ());
1046
1046
Cerr << " >>> Got event: " << DebugString (*e) << Endl;
1047
1047
UNIT_ASSERT (std::holds_alternative<NYdb::NTopic::TSessionClosedEvent>(*e));
1048
1048
}
@@ -1052,7 +1052,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
1052
1052
1053
1053
NTopic::TContinuationToken GetToken (std::shared_ptr<NTopic::IWriteSession> writer) {
1054
1054
auto e = writer->GetEvent (true );
1055
- UNIT_ASSERT (e.Defined ());
1055
+ UNIT_ASSERT (e.has_value ());
1056
1056
Cerr << " >>> Got event: " << DebugString (*e) << Endl;
1057
1057
auto * readyToAcceptEvent = std::get_if<NYdb::NTopic::TWriteSessionEvent::TReadyToAcceptEvent>(&*e);
1058
1058
UNIT_ASSERT (readyToAcceptEvent);
@@ -1076,7 +1076,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
1076
1076
auto driverConfig = NYdb::TDriverConfig ()
1077
1077
.SetEndpoint (TStringBuilder () << " localhost:" << newServicePort)
1078
1078
.SetDatabase (" /Root" )
1079
- .SetLog (CreateLogBackend (" cerr" , ELogPriority::TLOG_DEBUG));
1079
+ .SetLog (std::unique_ptr<TLogBackend>( CreateLogBackend (" cerr" , ELogPriority::TLOG_DEBUG). Release () ));
1080
1080
auto driver = NYdb::TDriver (driverConfig);
1081
1081
auto topicClient = NYdb::NFederatedTopic::TFederatedTopicClient (driver);
1082
1082
@@ -1169,7 +1169,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
1169
1169
NYdb::TDriverConfig cfg;
1170
1170
cfg.SetEndpoint (TStringBuilder () << " localhost:" << setup->GetGrpcPort ());
1171
1171
cfg.SetDatabase (" /Root" );
1172
- cfg.SetLog (CreateLogBackend (" cerr" , ELogPriority::TLOG_DEBUG));
1172
+ cfg.SetLog (std::unique_ptr<TLogBackend>( CreateLogBackend (" cerr" , ELogPriority::TLOG_DEBUG). Release () ));
1173
1173
NYdb::TDriver driver (cfg);
1174
1174
NYdb::NFederatedTopic::TFederatedTopicClient client (driver);
1175
1175
0 commit comments