Skip to content

Output iterator #70

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Jun 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .clang-tidy
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ Checks: >
-readability-named-parameter,
-altera-unroll-loops,
-llvmlibc-inline-function-decl,
-cppcoreguidelines-avoid-const-or-ref-data-members
-cert-dcl21-cpp,
-fuchsia-default-arguments-declarations

WarningsAsErrors: "*"

Expand Down
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
cmake_minimum_required(VERSION 3.12)
project(cpp_channel)
set(PROJECT_VERSION 1.1.0)
set(PROJECT_VERSION 1.2.0)

set(CMAKE_CXX_STANDARD 11 CACHE STRING "C++ standard")
set(CMAKE_CXX_STANDARD_REQUIRED YES)
Expand Down
12 changes: 7 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
# Channel

[![build](https://github.com/andreiavrammsd/cpp-channel/workflows/build/badge.svg)](https://github.com/andreiavrammsd/cpp-channel/actions) [![codecov](https://codecov.io/github/andreiavrammsd/cpp-channel/graph/badge.svg?token=CKQ0TVW62Z)](https://codecov.io/github/andreiavrammsd/cpp-channel)
[![build](https://github.com/andreiavrammsd/cpp-channel/actions/workflows/cmake.yml/badge.svg)](https://github.com/andreiavrammsd/cpp-channel/actions) [![codecov](https://codecov.io/github/andreiavrammsd/cpp-channel/graph/badge.svg?token=CKQ0TVW62Z)](https://codecov.io/github/andreiavrammsd/cpp-channel)
[![documentation](https://github.com/andreiavrammsd/cpp-channel/workflows/doc/badge.svg)](https://andreiavrammsd.github.io/cpp-channel/)

### Thread-safe container for sharing data between threads. Header-only.
### Thread-safe container for sharing data between threads (synchronized queue). Header-only. Compatible with C++11.

* Thread-safe push and fetch.
* Use stream operators to push (<<) and fetch (>>) items.
* Value type must be default constructible.
* Blocking (forever waiting to fetch).
* Range-based for loop supported.
* Close to prevent pushing and stop waiting to fetch.
* Integrates well with STL algorithms in some cases. Eg: std::move(ch.begin(), ch.end(), ...).
* Integrates well with STL algorithms in some cases. Eg:
* `std::move(ch.begin(), ch.end(), ...)`
* `std::transform(input_chan.begin(), input_chan.end(), msd::back_inserter(output_chan))`.
* Tested with GCC, Clang, and MSVC.
* Includes stack-based, exception-free alternative (static channel).

Expand All @@ -25,9 +27,9 @@ Choose one of the methods:

* Copy the [include](https://github.com/andreiavrammsd/cpp-channel/tree/master/include) directory into your project and add it to your include path.
* [CMake FetchContent](https://github.com/andreiavrammsd/cpp-channel/tree/master/examples/cmake-project)
* [CMake install](https://cmake.org/cmake/help/latest/command/install.html)
* [CMake install](https://cmake.org/cmake/help/latest/command/install.html) - Choose a [version](https://github.com/andreiavrammsd/cpp-channel/releases), then run:
```shell
VERSION=1.1.0 \
VERSION=X.Y.Z \
&& wget https://github.com/andreiavrammsd/cpp-channel/archive/refs/tags/v$VERSION.zip \
&& unzip v$VERSION.zip \
&& cd cpp-channel-$VERSION \
Expand Down
3 changes: 3 additions & 0 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,6 @@ run_example(example_streaming)

add_example(example_multithreading_static_channel multithreading_static_channel.cpp)
run_example(example_multithreading_static_channel)

add_example(example_merge_channels merge_channels.cpp)
run_example(example_merge_channels)
56 changes: 56 additions & 0 deletions examples/merge_channels.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
#include <algorithm>
#include <chrono>
#include <future>
#include <iostream>
#include <sstream>
#include <thread>

#include "msd/channel.hpp"

int main()
{
msd::channel<int> input_chan{30};
msd::channel<int> output_chan{10};

// Send to channel
const auto writer = [&input_chan](int begin, int end) {
for (int i = begin; i <= end; ++i) {
input_chan.write(i);

std::stringstream msg;
msg << "Sent " << i << " from " << std::this_thread::get_id() << "\n";
std::cout << msg.str();

std::this_thread::sleep_for(std::chrono::milliseconds(10)); // simulate work
}
input_chan.close();
};

const auto reader = [&output_chan]() {
for (const auto out : output_chan) { // blocking until channel is drained (closed and empty)
std::stringstream msg;
msg << "Received " << out << " on " << std::this_thread::get_id() << "\n";
std::cout << msg.str();

std::this_thread::sleep_for(std::chrono::milliseconds(200)); // simulate work
}
};

const auto transformer = [&input_chan, &output_chan]() {
std::transform(input_chan.begin(), input_chan.end(), msd::back_inserter(output_chan),
[](int value) { return value * 2; });
output_chan.close();
};

const auto reader_1 = std::async(std::launch::async, reader);
const auto reader_2 = std::async(std::launch::async, reader);
const auto writer_1 = std::async(std::launch::async, writer, 1, 30);
const auto writer_2 = std::async(std::launch::async, writer, 31, 40);
const auto transformer_task = std::async(std::launch::async, transformer);

reader_1.wait();
reader_2.wait();
writer_1.wait();
writer_2.wait();
transformer_task.wait();
}
1 change: 1 addition & 0 deletions examples/multithreading_static_channel.cpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#include <chrono>
#include <future>
#include <iostream>
#include <sstream>
Expand Down
133 changes: 119 additions & 14 deletions include/msd/blocking_iterator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ namespace msd {
*
* Used to implement channel range-based for loop.
*
* @tparam Channel Instance of channel.
* @tparam Channel Type of channel being iterated.
*/
template <typename Channel>
class blocking_iterator {
Expand Down Expand Up @@ -47,41 +47,146 @@ class blocking_iterator {
* @brief Constructs a blocking iterator from a channel reference.
*
* @param chan Reference to the channel this iterator will iterate over.
* @param is_end If true, the iterator is in an end state (no elements to read).
*/
explicit blocking_iterator(Channel& chan) : chan_{chan} {}
explicit blocking_iterator(Channel& chan, bool is_end = false) : chan_{&chan}, is_end_{is_end}
{
if (!is_end_ && !chan_->read(value_)) {
is_end_ = true;
}
}

/**
* @brief Advances the iterator to the next element.
* @brief Retrieves the next element from the channel.
*
* @return The iterator itself.
*/
blocking_iterator<Channel> operator++() const noexcept { return *this; }
blocking_iterator<Channel> operator++() noexcept
{
if (!chan_->read(value_)) {
is_end_ = true;
}
return *this;
}

/**
* @brief Retrieves and returns the next element from the channel.
* @brief Returns the latest element retrieved from the channel.
*
* @return A const reference to the current element.
* @return A const reference to the element.
*/
reference operator*()
{
chan_.read(value_);

return value_;
}
reference operator*() { return value_; }

/**
* @brief Makes iteration continue until the channel is closed and empty.
*
* @param other Another blocking_iterator to compare with.
*
* @return true if the channel is not closed or not empty (continue iterating).
* @return false if the channel is closed and empty (stop iterating).
*/
bool operator!=(blocking_iterator<Channel>) const { return !chan_.drained(); }
bool operator!=(const blocking_iterator& other) { return is_end_ != other.is_end_; }

private:
Channel& chan_;
Channel* chan_;
value_type value_{};
bool is_end_{false};
};

/**
* @brief An output iterator pushes elements into a channel. Blocking until the channel is not full.
*
* Used to integrate with standard algorithms that require an output iterator.
*
* @tparam Channel Type of channel being iterated.
*/
template <typename Channel>
class blocking_writer_iterator {
public:
/**
* @brief The type of the elements stored in the channel.
*/
using value_type = typename Channel::value_type;

/**
* @brief Constant reference to the type of the elements stored in the channel.
*/
using reference = const value_type&;

/**
* @brief Supporting writing of elements.
*/
using iterator_category = std::output_iterator_tag;

/**
* @brief Signed integral type for iterator difference.
*/
using difference_type = std::ptrdiff_t;

/**
* @brief Pointer type to the value_type.
*/
using pointer = const value_type*;

/**
* @brief Constructs a blocking iterator from a channel reference.
*
* @param chan Reference to the channel this iterator will write into.
*/
explicit blocking_writer_iterator(Channel& chan) : chan_{&chan} {}

/**
* @brief Writes an element into the channel, blocking until space is available.
*
* @param val The value to be written into the channel.
*
* @return The iterator itself.
*/
blocking_writer_iterator& operator=(const value_type& val)
{
chan_->write(val);
return *this;
}

/**
* @brief Not applicable (handled by operator=).
*
* @return The iterator itself.
*/
blocking_writer_iterator& operator*() { return *this; }

/**
* @brief Not applicable (handled by operator=).
*
* @return The iterator itself.
*/
blocking_writer_iterator& operator++() { return *this; }

/**
* @brief Not applicable (handled by operator=).
*
* @return The iterator itself.
*/
blocking_writer_iterator operator++(int) { return *this; }

private:
Channel* chan_;
};

/**
* @brief Creates a blocking iterator for the given channel.
*
* @tparam Channel Type of channel being iterated.
*
* @param chan Reference to the channel this iterator will iterate over.
*
* @return A blocking iterator for the specified channel.
*/
template <typename Channel>
blocking_writer_iterator<Channel> back_inserter(Channel& chan)
{
return blocking_writer_iterator<Channel>{chan};
}

} // namespace msd

#endif // MSD_CHANNEL_BLOCKING_ITERATOR_HPP_
2 changes: 1 addition & 1 deletion include/msd/channel.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ class channel {
*
* @return A blocking iterator representing the end condition.
*/
iterator end() noexcept { return blocking_iterator<channel<T>>{*this}; }
iterator end() noexcept { return blocking_iterator<channel<T>>{*this, true}; }

/**
* Channel cannot be copied or moved.
Expand Down
2 changes: 1 addition & 1 deletion include/msd/static_channel.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ class static_channel {
*
* @return A blocking iterator representing the end condition.
*/
iterator end() noexcept { return blocking_iterator<static_channel<T, Capacity>>{*this}; }
iterator end() noexcept { return blocking_iterator<static_channel<T, Capacity>>{*this, true}; }

/**
* Channel cannot be copied or moved.
Expand Down
Loading