@@ -12,9 +12,13 @@ namespace NYql {
12
12
13
13
class TBlockingEQueue {
14
14
public:
15
- void Push (NYdb::NTopic::TReadSessionEvent::TEvent&& e) {
15
+ TBlockingEQueue (size_t maxSize):MaxSize_(maxSize) {
16
+ }
17
+ void Push (NYdb::NTopic::TReadSessionEvent::TEvent&& e, size_t size) {
16
18
with_lock (Mutex_) {
17
- Events_.push_back (std::move (e));
19
+ CanPush_.WaitI (Mutex_, [this ] () {return Stopped_ || Size_ < MaxSize_;});
20
+ Events_.emplace_back (std::move (e), size );
21
+ Size_ += size;
18
22
}
19
23
CanPop_.BroadCast ();
20
24
}
@@ -29,26 +33,26 @@ class TBlockingEQueue {
29
33
with_lock (Mutex_) {
30
34
if (block) {
31
35
CanPop_.WaitI (Mutex_, [this ] () {return CanPopPredicate ();});
32
-
33
- auto front = Events_.front ();
34
- Events_.pop_front ();
35
- return front;
36
36
} else {
37
37
if (!CanPopPredicate ()) {
38
38
return {};
39
39
}
40
-
41
- auto front = Events_.front ();
42
- Events_.pop_front ();
43
- return front;
44
40
}
41
+ auto [front, size] = std::move (Events_.front ());
42
+ Events_.pop_front ();
43
+ Size_ -= size;
44
+ if (Size_ < MaxSize_) {
45
+ CanPush_.BroadCast ();
46
+ }
47
+ return front;
45
48
}
46
49
}
47
50
48
51
void Stop () {
49
52
with_lock (Mutex_) {
50
53
Stopped_ = true ;
51
54
CanPop_.BroadCast ();
55
+ CanPush_.BroadCast ();
52
56
}
53
57
}
54
58
@@ -63,10 +67,13 @@ class TBlockingEQueue {
63
67
return !Events_.empty () && !Stopped_;
64
68
}
65
69
66
- TDeque<NYdb::NTopic::TReadSessionEvent::TEvent> Events_;
70
+ size_t MaxSize_;
71
+ size_t Size_ = 0 ;
72
+ TDeque<std::pair<NYdb::NTopic::TReadSessionEvent::TEvent, size_t >> Events_;
67
73
bool Stopped_ = false ;
68
74
TMutex Mutex_;
69
75
TCondVar CanPop_;
76
+ TCondVar CanPush_;
70
77
};
71
78
72
79
class TFileTopicReadSession : public NYdb ::NTopic::IReadSession {
@@ -174,21 +181,23 @@ constexpr static auto FILE_POLL_PERIOD = TDuration::MilliSeconds(100);
174
181
while (!EventsQ_.IsStopped ()) {
175
182
TString rawMsg;
176
183
TVector<TMessage> msgs;
184
+ size_t size = 0 ;
177
185
178
186
while (size_t read = fi.ReadLine (rawMsg)) {
179
187
msgs.emplace_back (MakeNextMessage (rawMsg));
180
188
MsgOffset_++;
189
+ size += rawMsg.size ();
181
190
}
182
191
if (!msgs.empty ()) {
183
- EventsQ_.Push (NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent (msgs, {}, Session_));
192
+ EventsQ_.Push (NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent (msgs, {}, Session_), size );
184
193
}
185
194
186
195
Sleep (FILE_POLL_PERIOD);
187
196
}
188
197
}
189
198
190
199
TFile File_;
191
- TBlockingEQueue EventsQ_;
200
+ TBlockingEQueue EventsQ_ {4_MB} ;
192
201
NYdb::NTopic::TPartitionSession::TPtr Session_;
193
202
TString ProducerId_;
194
203
std::thread FilePoller_;
@@ -298,4 +307,4 @@ NYdb::TAsyncStatus TFileTopicClient::CommitOffset(const TString& path, ui64 part
298
307
return NThreading::MakeFuture (NYdb::TStatus (NYdb::EStatus::SUCCESS, {}));
299
308
}
300
309
301
- }
310
+ }
0 commit comments