diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json index feb1264..c45bebb 100644 --- a/.devcontainer/devcontainer.json +++ b/.devcontainer/devcontainer.json @@ -23,5 +23,10 @@ "source=cache,target=/home/ubuntu/.cache,type=volume" ], "workspaceFolder": "/workspace", + "runArgs": [ + "--cap-add=SYS_PTRACE", + "--security-opt", + "seccomp=unconfined" + ], "remoteUser": "ubuntu" } diff --git a/examples/.clang-tidy b/examples/.clang-tidy index a96e77e..aee73ff 100644 --- a/examples/.clang-tidy +++ b/examples/.clang-tidy @@ -2,4 +2,5 @@ InheritParentConfig: true Checks: > -cppcoreguidelines-avoid-magic-numbers, - -readability-magic-numbers + -readability-magic-numbers, + -fuchsia-default-arguments-calls diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index 9d9fbf9..e664ef3 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -26,23 +26,12 @@ add_custom_target(examples) add_custom_target(run_examples) # Examples -add_example(example_basic basic.cpp) -run_example(example_basic) - -add_example(example_close close.cpp) -run_example(example_close) - add_example(example_move move.cpp) run_example(example_move) -add_example(example_multithreading multithreading.cpp) - add_example(example_streaming streaming.cpp) run_example(example_streaming) -add_example(example_multithreading_static_channel multithreading_static_channel.cpp) -run_example(example_multithreading_static_channel) - add_example(example_concurrent_map_filter concurrent_map_filter.cpp) run_example(example_concurrent_map_filter) diff --git a/examples/basic.cpp b/examples/basic.cpp deleted file mode 100644 index 873c664..0000000 --- a/examples/basic.cpp +++ /dev/null @@ -1,21 +0,0 @@ -#include - -#include - -int main() -{ - constexpr std::size_t capacity = 10; - msd::channel channel{capacity}; - - channel << 1; - - channel << 2 << 3; - - for (auto out : channel) { - std::cout << out << '\n'; - - if (channel.empty()) { - break; - } - } -} diff --git a/examples/close.cpp b/examples/close.cpp deleted file mode 100644 index 9d5d69e..0000000 --- a/examples/close.cpp +++ /dev/null @@ -1,58 +0,0 @@ -#include - -#include -#include -#include -#include -#include - -int main() -{ - msd::channel channel{}; - - // Write data on the channel until it's closed - const auto produce = [](msd::channel& chan, int time_ms) { - static int inc = 0; - - while (!chan.closed()) { - chan << ++inc; - - std::this_thread::sleep_for(std::chrono::milliseconds{time_ms}); - } - }; - const auto producer = std::async(produce, std::ref(channel), 10); - - // Close the channel after some time - const auto close = [](msd::channel& chan, int time_ms) { - std::this_thread::sleep_for(std::chrono::milliseconds{time_ms}); - - chan.close(); - }; - const auto closer = std::async(close, std::ref(channel), 100); - - // Display all the data from the channel - // When the channel is closed and empty, the iteration will end - std::mutex mutex; - - const auto consume = [&mutex](msd::channel& chan, int time_ms) { - for (auto value : chan) { - std::stringstream stream; - stream << "value " << value << " from consumer " << std::this_thread::get_id() << '\n'; - - { - std::lock_guard lock(mutex); - - std::cout << stream.str(); - } - - std::this_thread::sleep_for(std::chrono::milliseconds{time_ms}); - } - }; - const auto consumer_1 = std::async(consume, std::ref(channel), 50); - const auto consumer_2 = std::async(consume, std::ref(channel), 10); - - producer.wait(); - closer.wait(); - consumer_1.wait(); - consumer_2.wait(); -} diff --git a/examples/move.cpp b/examples/move.cpp index 90ed282..62cf1ab 100644 --- a/examples/move.cpp +++ b/examples/move.cpp @@ -1,58 +1,126 @@ -#include +#include +#include #include +#include +#include class data final { - int value_{}; - public: + static std::size_t copies_; + static std::size_t moves_; + data() = default; explicit data(int value) : value_{value} {} int get_value() const { return value_; } - data(const data& other) noexcept : value_{other.value_} { std::cout << "copy " << value_ << '\n'; } + data(const data& other) noexcept : value_{other.value_} + { + std::cout << "copy " << value_ << '\n'; + ++copies_; + } + data& operator=(const data& other) { if (this != &other) { value_ = other.value_; + std::cout << "copy " << value_ << '\n'; + ++copies_; } - std::cout << "copy " << value_ << '\n'; return *this; } - data(data&& other) noexcept : value_{other.value_} { std::cout << "move " << value_ << '\n'; } + data(data&& other) noexcept : value_{other.value_} + { + std::cout << "move " << value_ << '\n'; + ++moves_; + } + data& operator=(data&& other) noexcept { if (this != &other) { value_ = other.value_; std::cout << "move " << value_ << '\n'; + ++moves_; } return *this; } ~data() = default; + + private: + int value_{}; }; +std::size_t data::copies_{}; +std::size_t data::moves_{}; + +// Copy and move semantics with a user-defined type. + int main() { - msd::channel chan{10}; + msd::static_channel chan{}; - auto in1 = data{1}; + // l-value: will be copied + const auto in1 = data{1}; chan << in1; + // r-value: will be moved chan << data{2}; + // l-value -> std::move -> r-value: will be moved auto in3 = data{3}; chan << std::move(in3); - for (const auto& out : chan) { + std::vector actual; + + // Each value will be moved when read + for (const data& out : chan) { std::cout << out.get_value() << '\n'; + actual.push_back(out.get_value()); + if (chan.empty()) { break; } } + + // Read values + const std::vector expected{1, 2, 3}; + + if (actual != expected) { + std::cerr << "Error: got: "; + for (const int value : actual) { + std::cerr << value << ", "; + } + + std::cerr << ", expected: "; + for (const int value : expected) { + std::cerr << value << ", "; + } + std::cerr << '\n'; + + std::terminate(); + } + + // 1 copy when in1 was written + constexpr std::size_t expected_copies = 1; + + if (data::copies_ != expected_copies) { + std::cerr << "Error: copies: " << data::copies_ << ", expected: " << expected_copies << '\n'; + std::terminate(); + } + + // 1 move when the second value was written + // 1 move when in3 was written + // 3 moves when the 3 values were read + constexpr std::size_t expected_moves = 5; + + if (data::moves_ != expected_moves) { + std::cerr << "Error: moves: " << data::moves_ << ", expected: " << expected_moves << '\n'; + std::terminate(); + } } diff --git a/examples/multithreading.cpp b/examples/multithreading.cpp deleted file mode 100644 index 40324e3..0000000 --- a/examples/multithreading.cpp +++ /dev/null @@ -1,46 +0,0 @@ -#include - -#include -#include -#include -#include -#include - -int main() -{ - const auto threads = std::thread::hardware_concurrency(); - - msd::channel channel{threads}; - - // Read - const auto out = [](msd::channel& chan, const std::size_t value) { - for (auto number : chan) { - std::stringstream stream; - stream << number << " from thread: " << value << '\n'; - std::cout << stream.str(); - std::this_thread::sleep_for(std::chrono::milliseconds(500)); - } - }; - - std::vector reads; - for (std::size_t i = 0U; i < threads; ++i) { - reads.emplace_back(out, std::ref(channel), i); - } - - // Write - const auto input = [](msd::channel& chan) { - while (true) { - static std::int64_t value = 0; - chan << ++value; - } - }; - - auto write = std::thread{input, std::ref(channel)}; - - // Join all threads - for (std::size_t i = 0U; i < threads; ++i) { - reads.at(i).join(); - } - - write.join(); -} diff --git a/examples/multithreading_static_channel.cpp b/examples/multithreading_static_channel.cpp deleted file mode 100644 index 0834a1b..0000000 --- a/examples/multithreading_static_channel.cpp +++ /dev/null @@ -1,60 +0,0 @@ -#include - -#include -#include -#include -#include -#include -#include - -int main() -{ - msd::static_channel chan{}; // always buffered - - std::mutex cout_mutex; - - // Send to channel - const auto writer = [&chan, &cout_mutex](int begin, int end) { - for (int i = begin; i <= end; ++i) { - chan.write(i); - - std::stringstream msg; - msg << "Sent " << i << " from " << std::this_thread::get_id() << '\n'; - - { - std::lock_guard lock(cout_mutex); - std::cout << msg.str(); - } - - std::this_thread::sleep_for(std::chrono::milliseconds(10)); // simulate work - } - chan.close(); - }; - - // Read - const auto reader = [&chan, &cout_mutex]() { - for (const auto out : chan) { // blocking until channel is drained (closed and empty) - std::stringstream msg; - msg << "Received " << out << " on " << std::this_thread::get_id() << '\n'; - - { - std::lock_guard lock(cout_mutex); - std::cout << msg.str(); - } - - std::this_thread::sleep_for(std::chrono::milliseconds(200)); // simulate work - } - }; - - const auto reader_1 = std::async(std::launch::async, reader); - const auto reader_2 = std::async(std::launch::async, reader); - const auto reader_3 = std::async(std::launch::async, reader); - const auto writer_1 = std::async(std::launch::async, writer, 1, 50); - const auto writer_2 = std::async(std::launch::async, writer, 51, 100); - - reader_1.wait(); - reader_2.wait(); - reader_3.wait(); - writer_1.wait(); - writer_2.wait(); -} diff --git a/examples/semaphore.cpp b/examples/semaphore.cpp index a1ad821..29f6bac 100644 --- a/examples/semaphore.cpp +++ b/examples/semaphore.cpp @@ -29,6 +29,8 @@ int simulate_heavy_computation(const int value) return result; }; +// https://en.wikipedia.org/wiki/Semaphore_(programming) + int main() { semaphore sem{2}; diff --git a/examples/streaming.cpp b/examples/streaming.cpp index 7e12dfc..7953cc0 100644 --- a/examples/streaming.cpp +++ b/examples/streaming.cpp @@ -7,6 +7,8 @@ #include #include +// Spawns multiple producers that send strings into a channel, which are streamed to std::cout from a consumer. + int main() { using messages = msd::channel; @@ -15,7 +17,7 @@ int main() messages channel{threads}; // Continuously get some data on multiple threads and send it all to a channel - const auto input = [](messages& chan, std::size_t thread, std::chrono::milliseconds pause) { + const auto produce = [](const std::size_t thread, const std::chrono::milliseconds pause, messages& chan) { thread_local static std::size_t inc = 0U; while (!chan.closed()) { @@ -26,24 +28,24 @@ int main() } }; - std::vector> in_futures; + std::vector> producers; for (std::size_t i = 0U; i < threads; ++i) { - in_futures.push_back(std::async(input, std::ref(channel), i, std::chrono::milliseconds{500})); + producers.push_back(std::async(produce, i, std::chrono::milliseconds{500}, std::ref(channel))); } // Close the channel after some time - const auto timeout = [](messages& chan, std::chrono::milliseconds after) { + const auto close = [](const std::chrono::milliseconds after, messages& chan) { std::this_thread::sleep_for(after); chan.close(); }; - const auto timeout_future = std::async(timeout, std::ref(channel), std::chrono::milliseconds{3000U}); + const auto closer = std::async(close, std::chrono::milliseconds{3000U}, std::ref(channel)); - // Stream incoming data to a destination + // Stream incoming messages std::move(channel.begin(), channel.end(), std::ostream_iterator(std::cout, "\n")); - // Wait for other threads - for (auto& future : in_futures) { - future.wait(); + // Wait all tasks + for (auto& producer : producers) { + producer.wait(); } - timeout_future.wait(); + closer.wait(); }