Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
217 changes: 217 additions & 0 deletions include/fork_union.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,23 @@
* - `for_n_dynamic` - for unevenly distributed tasks, where each task may take a different time.
* - `for_slices` - for iterating over a range of similar duration tasks, addressable by a slice.
*
* Aside from loops, "map-reduce" and "sorting" algorithms are also provided, but they go far beyond
* the capabilities of Parallel STL, to allow users to pass SIMD-capable lambdas, and implement not
* only one level of slicing for parallel operations, but also tree-like aggregations:
*
* - `transform_filter_n[_dynamic]` -
* - `transform_filter_reduce_n[_dynamic]` -
* - TODO: `argsort_n[_dynamic]` -
* - TODO: `sort_n[_dynamic]` -
* - TODO: `transform_filter_scan_n[_dynamic]`
*
* STL Ranges-like Parallel Algorithms for many iterator types are also provided.
*
* @see `for_each`: https://en.cppreference.com/w/cpp/algorithm/for_each.html
* @see `for_each_n`: https://en.cppreference.com/w/cpp/algorithm/for_each_n.html
* @see `transform`: https://en.cppreference.com/w/cpp/algorithm/transform.html
* @see `reduce`: https://en.cppreference.com/w/cpp/algorithm/reduce.html
* @see `sort`: https://en.cppreference.com/w/cpp/algorithm/sort.html
* ------------------------------------------------------------------------------------------------
*
* On Linux, when NUMA and PThreads are available, the library can also leverage @b NUMA-aware
Expand Down Expand Up @@ -1991,6 +2008,206 @@ void bubble_sort(value_type_ *array, std::size_t size, comparator_type_ comp = {
if (comp(array[j + 1], array[j])) std::swap(array[j], array[j + 1]);
}

namespace {
namespace fu = ashvardanian::fork_union;
}

/**
* @brief Find the first position where value could be inserted to maintain sorted order.
*/
template <typename iterator_type_, typename value_type_, typename comparator_type_ = std::less<value_type_>>
iterator_type_ upper_bound(iterator_type_ first, iterator_type_ last, value_type_ const &value,
comparator_type_ comp = {}) noexcept {
auto count = last - first;
while (count > 0) {
auto step = count / 2;
auto it = first + step;
if (!comp(value, *it)) {
first = ++it;
count -= step + 1;
}
else { count = step; }
}
return first;
}

/**
* @brief Rotate elements in range [first, last) such that middle becomes the new first.
*/
template <typename iterator_type_>
iterator_type_ rotate(iterator_type_ first, iterator_type_ middle, iterator_type_ last) noexcept {
if (first == middle) return last;
if (middle == last) return first;

auto result = first + (last - middle);
auto next = middle;

while (first != next) {
std::swap(*first++, *next++);
if (next == last) next = middle;
else if (first == middle)
middle = next;
}

return result;
}

/**
* @brief In-place merge two sorted ranges without additional memory allocations.
* @param[in] first Iterator to the beginning of the first sorted range.
* @param[in] middle Iterator to the beginning of the second sorted range.
* @param[in] last Iterator to the end of the second sorted range.
* @param[in] comp Comparator function for ordering elements.
*/
template <typename iterator_type_, typename comparator_type_ = std::less<>>
void inplace_merge_rotation(iterator_type_ first, iterator_type_ middle, iterator_type_ last,
comparator_type_ comp = {}) noexcept {
if (first == middle || middle == last) return;

auto left_size = middle - first;
auto right_size = last - middle;

if (left_size == 0 || right_size == 0) return;

if (left_size + right_size <= 32) {
for (auto it = middle; it != last; ++it) {
auto value = std::move(*it);
auto pos = fu::upper_bound(first, it, value, comp);
std::move_backward(pos, it, it + 1);
*pos = std::move(value);
}
return;
}

if (left_size <= right_size) {
if (left_size == 1) {
auto insert_pos = std::lower_bound(middle, last, *first, comp);
fu::rotate(first, middle, insert_pos);
}
else {
auto mid_left = first + left_size / 2;
auto mid_right = std::lower_bound(middle, last, *mid_left, comp);
auto new_middle = fu::rotate(mid_left, middle, mid_right);
if (mid_left > first) inplace_merge_rotation(first, mid_left, new_middle, comp);
if (mid_right < last) inplace_merge_rotation(new_middle, mid_right, last, comp);
}
}
else {
auto mid_right = middle + right_size / 2;
auto mid_left = fu::upper_bound(first, middle, *mid_right, comp);
auto new_middle = fu::rotate(mid_left, middle, mid_right);
inplace_merge_rotation(first, mid_left, new_middle, comp);
inplace_merge_rotation(new_middle, mid_right, last, comp);
}
}

/**
* @brief Three-way quicksort implementation for efficient handling of duplicate elements.
* @param[in] first Iterator to the beginning of the range to sort.
* @param[in] last Iterator to the end of the range to sort.
* @param[in] comp Comparator function for ordering elements.
*/
template <typename iterator_type_, typename comparator_type_ = std::less<>>
void three_way_quicksort(iterator_type_ first, iterator_type_ last, comparator_type_ comp = {}) noexcept {
if (last - first <= 1) return;
if (last - first <= 16) {
for (auto i = first + 1; i < last; ++i) {
auto value = std::move(*i);
auto j = i;
while (j > first && comp(value, *(j - 1))) {
*j = std::move(*(j - 1));
--j;
}
*j = std::move(value);
}
return;
}

auto pivot = *first;
auto lt = first;
auto gt = last;
auto i = first + 1;

while (i < gt) {
if (comp(*i, pivot)) { std::iter_swap(i++, lt++); }
else if (comp(pivot, *i)) { std::iter_swap(i, --gt); }
else { ++i; }
}

three_way_quicksort(first, lt, comp);
three_way_quicksort(gt, last, comp);
}

/**
* @brief Sorts elements in parallel using multiple threads from the given thread pool.
*
* Executes a parallel sorting algorithm that divides the input range into chunks, sorts each
* chunk independently using three-way quicksort, and then merges the sorted chunks using
* in-place rotation-based merging. This approach avoids dynamic memory allocations during
* execution and efficiently handles datasets with duplicate values.
*
* @code{.cpp}
* std::vector<int> data = {3, 1, 4, 1, 5, 9, 2, 6, 5, 3, 5};
* fu::basic_pool_t pool;
* pool.try_spawn(std::thread::hardware_concurrency());
*
* fu::sort(pool, data.begin(), data.end());
* // data is now sorted: {1, 1, 2, 3, 3, 4, 5, 5, 5, 6, 9}
* @endcode
*
* @tparam pool_type_ Thread pool type that provides `threads_count()`, `for_threads()`, and `for_n()` methods.
* @tparam iterator_type_ Random access iterator type.
* @tparam comparator_type_ Callable type for element comparison, defaults to `std::less<>`.
* @param[in] pool Reference to the thread pool executor to use for parallel sorting.
* @param[in] first Iterator to the beginning of the range to sort.
* @param[in] last Iterator to the end of the range to sort.
* @param[in] comp Comparator function for ordering elements.
*
* @note The function is `noexcept` and performs no dynamic memory allocations during execution.
* @note Requires the iterator type to be random access and the range to be valid.
* @note For best performance, ensure the range size is significantly larger than the thread count.
*
* @sa `bubble_sort` for a simple sequential sorting algorithm.
* @sa `for_n` for the underlying parallel execution mechanism.
*/
template <typename pool_type_, typename iterator_type_, typename comparator_type_ = std::less<>>
void sort(pool_type_ &pool, iterator_type_ first, iterator_type_ last, comparator_type_ comp = {}) noexcept {
assert(first <= last && "Iterator range must be valid");

std::size_t const size = static_cast<std::size_t>(last - first);
std::size_t const threads = pool.threads_count();
constexpr std::size_t const sequential_threshold = 1024;

if (size <= sequential_threshold || threads == 1) return three_way_quicksort(first, last, comp);

std::size_t const chunk_size = size / threads;
if (chunk_size == 0) return three_way_quicksort(first, last, comp);

pool.for_threads([=](std::size_t thread) noexcept {
auto start = first + thread * chunk_size;
auto finish = (thread == threads - 1) ? last : start + chunk_size;
three_way_quicksort(start, finish, comp);
});

for (std::size_t stride = 1; stride < threads; stride *= 2) {
pool.for_n(threads, [=](std::size_t thread_idx) noexcept {
if (thread_idx % (2 * stride) != 0) return;

std::size_t left_chunk = thread_idx;
std::size_t right_chunk = thread_idx + stride;
if (right_chunk >= threads) return;

auto left_start = first + left_chunk * chunk_size;
auto mid = first + right_chunk * chunk_size;

std::size_t right_end_chunk = std::min(thread_idx + 2 * stride, threads);
auto right_end = (right_end_chunk == threads) ? last : first + right_end_chunk * chunk_size;

inplace_merge_rotation(left_start, mid, right_end, comp);
});
}
}

/**
* @brief NUMA topology descriptor: describing memory pools and core counts next to them.
*
Expand Down
Loading
Loading