diff --git a/CMakeLists.txt b/CMakeLists.txt
index c8fbbab..65d5d7a 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -1,6 +1,6 @@
cmake_minimum_required(VERSION 3.12)
project(cpp_channel)
-set(PROJECT_VERSION 1.2.0)
+set(PROJECT_VERSION 1.2.1)
set(CMAKE_CXX_STANDARD 11 CACHE STRING "C++ standard")
set(CMAKE_CXX_STANDARD_REQUIRED YES)
diff --git a/README.md b/README.md
index 72f88ed..4ffa5ba 100644
--- a/README.md
+++ b/README.md
@@ -1,7 +1,7 @@
# Channel
[](https://github.com/andreiavrammsd/cpp-channel/actions) [](https://codecov.io/github/andreiavrammsd/cpp-channel)
-[](https://andreiavrammsd.github.io/cpp-channel/)
+[](https://andreiavrammsd.github.io/cpp-channel/)
### Thread-safe container for sharing data between threads (synchronized queue). Header-only. Compatible with C++11.
@@ -11,9 +11,10 @@
* Blocking (forever waiting to fetch).
* Range-based for loop supported.
* Close to prevent pushing and stop waiting to fetch.
-* Integrates well with STL algorithms in some cases. Eg:
+* Integrates with some of the STD algorithms. Eg:
* `std::move(ch.begin(), ch.end(), ...)`
* `std::transform(input_chan.begin(), input_chan.end(), msd::back_inserter(output_chan))`.
+ * `std::copy_if(chan.begin(), chan.end(), ...);`
* Tested with GCC, Clang, and MSVC.
* Includes stack-based, exception-free alternative (static channel).
@@ -136,6 +137,10 @@ int main() {
See [examples](https://github.com/andreiavrammsd/cpp-channel/tree/master/examples) and [documentation](https://andreiavrammsd.github.io/cpp-channel/).
+## Known limitations
+
+* In some cases, the integration with some STD algorithms does not compile with MSVC. See the [Transform test](https://github.com/andreiavrammsd/cpp-channel/blob/master/tests/channel_test.cpp).
+
Developed with [CLion](https://www.jetbrains.com/?from=serializer) and [Visual Studio Code](https://code.visualstudio.com/).
diff --git a/examples/multithreading.cpp b/examples/multithreading.cpp
index 3a061d5..49a6593 100644
--- a/examples/multithreading.cpp
+++ b/examples/multithreading.cpp
@@ -1,6 +1,7 @@
#include
#include
#include
+#include
#include
#include
@@ -15,7 +16,9 @@ int main()
// Read
const auto out = [](msd::channel& ch, std::size_t i) {
for (auto number : ch) {
- std::cout << number << " from thread: " << i << '\n';
+ std::stringstream stream;
+ stream << number << " from thread: " << i << '\n';
+ std::cout << stream.str();
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
};
diff --git a/include/msd/blocking_iterator.hpp b/include/msd/blocking_iterator.hpp
index 4ce6d7d..90a3b78 100644
--- a/include/msd/blocking_iterator.hpp
+++ b/include/msd/blocking_iterator.hpp
@@ -9,9 +9,9 @@
namespace msd {
/**
- * @brief An iterator that block the current thread, waiting to fetch elements from the channel.
+ * @brief An iterator that blocks the current thread, waiting to fetch elements from the channel.
*
- * Used to implement channel range-based for loop.
+ * @details Used to implement channel range-based for loop.
*
* @tparam Channel Type of channel being iterated.
*/
@@ -61,7 +61,7 @@ class blocking_iterator {
*
* @return The iterator itself.
*/
- blocking_iterator operator++() noexcept
+ blocking_iterator& operator++() noexcept
{
if (!chan_->read(value_)) {
is_end_ = true;
@@ -80,7 +80,6 @@ class blocking_iterator {
* @brief Makes iteration continue until the channel is closed and empty.
*
* @param other Another blocking_iterator to compare with.
- *
* @return true if the channel is not closed or not empty (continue iterating).
* @return false if the channel is closed and empty (stop iterating).
*/
@@ -95,7 +94,7 @@ class blocking_iterator {
/**
* @brief An output iterator pushes elements into a channel. Blocking until the channel is not full.
*
- * Used to integrate with standard algorithms that require an output iterator.
+ * @details Used to integrate with standard algorithms that require an output iterator.
*
* @tparam Channel Type of channel being iterated.
*/
@@ -137,13 +136,13 @@ class blocking_writer_iterator {
/**
* @brief Writes an element into the channel, blocking until space is available.
*
- * @param val The value to be written into the channel.
- *
+ * @param value The value to be written into the channel.
* @return The iterator itself.
+ * @note There is no effect if the channel is closed.
*/
- blocking_writer_iterator& operator=(const value_type& val)
+ blocking_writer_iterator& operator=(reference value)
{
- chan_->write(val);
+ chan_->write(value);
return *this;
}
@@ -151,6 +150,11 @@ class blocking_writer_iterator {
* @brief Not applicable (handled by operator=).
*
* @return The iterator itself.
+ *
+ * @note It's uncommon to return a reference to an iterator, but I don't want to return a value from the channel.
+ * This iterator is supposed to be used only to write values.
+ * I don't know if it's a terrible idea or not, but it looks related to the issue with MSVC
+ * in the Transform test in tests/channel_test.cpp.
*/
blocking_writer_iterator& operator*() { return *this; }
@@ -176,9 +180,7 @@ class blocking_writer_iterator {
* @brief Creates a blocking iterator for the given channel.
*
* @tparam Channel Type of channel being iterated.
- *
* @param chan Reference to the channel this iterator will iterate over.
- *
* @return A blocking iterator for the specified channel.
*/
template
diff --git a/include/msd/channel.hpp b/include/msd/channel.hpp
index 9ffd26a..eb6a61e 100644
--- a/include/msd/channel.hpp
+++ b/include/msd/channel.hpp
@@ -31,7 +31,8 @@ class closed_channel : public std::runtime_error {
/**
* @brief Thread-safe container for sharing data between threads.
*
- * Implements a blocking input iterator.
+ * - Not movable, not copyable.
+ * - Includes a blocking input iterator.
*
* @tparam T The type of the elements.
*/
@@ -85,9 +86,7 @@ class channel {
* @brief Pushes an element into the channel.
*
* @tparam Type The type of the elements.
- *
* @param value The element to be pushed into the channel.
- *
* @return true If an element was successfully pushed into the channel.
* @return false If the channel is closed.
*/
@@ -103,7 +102,6 @@ class channel {
}
queue_.push(std::forward(value));
- ++size_;
}
cnd_.notify_one();
@@ -115,7 +113,6 @@ class channel {
* @brief Pops an element from the channel.
*
* @param out Reference to the variable where the popped element will be stored.
- *
* @return true If an element was successfully read from the channel.
* @return false If the channel is closed and empty.
*/
@@ -125,13 +122,12 @@ class channel {
std::unique_lock lock{mtx_};
waitBeforeRead(lock);
- if (is_closed_ && size_ == 0) {
+ if (is_closed_ && queue_.empty()) {
return false;
}
out = std::move(queue_.front());
queue_.pop();
- --size_;
}
cnd_.notify_one();
@@ -147,7 +143,7 @@ class channel {
NODISCARD size_type size() const noexcept
{
std::unique_lock lock{mtx_};
- return size_;
+ return queue_.size();
}
/**
@@ -159,7 +155,7 @@ class channel {
NODISCARD bool empty() const noexcept
{
std::unique_lock lock{mtx_};
- return size_ == 0;
+ return queue_.empty();
}
/**
@@ -195,7 +191,7 @@ class channel {
NODISCARD bool drained() noexcept
{
std::unique_lock lock{mtx_};
- return size_ == 0 && is_closed_;
+ return queue_.empty() && is_closed_;
}
/**
@@ -212,9 +208,6 @@ class channel {
*/
iterator end() noexcept { return blocking_iterator>{*this, true}; }
- /**
- * Channel cannot be copied or moved.
- */
channel(const channel&) = delete;
channel& operator=(const channel&) = delete;
channel(channel&&) = delete;
@@ -223,7 +216,6 @@ class channel {
private:
std::queue queue_;
- std::size_t size_{0};
const size_type cap_{0};
mutable std::mutex mtx_;
std::condition_variable cnd_;
@@ -231,13 +223,13 @@ class channel {
void waitBeforeRead(std::unique_lock& lock)
{
- cnd_.wait(lock, [this]() { return size_ > 0 || is_closed_; });
+ cnd_.wait(lock, [this]() { return !queue_.empty() || is_closed_; });
};
void waitBeforeWrite(std::unique_lock& lock)
{
- if (cap_ > 0 && size_ == cap_) {
- cnd_.wait(lock, [this]() { return size_ < cap_; });
+ if (cap_ > 0 && queue_.size() == cap_) {
+ cnd_.wait(lock, [this]() { return queue_.size() < cap_; });
}
}
};
diff --git a/include/msd/static_channel.hpp b/include/msd/static_channel.hpp
index 469000f..bb6e7ba 100644
--- a/include/msd/static_channel.hpp
+++ b/include/msd/static_channel.hpp
@@ -16,9 +16,10 @@ namespace msd {
/**
* @brief Thread-safe container for sharing data between threads.
*
- * Allocates elements on the stack.
- * Does not throw exceptions.
- * Implements a blocking input iterator.
+ * - Allocates elements on the stack.
+ * - Does not throw exceptions.
+ * - Not movable, not copyable.
+ * - Includes a blocking input iterator.
*
* @tparam T The type of the elements.
* @tparam Capacity The maximum number of elements the channel can hold before blocking.
@@ -52,9 +53,7 @@ class static_channel {
* @brief Pushes an element into the channel.
*
* @tparam Type The type of the elements.
- *
* @param value The element to be pushed into the channel.
- *
* @return true If an element was successfully pushed into the channel.
* @return false If the channel is closed.
*/
@@ -82,7 +81,6 @@ class static_channel {
* @brief Pops an element from the channel.
*
* @param out Reference to the variable where the popped element will be stored.
- *
* @return true If an element was successfully read from the channel.
* @return false If the channel is closed and empty.
*/
@@ -179,9 +177,6 @@ class static_channel {
*/
iterator end() noexcept { return blocking_iterator>{*this, true}; }
- /**
- * Channel cannot be copied or moved.
- */
static_channel(const static_channel&) = delete;
static_channel& operator=(const static_channel&) = delete;
static_channel(static_channel&&) = delete;
diff --git a/tests/blocking_iterator_test.cpp b/tests/blocking_iterator_test.cpp
index 474fd16..b88ff6e 100644
--- a/tests/blocking_iterator_test.cpp
+++ b/tests/blocking_iterator_test.cpp
@@ -70,6 +70,7 @@ TEST(BlockingWriterIteratorTest, WriteToChannelUsingBackInserter)
*out = 20;
*out = 30;
channel.close();
+ *out = 40; // Ignored because the channel is closed
});
std::vector results;
diff --git a/tests/channel_test.cpp b/tests/channel_test.cpp
index d976dd2..3b353f3 100644
--- a/tests/channel_test.cpp
+++ b/tests/channel_test.cpp
@@ -6,6 +6,7 @@
#include
#include
#include
+#include
#include
#include
#include
@@ -323,47 +324,165 @@ TEST(ChannelTest, ReadWriteClose)
EXPECT_EQ(nums, numbers);
}
-TEST(ChannelTest, MergeChannels)
+class movable_only {
+ public:
+ explicit movable_only(int value) : value_{value} {}
+
+ movable_only() = default;
+
+ movable_only(const movable_only&)
+ {
+ std::cout << "Copy constructor should not be called";
+ std::abort();
+ }
+
+ movable_only(movable_only&& other) noexcept : value_{std::move(other.value_)} { other.value_ = 0; }
+
+ movable_only& operator=(const movable_only&)
+ {
+ std::cout << "Copy assignment should not be called";
+ std::abort();
+ }
+
+ movable_only& operator=(movable_only&& other) noexcept
+ {
+ if (this != &other) {
+ value_ = other.value_;
+ other.value_ = 0;
+ }
+
+ return *this;
+ }
+
+ int getValue() const { return value_; }
+
+ virtual ~movable_only() = default;
+
+ private:
+ int value_{0};
+};
+
+TEST(ChannelTest, Transform)
{
const int numbers = 100;
- const std::int64_t expected_sum = 5050 * 2;
- std::atomic sum{0};
- std::atomic nums{0};
+ const int expected_sum = 5050 * 2;
+ std::atomic sum{0};
+ std::atomic nums{0};
- msd::channel input_chan{30};
+ msd::channel input_chan{30};
msd::channel output_chan{10};
- // Send to channel
+ // Send to input channel
const auto writer = [&input_chan]() {
for (int i = 1; i <= numbers; ++i) {
- input_chan.write(i);
+ input_chan.write(movable_only{i});
}
input_chan.close();
};
+ // Transform input channel values from movable_only to int by multiplying by 2 and write to output channel
+ const auto double_transformer = [&input_chan, &output_chan]() {
+ const auto double_value = [](auto&& value) { return value.getValue() * 2; };
+#ifdef _MSC_VER
+ for (auto&& value : input_chan) {
+ output_chan.write(double_value(value));
+ }
+
+ // Does not work with std::transform
+ // -- Building for: Visual Studio 17 2022
+ // -- The C compiler identification is MSVC 19.43.34808.0
+ // -- The CXX compiler identification is MSVC 19.43.34808.0
+ //
+ // Release: does not compile - warning C4702: unreachable code
+ // Debug: compiles, but copies the movable_only object instead of moving it
+ //
+ // Posibilities:
+ // - I am doing something very wrong (see operator* in blocking_writer_iterator)
+ // - MSVC has a bug
+ // - https://github.com/ericniebler/range-v3/issues/1814
+ // - https://github.com/ericniebler/range-v3/issues/1762
+ // - Other compilers are more permissive
+#else
+ std::transform(input_chan.begin(), input_chan.end(), msd::back_inserter(output_chan), double_value);
+#endif // _MSC_VER
+
+ output_chan.close();
+ };
+
+ // Read from output channel
const auto reader = [&output_chan, &sum, &nums]() {
- for (const auto out : output_chan) { // blocking until channel is drained (closed and empty)
+ for (auto&& out : output_chan) { // blocking until channel is drained (closed and empty)
sum += out;
++nums;
}
};
- const auto double_transformer = [&input_chan, &output_chan]() {
- std::transform(input_chan.begin(), input_chan.end(), msd::back_inserter(output_chan),
- [](int value) { return value * 2; });
- output_chan.close();
- };
-
- const auto reader_1 = std::async(std::launch::async, reader);
- const auto reader_2 = std::async(std::launch::async, reader);
+ // Create async tasks for reading, transforming, and writing
+ const auto reader_task_1 = std::async(std::launch::async, reader);
+ const auto reader_task_2 = std::async(std::launch::async, reader);
const auto writer_task = std::async(std::launch::async, writer);
const auto transformer_task = std::async(std::launch::async, double_transformer);
- reader_1.wait();
- reader_2.wait();
writer_task.wait();
transformer_task.wait();
+ reader_task_1.wait();
+ reader_task_2.wait();
EXPECT_EQ(sum, expected_sum);
EXPECT_EQ(nums, numbers);
}
+
+TEST(ChannelTest, FilterAndAccumulate)
+{
+ msd::channel input_chan{10};
+ msd::channel output_chan{10};
+
+ // Producer: send numbers on input channel
+ const auto producer = [&input_chan]() {
+ for (int i = 1; i <= 101; ++i) {
+ input_chan.write(i);
+ }
+ input_chan.close();
+ };
+
+ // Filter: take even numbers from input channel and write them to output channel
+ const auto filter = [&input_chan, &output_chan]() {
+ std::copy_if(input_chan.begin(), input_chan.end(), msd::back_inserter(output_chan),
+ [](int value) { return value % 2 == 0; });
+ output_chan.close();
+ };
+
+ const auto producer_task = std::async(std::launch::async, producer);
+ const auto filter_task = std::async(std::launch::async, filter);
+
+ // Consumer: accumulate output channel values
+ const int sum = std::accumulate(output_chan.begin(), output_chan.end(), 0);
+
+ producer_task.wait();
+ filter_task.wait();
+
+ EXPECT_EQ(sum, 2550);
+}
+
+TEST(ChannelTest, CopyToVector)
+{
+ msd::channel chan{10};
+ std::vector results;
+
+ // Producer: write 1..4 into channel and close
+ const auto producer = [&]() {
+ std::fill_n(msd::back_inserter(chan), 4, 0);
+
+ for (int i = 1; i <= 4; ++i) {
+ chan.write(i);
+ }
+ chan.close();
+ };
+
+ producer();
+
+ // Copy from channel to vector
+ std::copy(chan.begin(), chan.end(), std::back_inserter(results));
+
+ EXPECT_EQ(results, std::vector({0, 0, 0, 0, 1, 2, 3, 4}));
+}