Skip to content

Add batch write #85

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .clang-tidy
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ Checks: >
-altera-unroll-loops,
-llvmlibc-inline-function-decl,
-cert-dcl21-cpp,
-fuchsia-default-arguments-declarations
-fuchsia-default-arguments-declarations,
-cppcoreguidelines-pro-type-union-access

WarningsAsErrors: "*"

Expand All @@ -28,8 +29,7 @@ CheckOptions:
- { key: readability-identifier-naming.ClassMemberSuffix, value: _ }
- { key: readability-identifier-naming.PrivateMemberSuffix, value: _ }
- { key: readability-identifier-naming.ProtectedMemberSuffix, value: _ }
- { key: readability-identifier-naming.EnumConstantCase, value: CamelCase }
- { key: readability-identifier-naming.EnumConstantPrefix, value: k }
- { key: readability-identifier-naming.EnumConstantCase, value: lower_case }
- { key: readability-identifier-naming.ConstexprVariableCase, value: lower_case }
- { key: readability-identifier-naming.GlobalConstantCase, value: CamelCase }
- { key: readability-identifier-naming.GlobalConstantPrefix, value: k }
Expand Down
85 changes: 72 additions & 13 deletions include/msd/channel.hpp

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In include/msd/channel.hpp, the wait_before_write method doesn't check if is_closed_ is true before waiting, which could lead to a thread waiting indefinitely when the channel is closed but full. Consider adding a check for is_closed_ in the wait condition.

Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

#include "blocking_iterator.hpp"
#include "nodiscard.hpp"
#include "result.hpp"
#include "storage.hpp"

#include <condition_variable>
Expand All @@ -17,19 +18,6 @@

namespace msd {

/**
* @brief Exception thrown if trying to write on closed channel.
*/
class closed_channel : public std::runtime_error {
public:
/**
* @brief Constructs the exception with an error message.
*
* @param msg A descriptive message explaining the cause of the error.
*/
explicit closed_channel(const char* msg) : std::runtime_error{msg} {}
};

/**
* @brief Default storage for msd::channel.
*
Expand Down Expand Up @@ -74,6 +62,35 @@
template <typename Storage>
struct is_static_storage<Storage, decltype((void)Storage::capacity, void())> : std::true_type {};

/**
* @brief Exception thrown if trying to write on closed channel.
*/
class closed_channel : public std::runtime_error {
public:
/**
* @brief Constructs the exception with an error message.
*
* @param msg A descriptive message explaining the cause of the error.
*/
explicit closed_channel(const char* msg) : std::runtime_error{msg} {}
};

/**
* @brief Possible errors during a batch write operation.
*/
enum class batch_write_error {
/**
* @brief The specified range exceeds the available capacity.
*/
range_exceeds_capacity,

/**
* @brief The receiving channel is closed and cannot accept data.
*/
channel_is_closed,
};


/**
* @brief Thread-safe container for sharing data between threads.
*
Expand Down Expand Up @@ -180,6 +197,48 @@
return true;
}

/**
* @brief Writes a range of elements into the channel in batch mode.
*
* This function attempts to write all elements from the input range [begin, end)
* into the channel. If the channel has a capacity and the range exceeds that capacity,
* the function returns an error. If the channel is closed, it also returns an error.
*
* @tparam InputIterator An input iterator type pointing to elements of type `T`.
* @param begin Iterator pointing to the beginning of the range to write.
* @param end Iterator pointing to the end of the range to write (exclusive).
* @return A result indicating success or containing a `batch_write_error`:
* - `batch_write_error::range_exceeds_capacity` if the range is too large to fit.
* - `batch_write_error::channel_is_closed` if the channel is already closed.
* - Empty (success) if all elements were successfully written.
*
* @note It takes the lock on the channel once for all elements and release it at the end.
*/
template <typename InputIterator>
result<void, batch_write_error> batch_write(const InputIterator begin, const InputIterator end)
{
if (capacity_ > 0 && (static_cast<std::size_t>(std::distance(begin, end)) + storage_.size()) > capacity_) {
return result<void, batch_write_error>{batch_write_error::range_exceeds_capacity};
}

{
std::unique_lock<std::mutex> lock{mtx_};
wait_before_write(lock);

if (is_closed_) {
return result<void, batch_write_error>{batch_write_error::channel_is_closed};

Check warning on line 229 in include/msd/channel.hpp

View check run for this annotation

Codecov / codecov/patch

include/msd/channel.hpp#L229

Added line #L229 was not covered by tests
}

for (InputIterator it = begin; it != end; ++it) {
storage_.push_back(*it);
}
}

cnd_.notify_one();

return result<void, batch_write_error>{};
}

/**
* @brief Pops an element from the channel.
*
Expand Down
111 changes: 111 additions & 0 deletions include/msd/result.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
// Copyright (C) 2020-2025 Andrei Avram

#ifndef MSD_CHANNEL_RESULT_HPP_
#define MSD_CHANNEL_RESULT_HPP_

/** @file */

namespace msd {

/**
* @brief A result that contains either a value of type T or an error of type E.
*
* @tparam T The type of the value on success.
* @tparam E The type of the error on failure.
*/
template <typename T, typename E>
class result {
public:
/**
* @brief Constructs an empty result (not valid).
*/
explicit result() = default;

/**
* @brief Constructs a successful result with a value.
*
* @param value The value to store into the result.
*/
explicit result(T value) : value_{value} {}

/**
* @brief Constructs an error result.
*
* @param error The error value to store.
*/
explicit result(E error) : has_error_{true}, error_{error} {}

/**
* @brief Checks whether the result is a success.
*
* @return true if the result holds a value, false if it holds an error.
*/
explicit operator bool() const { return !has_error_; }

/**
* @brief Gets the stored value.
*
* @return const T& Reference to the stored value.
* @warning Behavior is undefined if the result holds an error.
*/
const T& value() const { return value_; }

/**
* @brief Gets the stored error.
*
* @return const E& Reference to the stored error.
* @warning Behavior is undefined if the result holds a value.
*/
const E& error() const { return error_; }
Comment on lines +45 to +59

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In include/msd/result.hpp, the value() and error() methods don't check the state of the result before returning values. This could lead to undefined behavior when accessing the wrong union member. Consider adding assertions or other safety mechanisms to prevent misuse.


private:
union {
T value_;
E error_;
};
bool has_error_{};
Comment on lines +61 to +66

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In include/msd/result.hpp, using a union with non-trivial types without proper lifetime management could lead to undefined behavior. Consider implementing proper construction/destruction handling for the union members or using std::variant instead.

Comment on lines +62 to +66

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In include/msd/result.hpp, when E is a non-trivial type, the union could lead to undefined behavior without proper destruction. Consider adding a destructor that checks has_error_ and properly calls the destructor of the active member.

};

/**
* @brief Specialization of result for void success type.
*
* @tparam E The type of the error on failure.
*/
template <typename E>
class result<void, E> {
public:
/**
* @brief Constructs a successful void result.
*/
result() = default;

/**
* @brief Constructs an error result.
*
* @param error The error value to store.
*/
explicit result(E error) : has_value_{false}, error_{error} {}

/**
* @brief Checks whether the result is a success.
*
* @return true if the result is successful, false otherwise.
*/
explicit operator bool() const { return has_value_; }

/**
* @brief Gets the stored error.
*
* @return Const reference to the stored error.
* @note Only valid if the result holds an error.
*/
const E& error() const { return error_; }

private:
bool has_value_{true};
E error_;
};

} // namespace msd

#endif // MSD_CHANNEL_RESULT_HPP_
57 changes: 57 additions & 0 deletions tests/channel_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <atomic>
#include <cstdint>
#include <future>
#include <iterator>
#include <numeric>
#include <string>
#include <thread>
Expand Down Expand Up @@ -155,6 +156,62 @@ TEST(ChannelTest, PushByMoveAndFetch)
EXPECT_EQ("def", out);
}

TEST(ChannelTest, BatchWriteOnUnbufferedChannel)
{
msd::channel<int> channel{};

msd::result<void, msd::batch_write_error> result;
std::vector<int> input(100);
std::iota(input.begin(), input.end(), 0);

result = channel.batch_write(input.cbegin(), input.cend());
EXPECT_TRUE(result);
EXPECT_EQ(channel.size(), 100);
EXPECT_EQ(input.size(), 100);

std::vector<int> output(100);
auto iter = output.begin();
while (!channel.empty()) {
channel >> *iter;
++iter;
}
for (std::size_t i = 0; i < output.size(); ++i) {
EXPECT_EQ(output[i], i);
}
}

TEST(ChannelTest, BatchWriteOnBufferedChannel)
{
msd::channel<int> channel{10};

msd::result<void, msd::batch_write_error> result;
std::vector<int> input(100);
std::iota(input.begin(), input.end(), 0);

// Too many
result = channel.batch_write(input.cbegin(), input.cend());
EXPECT_FALSE(result);
EXPECT_EQ(result.error(), msd::batch_write_error::range_exceeds_capacity);
EXPECT_EQ(channel.size(), 0);
EXPECT_EQ(input.size(), 100);

// Ok
result = channel.batch_write(std::next(input.begin(), 2), std::next(input.begin(), 7));
EXPECT_TRUE(result);
EXPECT_EQ(channel.size(), 5);
EXPECT_EQ(input.size(), 100);

std::vector<int> output(5);
auto iter = output.begin();
while (!channel.empty()) {
channel >> *iter;
++iter;
}
for (std::size_t i = 0; i < output.size(); ++i) {
EXPECT_EQ(output[i], i + 2);
}
}

TEST(ChannelTest, size)
{
msd::channel<int> channel;
Expand Down
Loading