1
1
#include " data_plane_helpers.h"
2
+ #include < ydb-cpp-sdk/client/resources/ydb_resources.h>
3
+ #include < ydb-cpp-sdk/client/topic/client.h>
2
4
3
5
namespace NKikimr ::NPersQueueTests {
4
6
@@ -51,7 +53,8 @@ namespace NKikimr::NPersQueueTests {
51
53
std::optional<ui32> partitionGroup,
52
54
std::optional<TString> codec,
53
55
std::optional<bool > reconnectOnFailure,
54
- std::unordered_map<std::string, std::string> sessionMeta
56
+ std::unordered_map<std::string, std::string> sessionMeta,
57
+ const TString& userAgent
55
58
) {
56
59
auto settings = TWriteSessionSettings ().Path (topic).MessageGroupId (sourceId);
57
60
if (partitionGroup) settings.PartitionGroupId (*partitionGroup);
@@ -66,6 +69,9 @@ namespace NKikimr::NPersQueueTests {
66
69
}
67
70
settings.MaxMemoryUsage (1024 *1024 *1024 *1024ll );
68
71
settings.Meta_ .Fields = sessionMeta;
72
+ if (!userAgent.empty ()) {
73
+ settings.Header ({{NYdb::YDB_APPLICATION_NAME, userAgent}});
74
+ }
69
75
return CreateSimpleWriter (driver, settings);
70
76
}
71
77
@@ -79,6 +85,21 @@ namespace NKikimr::NPersQueueTests {
79
85
return TPersQueueClient (driver, clientSettings).CreateReadSession (TReadSessionSettings (settings).DisableClusterDiscovery (true ));
80
86
}
81
87
88
+ std::shared_ptr<NYdb::NTopic::IReadSession> CreateReader (
89
+ NYdb::TDriver& driver,
90
+ const NYdb::NTopic::TReadSessionSettings& settings,
91
+ std::shared_ptr<NYdb::ICredentialsProviderFactory> creds,
92
+ const TString& userAgent
93
+ ) {
94
+ NYdb::NTopic::TTopicClientSettings clientSettings;
95
+ if (creds) clientSettings.CredentialsProviderFactory (creds);
96
+ auto readerSettings = settings;
97
+ if (!userAgent.empty ()) {
98
+ readerSettings.Header ({{NYdb::YDB_APPLICATION_NAME, userAgent}});
99
+ }
100
+ return NYdb::NTopic::TTopicClient (driver, clientSettings).CreateReadSession (readerSettings);
101
+ }
102
+
82
103
TMaybe<TReadSessionEvent::TDataReceivedEvent> GetNextMessageSkipAssignment (std::shared_ptr<IReadSession>& reader, TDuration timeout) {
83
104
while (true ) {
84
105
auto future = reader->WaitEvent ();
@@ -99,4 +120,24 @@ namespace NKikimr::NPersQueueTests {
99
120
}
100
121
return {};
101
122
}
123
+
124
+ TMaybe<NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent> GetNextMessageSkipAssignment (std::shared_ptr<NYdb::NTopic::IReadSession>& reader, TDuration timeout) {
125
+ while (true ) {
126
+ auto future = reader->WaitEvent ();
127
+ future.Wait (timeout);
128
+ std::optional<NYdb::NTopic::TReadSessionEvent::TEvent> event = reader->GetEvent (false , 1 );
129
+ if (!event)
130
+ return {};
131
+ if (auto e = std::get_if<NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent>(&*event)) {
132
+ return *e;
133
+ } else if (auto * e = std::get_if<NYdb::NTopic::TReadSessionEvent::TStartPartitionSessionEvent>(&*event)) {
134
+ e->Confirm ();
135
+ } else if (auto * e = std::get_if<NYdb::NTopic::TReadSessionEvent::TStopPartitionSessionEvent>(&*event)) {
136
+ e->Confirm ();
137
+ } else if (std::get_if<NYdb::NTopic::TSessionClosedEvent>(&*event)) {
138
+ return {};
139
+ }
140
+ }
141
+ return {};
142
+ }
102
143
}
0 commit comments