Skip to content

Commit 8d0616c

Browse files
Output iterator (#70)
* Add output iterator * Explicitly control end of iterator * Set release version
1 parent a2c25d2 commit 8d0616c

12 files changed

+293
-43
lines changed

.clang-tidy

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,8 @@ Checks: >
1010
-readability-named-parameter,
1111
-altera-unroll-loops,
1212
-llvmlibc-inline-function-decl,
13-
-cppcoreguidelines-avoid-const-or-ref-data-members
13+
-cert-dcl21-cpp,
14+
-fuchsia-default-arguments-declarations
1415
1516
WarningsAsErrors: "*"
1617

CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
cmake_minimum_required(VERSION 3.12)
22
project(cpp_channel)
3-
set(PROJECT_VERSION 1.1.0)
3+
set(PROJECT_VERSION 1.2.0)
44

55
set(CMAKE_CXX_STANDARD 11 CACHE STRING "C++ standard")
66
set(CMAKE_CXX_STANDARD_REQUIRED YES)

README.md

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,19 @@
11
# Channel
22

3-
[![build](https://github.com/andreiavrammsd/cpp-channel/workflows/build/badge.svg)](https://github.com/andreiavrammsd/cpp-channel/actions) [![codecov](https://codecov.io/github/andreiavrammsd/cpp-channel/graph/badge.svg?token=CKQ0TVW62Z)](https://codecov.io/github/andreiavrammsd/cpp-channel)
3+
[![build](https://github.com/andreiavrammsd/cpp-channel/actions/workflows/cmake.yml/badge.svg)](https://github.com/andreiavrammsd/cpp-channel/actions) [![codecov](https://codecov.io/github/andreiavrammsd/cpp-channel/graph/badge.svg?token=CKQ0TVW62Z)](https://codecov.io/github/andreiavrammsd/cpp-channel)
44
[![documentation](https://github.com/andreiavrammsd/cpp-channel/workflows/doc/badge.svg)](https://andreiavrammsd.github.io/cpp-channel/)
55

6-
### Thread-safe container for sharing data between threads. Header-only.
6+
### Thread-safe container for sharing data between threads (synchronized queue). Header-only. Compatible with C++11.
77

88
* Thread-safe push and fetch.
99
* Use stream operators to push (<<) and fetch (>>) items.
1010
* Value type must be default constructible.
1111
* Blocking (forever waiting to fetch).
1212
* Range-based for loop supported.
1313
* Close to prevent pushing and stop waiting to fetch.
14-
* Integrates well with STL algorithms in some cases. Eg: std::move(ch.begin(), ch.end(), ...).
14+
* Integrates well with STL algorithms in some cases. Eg:
15+
* `std::move(ch.begin(), ch.end(), ...)`
16+
* `std::transform(input_chan.begin(), input_chan.end(), msd::back_inserter(output_chan))`.
1517
* Tested with GCC, Clang, and MSVC.
1618
* Includes stack-based, exception-free alternative (static channel).
1719

@@ -25,9 +27,9 @@ Choose one of the methods:
2527

2628
* Copy the [include](https://github.com/andreiavrammsd/cpp-channel/tree/master/include) directory into your project and add it to your include path.
2729
* [CMake FetchContent](https://github.com/andreiavrammsd/cpp-channel/tree/master/examples/cmake-project)
28-
* [CMake install](https://cmake.org/cmake/help/latest/command/install.html)
30+
* [CMake install](https://cmake.org/cmake/help/latest/command/install.html) - Choose a [version](https://github.com/andreiavrammsd/cpp-channel/releases), then run:
2931
```shell
30-
VERSION=1.1.0 \
32+
VERSION=X.Y.Z \
3133
&& wget https://github.com/andreiavrammsd/cpp-channel/archive/refs/tags/v$VERSION.zip \
3234
&& unzip v$VERSION.zip \
3335
&& cd cpp-channel-$VERSION \

examples/CMakeLists.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,3 +39,6 @@ run_example(example_streaming)
3939

4040
add_example(example_multithreading_static_channel multithreading_static_channel.cpp)
4141
run_example(example_multithreading_static_channel)
42+
43+
add_example(example_merge_channels merge_channels.cpp)
44+
run_example(example_merge_channels)

examples/merge_channels.cpp

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
#include <algorithm>
2+
#include <chrono>
3+
#include <future>
4+
#include <iostream>
5+
#include <sstream>
6+
#include <thread>
7+
8+
#include "msd/channel.hpp"
9+
10+
int main()
11+
{
12+
msd::channel<int> input_chan{30};
13+
msd::channel<int> output_chan{10};
14+
15+
// Send to channel
16+
const auto writer = [&input_chan](int begin, int end) {
17+
for (int i = begin; i <= end; ++i) {
18+
input_chan.write(i);
19+
20+
std::stringstream msg;
21+
msg << "Sent " << i << " from " << std::this_thread::get_id() << "\n";
22+
std::cout << msg.str();
23+
24+
std::this_thread::sleep_for(std::chrono::milliseconds(10)); // simulate work
25+
}
26+
input_chan.close();
27+
};
28+
29+
const auto reader = [&output_chan]() {
30+
for (const auto out : output_chan) { // blocking until channel is drained (closed and empty)
31+
std::stringstream msg;
32+
msg << "Received " << out << " on " << std::this_thread::get_id() << "\n";
33+
std::cout << msg.str();
34+
35+
std::this_thread::sleep_for(std::chrono::milliseconds(200)); // simulate work
36+
}
37+
};
38+
39+
const auto transformer = [&input_chan, &output_chan]() {
40+
std::transform(input_chan.begin(), input_chan.end(), msd::back_inserter(output_chan),
41+
[](int value) { return value * 2; });
42+
output_chan.close();
43+
};
44+
45+
const auto reader_1 = std::async(std::launch::async, reader);
46+
const auto reader_2 = std::async(std::launch::async, reader);
47+
const auto writer_1 = std::async(std::launch::async, writer, 1, 30);
48+
const auto writer_2 = std::async(std::launch::async, writer, 31, 40);
49+
const auto transformer_task = std::async(std::launch::async, transformer);
50+
51+
reader_1.wait();
52+
reader_2.wait();
53+
writer_1.wait();
54+
writer_2.wait();
55+
transformer_task.wait();
56+
}

examples/multithreading_static_channel.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
#include <chrono>
12
#include <future>
23
#include <iostream>
34
#include <sstream>

include/msd/blocking_iterator.hpp

Lines changed: 119 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ namespace msd {
1313
*
1414
* Used to implement channel range-based for loop.
1515
*
16-
* @tparam Channel Instance of channel.
16+
* @tparam Channel Type of channel being iterated.
1717
*/
1818
template <typename Channel>
1919
class blocking_iterator {
@@ -47,41 +47,146 @@ class blocking_iterator {
4747
* @brief Constructs a blocking iterator from a channel reference.
4848
*
4949
* @param chan Reference to the channel this iterator will iterate over.
50+
* @param is_end If true, the iterator is in an end state (no elements to read).
5051
*/
51-
explicit blocking_iterator(Channel& chan) : chan_{chan} {}
52+
explicit blocking_iterator(Channel& chan, bool is_end = false) : chan_{&chan}, is_end_{is_end}
53+
{
54+
if (!is_end_ && !chan_->read(value_)) {
55+
is_end_ = true;
56+
}
57+
}
5258

5359
/**
54-
* @brief Advances the iterator to the next element.
60+
* @brief Retrieves the next element from the channel.
5561
*
5662
* @return The iterator itself.
5763
*/
58-
blocking_iterator<Channel> operator++() const noexcept { return *this; }
64+
blocking_iterator<Channel> operator++() noexcept
65+
{
66+
if (!chan_->read(value_)) {
67+
is_end_ = true;
68+
}
69+
return *this;
70+
}
5971

6072
/**
61-
* @brief Retrieves and returns the next element from the channel.
73+
* @brief Returns the latest element retrieved from the channel.
6274
*
63-
* @return A const reference to the current element.
75+
* @return A const reference to the element.
6476
*/
65-
reference operator*()
66-
{
67-
chan_.read(value_);
68-
69-
return value_;
70-
}
77+
reference operator*() { return value_; }
7178

7279
/**
7380
* @brief Makes iteration continue until the channel is closed and empty.
7481
*
82+
* @param other Another blocking_iterator to compare with.
83+
*
7584
* @return true if the channel is not closed or not empty (continue iterating).
7685
* @return false if the channel is closed and empty (stop iterating).
7786
*/
78-
bool operator!=(blocking_iterator<Channel>) const { return !chan_.drained(); }
87+
bool operator!=(const blocking_iterator& other) { return is_end_ != other.is_end_; }
7988

8089
private:
81-
Channel& chan_;
90+
Channel* chan_;
8291
value_type value_{};
92+
bool is_end_{false};
93+
};
94+
95+
/**
96+
* @brief An output iterator pushes elements into a channel. Blocking until the channel is not full.
97+
*
98+
* Used to integrate with standard algorithms that require an output iterator.
99+
*
100+
* @tparam Channel Type of channel being iterated.
101+
*/
102+
template <typename Channel>
103+
class blocking_writer_iterator {
104+
public:
105+
/**
106+
* @brief The type of the elements stored in the channel.
107+
*/
108+
using value_type = typename Channel::value_type;
109+
110+
/**
111+
* @brief Constant reference to the type of the elements stored in the channel.
112+
*/
113+
using reference = const value_type&;
114+
115+
/**
116+
* @brief Supporting writing of elements.
117+
*/
118+
using iterator_category = std::output_iterator_tag;
119+
120+
/**
121+
* @brief Signed integral type for iterator difference.
122+
*/
123+
using difference_type = std::ptrdiff_t;
124+
125+
/**
126+
* @brief Pointer type to the value_type.
127+
*/
128+
using pointer = const value_type*;
129+
130+
/**
131+
* @brief Constructs a blocking iterator from a channel reference.
132+
*
133+
* @param chan Reference to the channel this iterator will write into.
134+
*/
135+
explicit blocking_writer_iterator(Channel& chan) : chan_{&chan} {}
136+
137+
/**
138+
* @brief Writes an element into the channel, blocking until space is available.
139+
*
140+
* @param val The value to be written into the channel.
141+
*
142+
* @return The iterator itself.
143+
*/
144+
blocking_writer_iterator& operator=(const value_type& val)
145+
{
146+
chan_->write(val);
147+
return *this;
148+
}
149+
150+
/**
151+
* @brief Not applicable (handled by operator=).
152+
*
153+
* @return The iterator itself.
154+
*/
155+
blocking_writer_iterator& operator*() { return *this; }
156+
157+
/**
158+
* @brief Not applicable (handled by operator=).
159+
*
160+
* @return The iterator itself.
161+
*/
162+
blocking_writer_iterator& operator++() { return *this; }
163+
164+
/**
165+
* @brief Not applicable (handled by operator=).
166+
*
167+
* @return The iterator itself.
168+
*/
169+
blocking_writer_iterator operator++(int) { return *this; }
170+
171+
private:
172+
Channel* chan_;
83173
};
84174

175+
/**
176+
* @brief Creates a blocking iterator for the given channel.
177+
*
178+
* @tparam Channel Type of channel being iterated.
179+
*
180+
* @param chan Reference to the channel this iterator will iterate over.
181+
*
182+
* @return A blocking iterator for the specified channel.
183+
*/
184+
template <typename Channel>
185+
blocking_writer_iterator<Channel> back_inserter(Channel& chan)
186+
{
187+
return blocking_writer_iterator<Channel>{chan};
188+
}
189+
85190
} // namespace msd
86191

87192
#endif // MSD_CHANNEL_BLOCKING_ITERATOR_HPP_

include/msd/channel.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,7 @@ class channel {
210210
*
211211
* @return A blocking iterator representing the end condition.
212212
*/
213-
iterator end() noexcept { return blocking_iterator<channel<T>>{*this}; }
213+
iterator end() noexcept { return blocking_iterator<channel<T>>{*this, true}; }
214214

215215
/**
216216
* Channel cannot be copied or moved.

include/msd/static_channel.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ class static_channel {
177177
*
178178
* @return A blocking iterator representing the end condition.
179179
*/
180-
iterator end() noexcept { return blocking_iterator<static_channel<T, Capacity>>{*this}; }
180+
iterator end() noexcept { return blocking_iterator<static_channel<T, Capacity>>{*this, true}; }
181181

182182
/**
183183
* Channel cannot be copied or moved.

0 commit comments

Comments
 (0)