Skip to content

Commit 316e3a7

Browse files
committed
opt(util):优化AsyncPipe代码,消除错序风险
1 parent 00d7529 commit 316e3a7

File tree

2 files changed

+35
-36
lines changed

2 files changed

+35
-36
lines changed

modules/util/async_pipe.cpp

Lines changed: 30 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -287,16 +287,21 @@ void AsyncPipe::Impl::appendLockless(const void *data_ptr, size_t data_size)
287287

288288
void AsyncPipe::Impl::threadFunc()
289289
{
290-
do {
291-
bool is_wake_for_timeup = true; //! 是否因超时而唤醒
290+
for (;;) {
291+
bool is_wake_for_quit = false; //! 是否因需要停止而被唤醒
292+
bool is_wake_for_timeup = true; //! 是否因超时而被唤醒
293+
292294
{
293295
//! 等待唤醒信号
294296
std::unique_lock<std::mutex> lk(full_buffers_mutex_);
295297
if (full_buffers_.empty()) {
296298
//! 等待三种情况: 1.超时,2.停止,3.full_buffers_不为空
297299
full_buffers_cv_.wait_for(lk, std::chrono::milliseconds(cfg_.interval),
298-
[&is_wake_for_timeup, this] {
299-
if (stop_signal_ || !full_buffers_.empty()) {
300+
[this, &is_wake_for_timeup, &is_wake_for_quit] {
301+
if (stop_signal_)
302+
is_wake_for_quit = true;
303+
304+
if (is_wake_for_quit || !full_buffers_.empty()) {
300305
is_wake_for_timeup = false;
301306
return true;
302307
}
@@ -307,7 +312,22 @@ void AsyncPipe::Impl::threadFunc()
307312
is_wake_for_timeup = false;
308313
}
309314
}
310-
//! 先处理 full_buffers_ 中的数据
315+
316+
//! 如果是超时或是收到停止信号,则先将 curr_buff_ 移到 full_buffers_
317+
if (is_wake_for_timeup || is_wake_for_quit) {
318+
if (curr_buffer_mutex_.try_lock()) {
319+
if (curr_buffer_ != nullptr) {
320+
//! Q: 这里为什么不锁 full_buffers_mutex_ ?
321+
//! A: 因为锁住了 curr_buffer_mutex_ 就不会有前端调用 appendLockless(),仅有后端的线程操作。
322+
//! 所以不锁 full_buffers_mutex_ 也是安全的
323+
full_buffers_.push_back(curr_buffer_);
324+
curr_buffer_ = nullptr;
325+
}
326+
curr_buffer_mutex_.unlock();
327+
}
328+
}
329+
330+
//! 然后逐一处理 full_buffers_ 中的数据
311331
for (;;) {
312332
Buffer *buff = nullptr;
313333
{
@@ -316,8 +336,9 @@ void AsyncPipe::Impl::threadFunc()
316336
if (!full_buffers_.empty()) {
317337
buff = full_buffers_.front();
318338
full_buffers_.pop_front();
319-
} else
339+
} else {
320340
break;
341+
}
321342
}
322343

323344
if (buff != nullptr) {
@@ -341,32 +362,9 @@ void AsyncPipe::Impl::threadFunc()
341362
}
342363
}
343364

344-
//! 如果是超时或是收到停止信号,则检查并处理 curr_buffer_ 中的数据
345-
if (is_wake_for_timeup || stop_signal_) {
346-
Buffer *buff = nullptr;
347-
if (curr_buffer_mutex_.try_lock()) {
348-
//! 注意:这里一定要用 try_lock(),否则会死锁
349-
if (curr_buffer_ != nullptr && !curr_buffer_->empty()) {
350-
buff = curr_buffer_;
351-
curr_buffer_ = nullptr;
352-
}
353-
curr_buffer_mutex_.unlock();
354-
}
355-
356-
if (buff != nullptr) { //! 如果没取出来
357-
//! 进行处理
358-
if (cb_)
359-
cb_(buff->data(), buff->size());
360-
buff->reset();
361-
//! 然后将处理后的buff放入到free_buffers_中
362-
std::lock_guard<std::mutex> lg(free_buffers_mutex_);
363-
free_buffers_.push_back(buff);
364-
free_buffers_cv_.notify_all();
365-
}
366-
}
367-
} while (!stop_signal_); //! 如果是停止信号,则直接跳出循环,结束线程
368-
//! Q: stop_signal_ 信号为什么不在被唤醒时就break呢?
369-
//! A: 因为我们期望就算是退出了,Buff中的数据都应该先被处理掉
365+
if (is_wake_for_quit)
366+
break;
367+
}
370368
}
371369

372370
}

modules/util/async_pipe_test.cpp

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,18 +40,18 @@ void TestByConfig(AsyncPipe::Config cfg)
4040
const uint8_t *p = static_cast<const uint8_t*>(ptr);
4141
for (size_t i = 0; i < size; ++i)
4242
out_data.push_back(p[i]);
43-
this_thread::sleep_for(chrono::milliseconds(10));
43+
this_thread::sleep_for(chrono::microseconds(10));
4444
}
4545
);
4646

47-
4847
for (size_t i = 0; i < 256; ++i) {
4948
uint8_t v = i;
5049
ap.append(&v, 1);
5150
}
51+
5252
ap.cleanup();
5353

54-
EXPECT_EQ(out_data.size(), 256);
54+
ASSERT_EQ(out_data.size(), 256);
5555
for (size_t i = 0; i < 256; ++i) {
5656
EXPECT_EQ(out_data[i], i);
5757
}
@@ -221,7 +221,7 @@ TEST(AsyncPipe, MultiThreadAppend)
221221
const auto len = s1.size();
222222

223223
const int thread_num = 100;
224-
const int each_thread_send_num = 1000;
224+
const int each_thread_send_num = 100;
225225

226226
std::vector<char> recv_data;
227227
recv_data.reserve(thread_num * each_thread_send_num * len);
@@ -231,6 +231,7 @@ TEST(AsyncPipe, MultiThreadAppend)
231231
const char *str = static_cast<const char *>(ptr);
232232
for (size_t i = 0; i < size; ++i)
233233
recv_data.push_back(str[i]);
234+
std::this_thread::sleep_for(std::chrono::microseconds(rand() % 10));
234235
}
235236
);
236237

0 commit comments

Comments
 (0)