-
Notifications
You must be signed in to change notification settings - Fork 38
[New Feature] Timeout-capable channels #51
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 7 commits
9b32652
c149418
1f89d8d
892ba14
3a82b34
aedccfa
76775cb
cdf5c50
af2b030
949aca1
9811914
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -12,6 +12,7 @@ | |
* Blocking (forever waiting to fetch). | ||
* Range-based for loop supported. | ||
* Close to prevent pushing and stop waiting to fetch. | ||
* Optional timeout for read/write operations using `std::chrono`. | ||
* Integrates well with STL algorithms in some cases. Eg: std::move(ch.begin(), ch.end(), ...). | ||
* Tested with GCC, Clang, and MSVC. | ||
|
||
|
@@ -29,7 +30,6 @@ see [CMakeLists.txt](./examples/cmake-project/CMakeLists.txt) from the [CMake pr | |
|
||
```c++ | ||
#include <cassert> | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please revert. |
||
#include <msd/channel.hpp> | ||
|
||
int main() { | ||
|
@@ -83,7 +83,6 @@ int main() { | |
|
||
```c++ | ||
#include <iostream> | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please revert. |
||
#include <msd/channel.hpp> | ||
|
||
int main() { | ||
|
@@ -100,6 +99,31 @@ int main() { | |
} | ||
``` | ||
|
||
````c++ | ||
andreiavrammsd marked this conversation as resolved.
Show resolved
Hide resolved
|
||
#include <iostream> | ||
#include <msd/channel.hpp> | ||
|
||
int main() { | ||
msd::channel<int> ch{2}; | ||
ch.setTimeout(std::chrono::milliseconds(100)); | ||
|
||
std::clog << "Testing write timeout on full buffer:\n"; | ||
try { | ||
ch << 1; | ||
ch << 2; | ||
std::clog << "Attempting to write to full channel...\n"; | ||
ch << 3; | ||
} | ||
catch (const msd::channel_timeout& e) { | ||
std::clog << "Expected timeout occurred: " << e.what() << "\n"; | ||
} | ||
|
||
return 0; | ||
} | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Unnecessary. |
||
|
||
``` | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Unnecessary. |
||
See [examples](examples). | ||
|
||
<br> | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,125 @@ | ||
#include <chrono> | ||
#include <iostream> | ||
#include <thread> | ||
|
||
#include "msd/channel.hpp" | ||
|
||
// using namespace std::chrono_literals; for post-C++11 code, use this to save some headaches | ||
|
||
void demonstrateTimeouts() | ||
andreiavrammsd marked this conversation as resolved.
Show resolved
Hide resolved
|
||
{ | ||
// small capacity, short timeout | ||
msd::channel<int> ch{2}; | ||
ch.setTimeout(std::chrono::milliseconds(100)); | ||
|
||
std::cout << "Testing write timeout on full buffer:\n"; | ||
try { | ||
ch << 1; | ||
ch << 2; | ||
std::cout << "Attempting to write to full channel...\n"; | ||
ch << 3; | ||
} | ||
catch (const msd::channel_timeout& e) { | ||
std::cout << "Expected timeout occurred: " << e.what() << "\n"; | ||
} | ||
|
||
std::cout << "\nTesting read timeout on empty channel:\n"; | ||
|
||
msd::channel<int> ch2{5}; | ||
ch2.setTimeout(std::chrono::milliseconds(200)); | ||
|
||
try { | ||
int value; | ||
std::cout << "Attempting to read from empty channel...\n"; | ||
ch2 >> value; | ||
} | ||
catch (const msd::channel_timeout& e) { | ||
std::cout << "Expected timeout occurred: " << e.what() << "\n"; | ||
} | ||
|
||
std::cout << "\nDemonstrating timeout with range-based for loop:\n"; | ||
|
||
msd::channel<int> ch3{5}; | ||
ch3.setTimeout(std::chrono::milliseconds(200)); // lower this to see the timeout | ||
|
||
// Producer | ||
std::thread writer([&ch3]() { | ||
std::this_thread::sleep_for(std::chrono::milliseconds(200)); | ||
ch3 << 1; | ||
std::this_thread::sleep_for(std::chrono::milliseconds(200)); | ||
ch3 << 2; | ||
ch3.close(); | ||
}); | ||
|
||
// Consumer | ||
try { | ||
for (const auto& value : ch3) { | ||
std::cout << "Received value: " << value << "\n"; | ||
} | ||
} | ||
catch (const msd::channel_timeout& e) { | ||
std::cout << "Timeout in for loop: " << e.what() << "\n"; | ||
} | ||
|
||
writer.join(); | ||
} | ||
|
||
int main() | ||
{ | ||
// small capacity, short timeout | ||
msd::channel<int> ch{2}; | ||
ch.setTimeout(std::chrono::milliseconds(100)); | ||
|
||
std::cout << "Testing write timeout on full buffer:\n"; | ||
try { | ||
ch << 1; | ||
ch << 2; | ||
std::cout << "Attempting to write to full channel...\n"; | ||
ch << 3; | ||
} | ||
catch (const msd::channel_timeout& e) { | ||
std::cout << "Expected timeout occurred: " << e.what() << "\n"; | ||
} | ||
|
||
std::cout << "\nTesting read timeout on empty channel:\n"; | ||
|
||
msd::channel<int> ch2{5}; | ||
ch2.setTimeout(std::chrono::milliseconds(200)); | ||
|
||
try { | ||
int value; | ||
std::cout << "Attempting to read from empty channel...\n"; | ||
ch2 >> value; | ||
} | ||
catch (const msd::channel_timeout& e) { | ||
std::cout << "Expected timeout occurred: " << e.what() << "\n"; | ||
} | ||
|
||
std::cout << "\nDemonstrating timeout with range-based for loop:\n"; | ||
|
||
msd::channel<int> ch3{5}; | ||
ch3.setTimeout(std::chrono::milliseconds(200)); // lower this to see the timeout | ||
|
||
// Producer | ||
std::thread writer([&ch3]() { | ||
std::this_thread::sleep_for(std::chrono::milliseconds(200)); | ||
ch3 << 1; | ||
std::this_thread::sleep_for(std::chrono::milliseconds(200)); | ||
ch3 << 2; | ||
ch3.close(); | ||
}); | ||
|
||
// Consumer | ||
try { | ||
for (const auto& value : ch3) { | ||
std::cout << "Received value: " << value << "\n"; | ||
} | ||
} | ||
catch (const msd::channel_timeout& e) { | ||
std::cout << "Timeout in for loop: " << e.what() << "\n"; | ||
} | ||
|
||
writer.join(); | ||
|
||
return 0; | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,9 +1,9 @@ | ||
// Copyright (C) 2023 Andrei Avram | ||
|
||
#ifndef MSD_CHANNEL_HPP_ | ||
#define MSD_CHANNEL_HPP_ | ||
|
||
#include <atomic> | ||
#include <chrono> | ||
#include <condition_variable> | ||
#include <cstdlib> | ||
#include <mutex> | ||
|
@@ -30,6 +30,14 @@ class closed_channel : public std::runtime_error { | |
explicit closed_channel(const char* msg) : std::runtime_error{msg} {} | ||
}; | ||
|
||
/** | ||
* @brief Exception thrown when channel operation times out. | ||
*/ | ||
class channel_timeout : public std::runtime_error { | ||
public: | ||
explicit channel_timeout(const char* msg) : std::runtime_error{msg} {} | ||
}; | ||
|
||
/** | ||
* @brief Thread-safe container for sharing data between threads. | ||
* | ||
|
@@ -56,10 +64,24 @@ class channel { | |
*/ | ||
explicit constexpr channel(size_type capacity); | ||
|
||
/** | ||
* Sets a timeout for channel operations. | ||
* | ||
* @param timeout Duration after which operations will time out. | ||
*/ | ||
template <typename Rep, typename Period> | ||
void setTimeout(const std::chrono::duration<Rep, Period>& timeout); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there a really useful scenario where you want to set a timeout after the channel has been declared, clear the timeout, or change it? Or would it be enough to have an overloaded constructor that accepts the capacity and the timeout? And then There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good point, this is probably over-engineered since most of the times you do not really need that to change at runtime/past the program's initialization. The thing is that when I initially programmed this I tried to add time-out functionality on top of the original implementation as much as possible, so I was reluctant to touch the constructors, this is a good idea however and I'll refactor the PR as soon as I have time. |
||
|
||
/** | ||
* Clears any previously set timeout. | ||
*/ | ||
void clearTimeout() noexcept; | ||
|
||
/** | ||
* Pushes an element into the channel. | ||
* | ||
* @throws closed_channel if channel is closed. | ||
* @throws channel_timeout if operation times out. | ||
*/ | ||
template <typename Type> | ||
friend channel<typename std::decay<Type>::type>& operator<<(channel<typename std::decay<Type>::type>&, Type&&); | ||
|
@@ -68,6 +90,7 @@ class channel { | |
* Pops an element from the channel. | ||
* | ||
* @tparam Type The type of the elements | ||
* @throws channel_timeout if operation times out. | ||
*/ | ||
template <typename Type> | ||
friend channel<Type>& operator>>(channel<Type>&, Type&); | ||
|
@@ -114,14 +137,17 @@ class channel { | |
std::mutex mtx_; | ||
std::condition_variable cnd_; | ||
std::atomic<bool> is_closed_{false}; | ||
std::atomic<std::chrono::nanoseconds> timeout_{std::chrono::nanoseconds::zero()}; | ||
|
||
template <typename Predicate> | ||
bool waitWithTimeout(std::unique_lock<std::mutex>&, Predicate); | ||
bool waitBeforeRead(std::unique_lock<std::mutex>&); | ||
bool waitBeforeWrite(std::unique_lock<std::mutex>&); | ||
|
||
inline void waitBeforeRead(std::unique_lock<std::mutex>&); | ||
inline void waitBeforeWrite(std::unique_lock<std::mutex>&); | ||
friend class blocking_iterator<channel>; | ||
}; | ||
|
||
} // namespace msd | ||
|
||
#include "channel.inl" | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Typo. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please don't resolve conversations. They help to see if we understand one another. The author of the conversation resolves it when the changes are done accordingly. In this specific case, |
||
#endif // MSD_CHANNEL_HPP_ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"gcc-12 is already the newest version (12.3.0-1ubuntu1~22.04)."
Is
update-alternatives
enough?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
By removing the
apt-get
call I get:I am honestly not familiar with Ubuntu so maybe I am not doing this correctly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you tried this locally, maybe you don't have GCC 12 already installed. On the GitHub runner, I see 12 is already installed, but not used as default. So just try it here without apt-get update and install.