diff --git a/.github/workflows/cmake.yml b/.github/workflows/cmake.yml index c1f05be..91b6e98 100644 --- a/.github/workflows/cmake.yml +++ b/.github/workflows/cmake.yml @@ -46,6 +46,16 @@ jobs: steps: - uses: actions/checkout@v4 + + - name: Install GCC 12 + if: runner.os == 'Linux' + run: | + sudo apt-get update + sudo apt-get install -y gcc-12 g++-12 cmake + sudo update-alternatives --install /usr/bin/gcc gcc /usr/bin/gcc-12 100 + sudo update-alternatives --install /usr/bin/g++ g++ /usr/bin/g++-12 100 + gcc --version + g++ --version - name: Create Build Environment # Some projects don't allow in-source building, so create a separate build directory diff --git a/README.md b/README.md index 71f2e2e..693bf4b 100644 --- a/README.md +++ b/README.md @@ -12,6 +12,7 @@ * Blocking (forever waiting to fetch). * Range-based for loop supported. * Close to prevent pushing and stop waiting to fetch. +* Optional timeout for read/write operations using `std::chrono`. * Integrates well with STL algorithms in some cases. Eg: std::move(ch.begin(), ch.end(), ...). * Tested with GCC, Clang, and MSVC. @@ -29,7 +30,6 @@ see [CMakeLists.txt](./examples/cmake-project/CMakeLists.txt) from the [CMake pr ```c++ #include - #include int main() { @@ -83,7 +83,6 @@ int main() { ```c++ #include - #include int main() { @@ -100,6 +99,31 @@ int main() { } ``` +```c++ +#include +#include + +int main() { + msd::channel ch{2}; + ch.setTimeout(std::chrono::milliseconds(100)); + + std::clog << "Testing write timeout on full buffer:\n"; + try { + ch << 1; + ch << 2; + std::clog << "Attempting to write to full channel...\n"; + ch << 3; + } + catch (const msd::channel_timeout& e) { + std::clog << "Expected timeout occurred: " << e.what() << "\n"; + } + + return 0; +} + + +``` + See [examples](examples).
diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index 042bf70..f008a34 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -36,3 +36,6 @@ add_example(example_multithreading multithreading.cpp) add_example(example_streaming streaming.cpp) run_example(example_streaming) + +add_example(example_timeout timeout.cpp) +run_example(example_timeout) diff --git a/examples/timeout.cpp b/examples/timeout.cpp new file mode 100644 index 0000000..a27af38 --- /dev/null +++ b/examples/timeout.cpp @@ -0,0 +1,67 @@ +#include +#include +#include + +#include "msd/channel.hpp" + +// using namespace std::chrono_literals; for post-C++11 code, use this to save some headaches + +int main() +{ + // small capacity, short timeout + msd::channel ch{2}; + ch.setTimeout(std::chrono::milliseconds(100)); + + std::cout << "Testing write timeout on full buffer:\n"; + try { + ch << 1; + ch << 2; + std::cout << "Attempting to write to full channel...\n"; + ch << 3; + } + catch (const msd::channel_timeout& e) { + std::cout << "Expected timeout occurred: " << e.what() << "\n"; + } + + std::cout << "\nTesting read timeout on empty channel:\n"; + + msd::channel ch2{5}; + ch2.setTimeout(std::chrono::milliseconds(200)); + + try { + int value; + std::cout << "Attempting to read from empty channel...\n"; + ch2 >> value; + } + catch (const msd::channel_timeout& e) { + std::cout << "Expected timeout occurred: " << e.what() << "\n"; + } + + std::cout << "\nDemonstrating timeout with range-based for loop:\n"; + + msd::channel ch3{5}; + ch3.setTimeout(std::chrono::milliseconds(200)); // lower this to see the timeout + + // Producer + std::thread writer([&ch3]() { + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + ch3 << 1; + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + ch3 << 2; + ch3.close(); + }); + + // Consumer + try { + for (const auto& value : ch3) { + std::cout << "Received value: " << value << "\n"; + } + } + catch (const msd::channel_timeout& e) { + std::cout << "Timeout in for loop: " << e.what() << "\n"; + } + + writer.join(); + + return 0; +} diff --git a/include/msd/channel.hpp b/include/msd/channel.hpp index 322d265..93e1694 100644 --- a/include/msd/channel.hpp +++ b/include/msd/channel.hpp @@ -1,9 +1,9 @@ // Copyright (C) 2023 Andrei Avram - #ifndef MSD_CHANNEL_HPP_ #define MSD_CHANNEL_HPP_ #include +#include #include #include #include @@ -30,6 +30,14 @@ class closed_channel : public std::runtime_error { explicit closed_channel(const char* msg) : std::runtime_error{msg} {} }; +/** + * @brief Exception thrown when channel operation times out. + */ +class channel_timeout : public std::runtime_error { + public: + explicit channel_timeout(const char* msg) : std::runtime_error{msg} {} +}; + /** * @brief Thread-safe container for sharing data between threads. * @@ -56,10 +64,24 @@ class channel { */ explicit constexpr channel(size_type capacity); + /** + * Sets a timeout for channel operations. + * + * @param timeout Duration after which operations will time out. + */ + template + void setTimeout(const std::chrono::duration& timeout); + + /** + * Clears any previously set timeout. + */ + void clearTimeout() noexcept; + /** * Pushes an element into the channel. * * @throws closed_channel if channel is closed. + * @throws channel_timeout if operation times out. */ template friend channel::type>& operator<<(channel::type>&, Type&&); @@ -68,6 +90,7 @@ class channel { * Pops an element from the channel. * * @tparam Type The type of the elements + * @throws channel_timeout if operation times out. */ template friend channel& operator>>(channel&, Type&); @@ -114,14 +137,16 @@ class channel { std::mutex mtx_; std::condition_variable cnd_; std::atomic is_closed_{false}; + std::atomic timeout_{std::chrono::nanoseconds::zero()}; + + template + bool waitWithTimeout(std::unique_lock&, Predicate&&); + bool waitBeforeRead(std::unique_lock&); + bool waitBeforeWrite(std::unique_lock&); - inline void waitBeforeRead(std::unique_lock&); - inline void waitBeforeWrite(std::unique_lock&); friend class blocking_iterator; }; } // namespace msd -#include "channel.inl" - #endif // MSD_CHANNEL_HPP_ diff --git a/include/msd/channel.inl b/include/msd/channel.inl index fbf0f5d..12efc17 100644 --- a/include/msd/channel.inl +++ b/include/msd/channel.inl @@ -1,5 +1,4 @@ // Copyright (C) 2023 Andrei Avram - namespace msd { template @@ -7,23 +6,62 @@ constexpr channel::channel(const size_type capacity) : cap_{capacity} { } +template +template +void channel::setTimeout(const std::chrono::duration& timeout) +{ + timeout_ = std::chrono::duration_cast(timeout); +} + +template +void channel::clearTimeout() noexcept +{ + timeout_ = std::chrono::nanoseconds::zero(); +} + +template +template +bool channel::waitWithTimeout(std::unique_lock& lock, Predicate&& predicate) +{ + auto timeout = timeout_.load(std::memory_order_relaxed); + if (timeout == std::chrono::nanoseconds::zero()) { + cnd_.wait(lock, std::forward(predicate)); + return true; + } + + return cnd_.wait_for(lock, timeout, std::forward(predicate)); +} + +template +bool channel::waitBeforeRead(std::unique_lock& lock) +{ + return waitWithTimeout(lock, [this]() { return !empty() || closed(); }); +} + +template +bool channel::waitBeforeWrite(std::unique_lock& lock) +{ + if (cap_ > 0 && size_ == cap_) { + return waitWithTimeout(lock, [this]() { return size_ < cap_; }); + } + return true; +} + template channel::type>& operator<<(channel::type>& ch, T&& in) { if (ch.closed()) { throw closed_channel{"cannot write on closed channel"}; } - { std::unique_lock lock{ch.mtx_}; - ch.waitBeforeWrite(lock); - + if (!ch.waitBeforeWrite(lock)) { + throw channel_timeout{"write operation timed out"}; + } ch.queue_.push(std::forward(in)); ++ch.size_; } - ch.cnd_.notify_one(); - return ch; } @@ -33,20 +71,18 @@ channel& operator>>(channel& ch, T& out) if (ch.closed() && ch.empty()) { return ch; } - { std::unique_lock lock{ch.mtx_}; - ch.waitBeforeRead(lock); - + if (!ch.waitBeforeRead(lock)) { + throw channel_timeout{"read operation timed out"}; + } if (!ch.empty()) { out = std::move(ch.queue_.front()); ch.queue_.pop(); --ch.size_; } } - ch.cnd_.notify_one(); - return ch; } @@ -67,7 +103,7 @@ void channel::close() noexcept { { std::unique_lock lock{mtx_}; - is_closed_.store(true); + is_closed_.store(true, std::memory_order_relaxed); } cnd_.notify_all(); } @@ -75,7 +111,7 @@ void channel::close() noexcept template bool channel::closed() const noexcept { - return is_closed_.load(); + return is_closed_.load(std::memory_order_relaxed); } template @@ -90,18 +126,4 @@ blocking_iterator> channel::end() noexcept return blocking_iterator>{*this}; } -template -void channel::waitBeforeRead(std::unique_lock& lock) -{ - cnd_.wait(lock, [this]() { return !empty() || closed(); }); -} - -template -void channel::waitBeforeWrite(std::unique_lock& lock) -{ - if (cap_ > 0 && size_ == cap_) { - cnd_.wait(lock, [this]() { return size_ < cap_; }); - } -} - } // namespace msd diff --git a/tests/channel_test.cpp b/tests/channel_test.cpp index 5583957..f938c71 100644 --- a/tests/channel_test.cpp +++ b/tests/channel_test.cpp @@ -4,6 +4,7 @@ #include #include +#include #include #include #include @@ -205,3 +206,88 @@ TEST(ChannelTest, Multithreading) EXPECT_EQ(expected, sum_numbers); } + +TEST(ChannelTest, TimeoutBasicOperations) +{ + msd::channel channel(1); + channel.setTimeout(std::chrono::milliseconds(50)); + + channel << 1; + EXPECT_THROW(channel << 2, msd::channel_timeout); + + int out = 0; + channel >> out; + EXPECT_EQ(1, out); + EXPECT_THROW(channel >> out, msd::channel_timeout); +} + +TEST(ChannelTest, TimeoutClearAndReset) +{ + msd::channel channel(1); + channel.setTimeout(std::chrono::milliseconds(50)); + channel.clearTimeout(); + + channel << 1; + + std::thread writer(std::bind( + [](msd::channel& ch) { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + int value = 0; + ch >> value; + }, + std::ref(channel))); + + EXPECT_NO_THROW(channel << 2); + writer.join(); +} + +TEST(ChannelTest, TimeoutIterator) +{ + msd::channel channel(5); + channel.setTimeout(std::chrono::milliseconds(50)); + + EXPECT_THROW( + { + for (const int& value : channel) { + (void)value; + } + }, + msd::channel_timeout); +} + +TEST(ChannelTest, TimeoutDynamicDuration) +{ + msd::channel channel(1); + + channel.setTimeout(std::chrono::milliseconds(50)); + int dummy = 0; + EXPECT_THROW(channel >> dummy, msd::channel_timeout); + + channel.setTimeout(std::chrono::milliseconds(1000)); + std::thread writer(std::bind( + [](msd::channel& ch) { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + ch << 1; + }, + std::ref(channel))); + + int value = 0; + EXPECT_NO_THROW(channel >> value); + EXPECT_EQ(value, 1); + writer.join(); +} + +TEST(ChannelTest, TimeoutClosedChannel) +{ + msd::channel channel(1); + channel.setTimeout(std::chrono::milliseconds(50)); + + channel << 1; + channel.close(); + + int value = 0; + EXPECT_NO_THROW(channel >> value); + EXPECT_EQ(value, 1); + EXPECT_NO_THROW(channel >> value); + EXPECT_THROW(channel << 1, msd::closed_channel); +}