diff --git a/CMakeLists.txt b/CMakeLists.txt index c8fbbab..65d5d7a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,6 +1,6 @@ cmake_minimum_required(VERSION 3.12) project(cpp_channel) -set(PROJECT_VERSION 1.2.0) +set(PROJECT_VERSION 1.2.1) set(CMAKE_CXX_STANDARD 11 CACHE STRING "C++ standard") set(CMAKE_CXX_STANDARD_REQUIRED YES) diff --git a/README.md b/README.md index 72f88ed..4ffa5ba 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,7 @@ # Channel [![build](https://github.com/andreiavrammsd/cpp-channel/actions/workflows/cmake.yml/badge.svg)](https://github.com/andreiavrammsd/cpp-channel/actions) [![codecov](https://codecov.io/github/andreiavrammsd/cpp-channel/graph/badge.svg?token=CKQ0TVW62Z)](https://codecov.io/github/andreiavrammsd/cpp-channel) -[![documentation](https://github.com/andreiavrammsd/cpp-channel/workflows/doc/badge.svg)](https://andreiavrammsd.github.io/cpp-channel/) +[![documentation](https://github.com/andreiavrammsd/cpp-channel/actions/workflows/doc.yml/badge.svg)](https://andreiavrammsd.github.io/cpp-channel/) ### Thread-safe container for sharing data between threads (synchronized queue). Header-only. Compatible with C++11. @@ -11,9 +11,10 @@ * Blocking (forever waiting to fetch). * Range-based for loop supported. * Close to prevent pushing and stop waiting to fetch. -* Integrates well with STL algorithms in some cases. Eg: +* Integrates with some of the STD algorithms. Eg: * `std::move(ch.begin(), ch.end(), ...)` * `std::transform(input_chan.begin(), input_chan.end(), msd::back_inserter(output_chan))`. + * `std::copy_if(chan.begin(), chan.end(), ...);` * Tested with GCC, Clang, and MSVC. * Includes stack-based, exception-free alternative (static channel). @@ -136,6 +137,10 @@ int main() { See [examples](https://github.com/andreiavrammsd/cpp-channel/tree/master/examples) and [documentation](https://andreiavrammsd.github.io/cpp-channel/). +## Known limitations + +* In some cases, the integration with some STD algorithms does not compile with MSVC. See the [Transform test](https://github.com/andreiavrammsd/cpp-channel/blob/master/tests/channel_test.cpp). +
Developed with [CLion](https://www.jetbrains.com/?from=serializer) and [Visual Studio Code](https://code.visualstudio.com/). diff --git a/examples/multithreading.cpp b/examples/multithreading.cpp index 3a061d5..49a6593 100644 --- a/examples/multithreading.cpp +++ b/examples/multithreading.cpp @@ -1,6 +1,7 @@ #include #include #include +#include #include #include @@ -15,7 +16,9 @@ int main() // Read const auto out = [](msd::channel& ch, std::size_t i) { for (auto number : ch) { - std::cout << number << " from thread: " << i << '\n'; + std::stringstream stream; + stream << number << " from thread: " << i << '\n'; + std::cout << stream.str(); std::this_thread::sleep_for(std::chrono::milliseconds(500)); } }; diff --git a/include/msd/blocking_iterator.hpp b/include/msd/blocking_iterator.hpp index 4ce6d7d..90a3b78 100644 --- a/include/msd/blocking_iterator.hpp +++ b/include/msd/blocking_iterator.hpp @@ -9,9 +9,9 @@ namespace msd { /** - * @brief An iterator that block the current thread, waiting to fetch elements from the channel. + * @brief An iterator that blocks the current thread, waiting to fetch elements from the channel. * - * Used to implement channel range-based for loop. + * @details Used to implement channel range-based for loop. * * @tparam Channel Type of channel being iterated. */ @@ -61,7 +61,7 @@ class blocking_iterator { * * @return The iterator itself. */ - blocking_iterator operator++() noexcept + blocking_iterator& operator++() noexcept { if (!chan_->read(value_)) { is_end_ = true; @@ -80,7 +80,6 @@ class blocking_iterator { * @brief Makes iteration continue until the channel is closed and empty. * * @param other Another blocking_iterator to compare with. - * * @return true if the channel is not closed or not empty (continue iterating). * @return false if the channel is closed and empty (stop iterating). */ @@ -95,7 +94,7 @@ class blocking_iterator { /** * @brief An output iterator pushes elements into a channel. Blocking until the channel is not full. * - * Used to integrate with standard algorithms that require an output iterator. + * @details Used to integrate with standard algorithms that require an output iterator. * * @tparam Channel Type of channel being iterated. */ @@ -137,13 +136,13 @@ class blocking_writer_iterator { /** * @brief Writes an element into the channel, blocking until space is available. * - * @param val The value to be written into the channel. - * + * @param value The value to be written into the channel. * @return The iterator itself. + * @note There is no effect if the channel is closed. */ - blocking_writer_iterator& operator=(const value_type& val) + blocking_writer_iterator& operator=(reference value) { - chan_->write(val); + chan_->write(value); return *this; } @@ -151,6 +150,11 @@ class blocking_writer_iterator { * @brief Not applicable (handled by operator=). * * @return The iterator itself. + * + * @note It's uncommon to return a reference to an iterator, but I don't want to return a value from the channel. + * This iterator is supposed to be used only to write values. + * I don't know if it's a terrible idea or not, but it looks related to the issue with MSVC + * in the Transform test in tests/channel_test.cpp. */ blocking_writer_iterator& operator*() { return *this; } @@ -176,9 +180,7 @@ class blocking_writer_iterator { * @brief Creates a blocking iterator for the given channel. * * @tparam Channel Type of channel being iterated. - * * @param chan Reference to the channel this iterator will iterate over. - * * @return A blocking iterator for the specified channel. */ template diff --git a/include/msd/channel.hpp b/include/msd/channel.hpp index 9ffd26a..eb6a61e 100644 --- a/include/msd/channel.hpp +++ b/include/msd/channel.hpp @@ -31,7 +31,8 @@ class closed_channel : public std::runtime_error { /** * @brief Thread-safe container for sharing data between threads. * - * Implements a blocking input iterator. + * - Not movable, not copyable. + * - Includes a blocking input iterator. * * @tparam T The type of the elements. */ @@ -85,9 +86,7 @@ class channel { * @brief Pushes an element into the channel. * * @tparam Type The type of the elements. - * * @param value The element to be pushed into the channel. - * * @return true If an element was successfully pushed into the channel. * @return false If the channel is closed. */ @@ -103,7 +102,6 @@ class channel { } queue_.push(std::forward(value)); - ++size_; } cnd_.notify_one(); @@ -115,7 +113,6 @@ class channel { * @brief Pops an element from the channel. * * @param out Reference to the variable where the popped element will be stored. - * * @return true If an element was successfully read from the channel. * @return false If the channel is closed and empty. */ @@ -125,13 +122,12 @@ class channel { std::unique_lock lock{mtx_}; waitBeforeRead(lock); - if (is_closed_ && size_ == 0) { + if (is_closed_ && queue_.empty()) { return false; } out = std::move(queue_.front()); queue_.pop(); - --size_; } cnd_.notify_one(); @@ -147,7 +143,7 @@ class channel { NODISCARD size_type size() const noexcept { std::unique_lock lock{mtx_}; - return size_; + return queue_.size(); } /** @@ -159,7 +155,7 @@ class channel { NODISCARD bool empty() const noexcept { std::unique_lock lock{mtx_}; - return size_ == 0; + return queue_.empty(); } /** @@ -195,7 +191,7 @@ class channel { NODISCARD bool drained() noexcept { std::unique_lock lock{mtx_}; - return size_ == 0 && is_closed_; + return queue_.empty() && is_closed_; } /** @@ -212,9 +208,6 @@ class channel { */ iterator end() noexcept { return blocking_iterator>{*this, true}; } - /** - * Channel cannot be copied or moved. - */ channel(const channel&) = delete; channel& operator=(const channel&) = delete; channel(channel&&) = delete; @@ -223,7 +216,6 @@ class channel { private: std::queue queue_; - std::size_t size_{0}; const size_type cap_{0}; mutable std::mutex mtx_; std::condition_variable cnd_; @@ -231,13 +223,13 @@ class channel { void waitBeforeRead(std::unique_lock& lock) { - cnd_.wait(lock, [this]() { return size_ > 0 || is_closed_; }); + cnd_.wait(lock, [this]() { return !queue_.empty() || is_closed_; }); }; void waitBeforeWrite(std::unique_lock& lock) { - if (cap_ > 0 && size_ == cap_) { - cnd_.wait(lock, [this]() { return size_ < cap_; }); + if (cap_ > 0 && queue_.size() == cap_) { + cnd_.wait(lock, [this]() { return queue_.size() < cap_; }); } } }; diff --git a/include/msd/static_channel.hpp b/include/msd/static_channel.hpp index 469000f..bb6e7ba 100644 --- a/include/msd/static_channel.hpp +++ b/include/msd/static_channel.hpp @@ -16,9 +16,10 @@ namespace msd { /** * @brief Thread-safe container for sharing data between threads. * - * Allocates elements on the stack. - * Does not throw exceptions. - * Implements a blocking input iterator. + * - Allocates elements on the stack. + * - Does not throw exceptions. + * - Not movable, not copyable. + * - Includes a blocking input iterator. * * @tparam T The type of the elements. * @tparam Capacity The maximum number of elements the channel can hold before blocking. @@ -52,9 +53,7 @@ class static_channel { * @brief Pushes an element into the channel. * * @tparam Type The type of the elements. - * * @param value The element to be pushed into the channel. - * * @return true If an element was successfully pushed into the channel. * @return false If the channel is closed. */ @@ -82,7 +81,6 @@ class static_channel { * @brief Pops an element from the channel. * * @param out Reference to the variable where the popped element will be stored. - * * @return true If an element was successfully read from the channel. * @return false If the channel is closed and empty. */ @@ -179,9 +177,6 @@ class static_channel { */ iterator end() noexcept { return blocking_iterator>{*this, true}; } - /** - * Channel cannot be copied or moved. - */ static_channel(const static_channel&) = delete; static_channel& operator=(const static_channel&) = delete; static_channel(static_channel&&) = delete; diff --git a/tests/blocking_iterator_test.cpp b/tests/blocking_iterator_test.cpp index 474fd16..b88ff6e 100644 --- a/tests/blocking_iterator_test.cpp +++ b/tests/blocking_iterator_test.cpp @@ -70,6 +70,7 @@ TEST(BlockingWriterIteratorTest, WriteToChannelUsingBackInserter) *out = 20; *out = 30; channel.close(); + *out = 40; // Ignored because the channel is closed }); std::vector results; diff --git a/tests/channel_test.cpp b/tests/channel_test.cpp index d976dd2..3b353f3 100644 --- a/tests/channel_test.cpp +++ b/tests/channel_test.cpp @@ -6,6 +6,7 @@ #include #include #include +#include #include #include #include @@ -323,47 +324,165 @@ TEST(ChannelTest, ReadWriteClose) EXPECT_EQ(nums, numbers); } -TEST(ChannelTest, MergeChannels) +class movable_only { + public: + explicit movable_only(int value) : value_{value} {} + + movable_only() = default; + + movable_only(const movable_only&) + { + std::cout << "Copy constructor should not be called"; + std::abort(); + } + + movable_only(movable_only&& other) noexcept : value_{std::move(other.value_)} { other.value_ = 0; } + + movable_only& operator=(const movable_only&) + { + std::cout << "Copy assignment should not be called"; + std::abort(); + } + + movable_only& operator=(movable_only&& other) noexcept + { + if (this != &other) { + value_ = other.value_; + other.value_ = 0; + } + + return *this; + } + + int getValue() const { return value_; } + + virtual ~movable_only() = default; + + private: + int value_{0}; +}; + +TEST(ChannelTest, Transform) { const int numbers = 100; - const std::int64_t expected_sum = 5050 * 2; - std::atomic sum{0}; - std::atomic nums{0}; + const int expected_sum = 5050 * 2; + std::atomic sum{0}; + std::atomic nums{0}; - msd::channel input_chan{30}; + msd::channel input_chan{30}; msd::channel output_chan{10}; - // Send to channel + // Send to input channel const auto writer = [&input_chan]() { for (int i = 1; i <= numbers; ++i) { - input_chan.write(i); + input_chan.write(movable_only{i}); } input_chan.close(); }; + // Transform input channel values from movable_only to int by multiplying by 2 and write to output channel + const auto double_transformer = [&input_chan, &output_chan]() { + const auto double_value = [](auto&& value) { return value.getValue() * 2; }; +#ifdef _MSC_VER + for (auto&& value : input_chan) { + output_chan.write(double_value(value)); + } + + // Does not work with std::transform + // -- Building for: Visual Studio 17 2022 + // -- The C compiler identification is MSVC 19.43.34808.0 + // -- The CXX compiler identification is MSVC 19.43.34808.0 + // + // Release: does not compile - warning C4702: unreachable code + // Debug: compiles, but copies the movable_only object instead of moving it + // + // Posibilities: + // - I am doing something very wrong (see operator* in blocking_writer_iterator) + // - MSVC has a bug + // - https://github.com/ericniebler/range-v3/issues/1814 + // - https://github.com/ericniebler/range-v3/issues/1762 + // - Other compilers are more permissive +#else + std::transform(input_chan.begin(), input_chan.end(), msd::back_inserter(output_chan), double_value); +#endif // _MSC_VER + + output_chan.close(); + }; + + // Read from output channel const auto reader = [&output_chan, &sum, &nums]() { - for (const auto out : output_chan) { // blocking until channel is drained (closed and empty) + for (auto&& out : output_chan) { // blocking until channel is drained (closed and empty) sum += out; ++nums; } }; - const auto double_transformer = [&input_chan, &output_chan]() { - std::transform(input_chan.begin(), input_chan.end(), msd::back_inserter(output_chan), - [](int value) { return value * 2; }); - output_chan.close(); - }; - - const auto reader_1 = std::async(std::launch::async, reader); - const auto reader_2 = std::async(std::launch::async, reader); + // Create async tasks for reading, transforming, and writing + const auto reader_task_1 = std::async(std::launch::async, reader); + const auto reader_task_2 = std::async(std::launch::async, reader); const auto writer_task = std::async(std::launch::async, writer); const auto transformer_task = std::async(std::launch::async, double_transformer); - reader_1.wait(); - reader_2.wait(); writer_task.wait(); transformer_task.wait(); + reader_task_1.wait(); + reader_task_2.wait(); EXPECT_EQ(sum, expected_sum); EXPECT_EQ(nums, numbers); } + +TEST(ChannelTest, FilterAndAccumulate) +{ + msd::channel input_chan{10}; + msd::channel output_chan{10}; + + // Producer: send numbers on input channel + const auto producer = [&input_chan]() { + for (int i = 1; i <= 101; ++i) { + input_chan.write(i); + } + input_chan.close(); + }; + + // Filter: take even numbers from input channel and write them to output channel + const auto filter = [&input_chan, &output_chan]() { + std::copy_if(input_chan.begin(), input_chan.end(), msd::back_inserter(output_chan), + [](int value) { return value % 2 == 0; }); + output_chan.close(); + }; + + const auto producer_task = std::async(std::launch::async, producer); + const auto filter_task = std::async(std::launch::async, filter); + + // Consumer: accumulate output channel values + const int sum = std::accumulate(output_chan.begin(), output_chan.end(), 0); + + producer_task.wait(); + filter_task.wait(); + + EXPECT_EQ(sum, 2550); +} + +TEST(ChannelTest, CopyToVector) +{ + msd::channel chan{10}; + std::vector results; + + // Producer: write 1..4 into channel and close + const auto producer = [&]() { + std::fill_n(msd::back_inserter(chan), 4, 0); + + for (int i = 1; i <= 4; ++i) { + chan.write(i); + } + chan.close(); + }; + + producer(); + + // Copy from channel to vector + std::copy(chan.begin(), chan.end(), std::back_inserter(results)); + + EXPECT_EQ(results, std::vector({0, 0, 0, 0, 1, 2, 3, 4})); +}