From 3cf9c0a84de6900a01cf22d6542046410b51ead7 Mon Sep 17 00:00:00 2001 From: Ash Vardanian <1983160+ashvardanian@users.noreply.github.com> Date: Thu, 29 May 2025 16:11:58 +0000 Subject: [PATCH 1/2] Add: Draft Parallel STL operations --- include/fork_union.hpp | 193 ++++++++++++++++++++++++++++++++++++++--- 1 file changed, 183 insertions(+), 10 deletions(-) diff --git a/include/fork_union.hpp b/include/fork_union.hpp index 2f53e07..aff7e29 100644 --- a/include/fork_union.hpp +++ b/include/fork_union.hpp @@ -37,6 +37,24 @@ * - `for_n` - for iterating over a range of similar duration tasks, addressable by an index. * - `for_n_dynamic` - for unevenly distributed tasks, where each task may take a different time. * - `for_slices` - for iterating over a range of similar duration tasks, addressable by a slice. + * + * Aside from loops, "map-reduce" and "sorting" algorithms are also provided, but they go far beyond + * the capabilities of Parallel STL, to allow users to pass SIMD-capable lambdas, and implement not + * only one level of slicing for parallel operations, but also tree-like aggregations: + * + * - `transform_filter_n[_dynamic]` - + * - `transform_filter_reduce_n[_dynamic]` - + * - TODO: `argsort_n[_dynamic]` - + * - TODO: `sort_n[_dynamic]` - + * - TODO: `transform_filter_scan_n[_dynamic]` + * + * STL Ranges-like Parallel Algorithms for many iterator types are also provided. 
+ * + * @see `for_each`: https://en.cppreference.com/w/cpp/algorithm/for_each.html + * @see `for_each_n`: https://en.cppreference.com/w/cpp/algorithm/for_each_n.html + * @see `transform`: https://en.cppreference.com/w/cpp/algorithm/transform.html + * @see `reduce`: https://en.cppreference.com/w/cpp/algorithm/reduce.html + * @see `sort`: https://en.cppreference.com/w/cpp/algorithm/sort.html */ #pragma once #include // `std::allocator` @@ -377,7 +395,7 @@ using thread_pool_t = thread_pool>; #pragma endregion - Thread Pool -#pragma region - Indexed Tasks +#pragma region - Parallel Loops /** * @brief A "prong" - is a tip of a "fork" - describing a "thread" pinning a "task". @@ -577,17 +595,172 @@ void for_n_dynamic( // }); } -#pragma endregion - Indexed Tasks +#pragma endregion - Parallel Loops + +#pragma region - Indexed Algorithms -#if defined(FU_ALLOW_UNSAFE) -#pragma region - Unsafe Extensions -template -void unsafe_broadcast(function_type_ const &function) noexcept(false) { - std::exception_ptr exception_ptr; - std::atomic first_exception_thread_index; +struct filter_result_t { + std::size_t inputs_received = 0; + std::size_t outputs_written = 0; +}; + +/** + * @brief Map and Filter in parallel, outputting into a pre-allocated buffer. + * + * @param[in] n The number of times to call the @p transform. + * @param[in] transform The function, receiving the `prong_t` or the task index as an argument. + * @param[in] filter The function, receiving the @p transform -ed object and returning `true` if it should be included. + * @param[in] outputs The pre-allocated buffer to write the @p filter -ed objects into. + * + * This function operates under the following assumptions: + * - The @p transform may be comparatively expensive, as it's at least a memory load. + * - The @p filter is relatively cheap, mostly comparing some cached or in-register values. + * - Writing to shared memory is expensive, especially if the addresses are scattered. 
+ * + * Assuming writes are expensive, we want to write data just once, knowing exactly where in @p outputs + * to place it. Assuming reading cached data is orders of magnitude is cheaper than reading globally, + * we want to process the data in small chunks, fitting in L3, knowing how many of them will be exported. + * + * @see https://en.cppreference.com/w/cpp/algorithm/transform.html + * @see https://en.cppreference.com/w/cpp/algorithm/replace.html + * @see https://en.cppreference.com/w/cpp/algorithm/replace_copy.html + */ +template < // + typename allocator_type_ = std::allocator, // + typename index_type_ = std::size_t, // + std::size_t alignment_ = default_alignment_k, // + typename transform_type_ = dummy_lambda_t, // + typename filter_type_ = dummy_lambda_t, // + typename outputs_type_ = void // + > +filter_result_t transform_filter_n( // + thread_pool &pool, // + std::size_t const n, transform_type_ const &transform, // + filter_type_ const &filter, outputs_type_ &&outputs) noexcept { + + // We need to minimize false-sharing between threads exporting filtered data into + // a continuous buffer, or contention on locks/mutexes synchronizing appends to a + // dynamic container, like a queue, vector, or a binary search tree. } -#pragma endregion - Unsafe Extensions -#endif + +/** + * @brief Map-Reduce with an optional filter stage, outputting a single object. + * + * @param[in] pool The thread pool to use for parallel execution. + * @param[in] n The number of times to call the @p transform. + * @param[in] initial The initial value for the reduction. + * @param[in] transform The function, receiving the `prong_t` or the task index as an argument. + * @param[in] filter The function, receiving the @p transform -ed object and returning `true` if it should be included. + * @param[in] reduce The function, receiving two @p transform -ed objects and returning a reduced object. 
+ * + * @see https://en.cppreference.com/w/cpp/algorithm/transform_reduce.html + * @see https://en.cppreference.com/w/cpp/algorithm/reduce.html + */ +template < // + typename allocator_type_ = std::allocator, // + typename index_type_ = std::size_t, // + std::size_t alignment_ = default_alignment_k, // + typename transform_type_ = dummy_lambda_t, // + typename filter_type_ = dummy_lambda_t, // + typename reduce_type_ = dummy_lambda_t, // + typename result_type_ = int, // + typename results_allocator_type_ = std::allocator // + > +result_type_ transform_filter_reduce_n( // + thread_pool &pool, // + std::size_t const n, result_type_ const initial, transform_type_ const &transform, // + filter_type_ const &filter, reduce_type_ const &reduce, results_allocator_type_ const &results_allocator) noexcept { + + using result_t = result_type_; + using transform_t = transform_type_; + using filter_t = filter_type_; + using reduce_t = reduce_type_; + static_assert(!std::is_void::value, "The reduce function must return a value"); + + // Infer the callback output types + using transformed_t = decltype(transform(prong_t {0, 0})); // ? The type of the transformed value + using reduced_t = decltype(reduce(result_t {}, transformed_t {})); // ? 
The type of the reduced value + static_assert(std::is_nothrow_invocable_r::value, + "The reduction combiner must be nothrow invocable with a `result_t` and a `transformed_t` argument"); + + // Allocate one temporary object for +} + +template < // + typename allocator_type_ = std::allocator, // + typename index_type_ = std::size_t, // + std::size_t alignment_ = default_alignment_k, // + typename transform_type_ = dummy_lambda_t, // + typename filter_type_ = dummy_lambda_t, // + typename reduce_type_ = dummy_lambda_t, // + typename result_type_ = int, // + typename results_allocator_type_ = std::allocator // + > +result_type_ transform_filter_reduce_n_dynamic( // + thread_pool &pool, // + std::size_t const n, result_type_ const initial, transform_type_ const &transform, // + filter_type_ const &filter, reduce_type_ const &reduce, results_allocator_type_ const &results_allocator) noexcept { + +} + +#pragma endregion - Indexed Algorithms + +#pragma region - Parallel STL + +/** + * @brief Simple `std::sort`-like algorithm for parallel sorting of a range of elements, that won't raise exceptions. + * @see https://en.cppreference.com/w/cpp/algorithm/sort.html + */ +template +void sort(iterator_type_ const begin, iterator_type_ const end) noexcept { + + // For parallel sorting, we can benefit from median-like algorithms, counting the number of elements + // in each threads output slice. +} + +/** + * @brief Takes a range of elements [A, B) and [B, C), forming two continuous sorted runs, and merges them in-place. + * @see https://en.cppreference.com/w/cpp/algorithm/inplace_merge.html + */ +template +void inplace_merge(iterator_type_ const begin, iterator_type_ const end, std::size_t const first_run_length) noexcept { + // +} + +/** + * + * Unlike the `std::sort`, doesn't have an implicit uncontrolled memory allocation, and won't raise an exception. 
/**
 * @brief Finds the first position where @p value could be inserted into the sorted
 *        range [ @p first, @p last ) while preserving the ordering.
 *
 * Local, `noexcept` drop-in for `std::upper_bound`, so the sorting routines below
 * don't depend on the standard algorithm's unspecified exception guarantees.
 *
 * @param[in] first Iterator to the beginning of a sorted range.
 * @param[in] last Iterator past the end of the sorted range.
 * @param[in] value The value to position.
 * @param[in] comp Strict-weak-ordering comparator, defaults to `std::less<>`.
 * @return Iterator to the first element greater than @p value, or @p last.
 */
template <typename iterator_type_, typename value_type_, typename comparator_type_ = std::less<>>
iterator_type_ upper_bound(iterator_type_ first, iterator_type_ last, value_type_ const &value,
                           comparator_type_ comp = {}) noexcept {
    auto remaining = last - first;
    while (remaining > 0) {
        auto const half = remaining / 2;
        auto probe = first + half;
        if (!comp(value, *probe)) {
            // `*probe <= value`: the insertion point is strictly to the right of `probe`.
            first = ++probe;
            remaining -= half + 1;
        }
        else { remaining = half; } // `value < *probe`: keep bisecting the left half.
    }
    return first;
}

/**
 * @brief Rotates [ @p first, @p last ) in-place so that @p middle becomes the new first element.
 * @return Iterator to the element originally at @p first, i.e. `first + (last - middle)`.
 */
template <typename iterator_type_>
iterator_type_ rotate(iterator_type_ first, iterator_type_ middle, iterator_type_ last) noexcept {
    if (first == middle) return last;
    if (middle == last) return first;

    auto const new_position_of_first = first + (last - middle);
    auto read = middle;

    // Classic swap-based rotation, same shape as a typical `std::rotate` implementation.
    while (first != read) {
        std::swap(*first++, *read++);
        if (read == last) read = middle;
        else if (first == middle)
            middle = read;
    }
    return new_position_of_first;
}

/**
 * @brief Merges two consecutive sorted runs [first, middle) and [middle, last) in-place,
 *        without any scratch allocations.
 *
 * Tiny inputs are merged by binary-insertion; larger ones by the recursive rotation-based
 * divide-and-conquer merge. The local helpers are invoked as `(upper_bound)(...)` and
 * `(rotate)(...)`: the parentheses suppress ADL, so the implementations above are chosen
 * instead of the `std::` algorithms that ADL would otherwise drag in for STL iterators.
 *
 * @param[in] first Iterator to the beginning of the first sorted run.
 * @param[in] middle Iterator to the beginning of the second sorted run.
 * @param[in] last Iterator past the end of the second sorted run.
 * @param[in] comp Comparator function for ordering elements.
 */
template <typename iterator_type_, typename comparator_type_ = std::less<>>
void inplace_merge_rotation(iterator_type_ first, iterator_type_ middle, iterator_type_ last,
                            comparator_type_ comp = {}) noexcept {
    if (first == middle || middle == last) return;

    auto const left_size = middle - first;
    auto const right_size = last - middle;

    // Tiny total size: binary-insert the right run into the left one, element by element.
    if (left_size + right_size <= 32) {
        for (auto it = middle; it != last; ++it) {
            auto value = std::move(*it);
            auto pos = (upper_bound)(first, it, value, comp);
            std::move_backward(pos, it, it + 1);
            *pos = std::move(value);
        }
        return;
    }

    if (left_size <= right_size) {
        if (left_size == 1) {
            // A single left element: rotate it past everything strictly smaller.
            auto insert_pos = std::lower_bound(middle, last, *first, comp);
            (rotate)(first, middle, insert_pos);
        }
        else {
            // Split the left run in half, rotate the runs into order, recurse on both sides.
            auto mid_left = first + left_size / 2;
            auto mid_right = std::lower_bound(middle, last, *mid_left, comp);
            auto new_middle = (rotate)(mid_left, middle, mid_right);
            if (mid_left > first) inplace_merge_rotation(first, mid_left, new_middle, comp);
            if (mid_right < last) inplace_merge_rotation(new_middle, mid_right, last, comp);
        }
    }
    else {
        // Mirror case: split the right run in half instead.
        auto mid_right = middle + right_size / 2;
        auto mid_left = (upper_bound)(first, middle, *mid_right, comp);
        auto new_middle = (rotate)(mid_left, middle, mid_right);
        inplace_merge_rotation(first, mid_left, new_middle, comp);
        inplace_merge_rotation(new_middle, mid_right, last, comp);
    }
}

/**
 * @brief Three-way ("Dutch national flag") quicksort, efficient on inputs with many duplicates.
 *
 * Hardened against the classic quicksort pathologies:
 * - median-of-three pivot selection avoids the O(n^2) degradation on already-sorted and
 *   reverse-sorted inputs - patterns the accompanying tests exercise explicitly;
 * - recursion descends only into the smaller partition and loops on the larger one,
 *   bounding stack depth by O(log n) even on adversarial data;
 * - short sub-ranges fall back to insertion sort.
 *
 * @param[in] first Iterator to the beginning of the range to sort.
 * @param[in] last Iterator past the end of the range to sort.
 * @param[in] comp Comparator function for ordering elements.
 */
template <typename iterator_type_, typename comparator_type_ = std::less<>>
void three_way_quicksort(iterator_type_ first, iterator_type_ last, comparator_type_ comp = {}) noexcept {
    constexpr std::ptrdiff_t insertion_threshold_k = 16;

    while (last - first > insertion_threshold_k) {
        // Median-of-three: order {first, mid, back} among themselves,
        // then park the median at `first` to serve as the pivot.
        auto mid = first + (last - first) / 2;
        auto back = last - 1;
        if (comp(*mid, *first)) std::iter_swap(mid, first);
        if (comp(*back, *first)) std::iter_swap(back, first);
        if (comp(*back, *mid)) std::iter_swap(back, mid);
        std::iter_swap(first, mid);

        auto const pivot = *first; // ? A copy: the pivot element itself moves during partitioning
        auto lt = first;           // ? [first, lt) will hold elements `< pivot`
        auto gt = last;            // ? [gt, last) will hold elements `> pivot`
        auto it = first + 1;

        while (it < gt) {
            if (comp(*it, pivot)) { std::iter_swap(it++, lt++); }
            else if (comp(pivot, *it)) { std::iter_swap(it, --gt); }
            else { ++it; } // ? Equal to the pivot - already in its final position
        }

        // Recurse into the smaller side, iterate on the larger: O(log n) stack depth.
        if (lt - first < last - gt) {
            three_way_quicksort(first, lt, comp);
            first = gt;
        }
        else {
            three_way_quicksort(gt, last, comp);
            last = lt;
        }
    }

    if (last - first <= 1) return;

    // Insertion sort for the short tails left behind by the partitioning loop.
    for (auto it = first + 1; it < last; ++it) {
        auto value = std::move(*it);
        auto hole = it;
        while (hole > first && comp(value, *(hole - 1))) {
            *hole = std::move(*(hole - 1));
            --hole;
        }
        *hole = std::move(value);
    }
}

/**
 * @brief Sorts elements in parallel using multiple threads from the given thread pool.
 *
 * Divides the input range into per-thread chunks, sorts each chunk independently with
 * `three_way_quicksort`, then merges the sorted runs pairwise with the rotation-based
 * in-place merge. No dynamic memory is allocated during execution, and heavily
 * duplicated datasets are handled efficiently.
 *
 * @code{.cpp}
 * std::vector<int> data = {3, 1, 4, 1, 5, 9, 2, 6, 5, 3, 5};
 * fu::basic_pool_t pool;
 * pool.try_spawn(std::thread::hardware_concurrency());
 *
 * fu::sort(pool, data.begin(), data.end());
 * // data is now sorted: {1, 1, 2, 3, 3, 4, 5, 5, 5, 6, 9}
 * @endcode
 *
 * @tparam pool_type_ Thread pool type providing `threads_count()`, `for_threads()`, and `for_n()`.
 * @tparam iterator_type_ Random access iterator type.
 * @tparam comparator_type_ Callable type for element comparison, defaults to `std::less<>`.
 * @param[in] pool Reference to the thread pool executor to use for parallel sorting.
 * @param[in] first Iterator to the beginning of the range to sort.
 * @param[in] last Iterator to the end of the range to sort.
 * @param[in] comp Comparator function for ordering elements.
 *
 * @note The function is `noexcept` and performs no dynamic memory allocations.
 * @note Requires a valid random-access range; small inputs degrade to sequential sorting.
 *
 * @sa `bubble_sort` for a simple sequential sorting algorithm.
 * @sa `for_n` for the underlying parallel execution mechanism.
 */
template <typename pool_type_, typename iterator_type_, typename comparator_type_ = std::less<>>
void sort(pool_type_ &pool, iterator_type_ first, iterator_type_ last, comparator_type_ comp = {}) noexcept {
    assert(first <= last && "Iterator range must be valid");

    std::size_t const size = static_cast<std::size_t>(last - first);
    std::size_t const threads = pool.threads_count();
    constexpr std::size_t sequential_threshold_k = 1024;

    // Small inputs (or a single worker) aren't worth the scheduling overhead.
    if (size <= sequential_threshold_k || threads == 1) return three_way_quicksort(first, last, comp);
    std::size_t const chunk_size = size / threads;
    if (chunk_size == 0) return three_way_quicksort(first, last, comp);

    // Phase 1: every thread sorts its own chunk; the last thread absorbs the remainder.
    pool.for_threads([=](std::size_t thread) noexcept {
        auto chunk_begin = first + thread * chunk_size;
        auto chunk_end = (thread == threads - 1) ? last : chunk_begin + chunk_size;
        three_way_quicksort(chunk_begin, chunk_end, comp);
    });

    // Phase 2: binary-tree merge of the sorted runs. At stride `s` we merge neighbors
    // `2s` chunks apart, so only `ceil(threads / 2s)` tasks are submitted per level,
    // instead of scheduling `threads` tasks and letting most of them bail out.
    for (std::size_t stride = 1; stride < threads; stride *= 2) {
        std::size_t const span = 2 * stride;
        std::size_t const merges = (threads + span - 1) / span;
        pool.for_n(merges, [=](std::size_t merge_index) noexcept {
            std::size_t const left_chunk = merge_index * span;
            std::size_t const right_chunk = left_chunk + stride;
            if (right_chunk >= threads) return; // ? Odd run without a partner - already sorted

            auto const left_begin = first + left_chunk * chunk_size;
            auto const runs_middle = first + right_chunk * chunk_size;
            std::size_t const end_chunk = std::min(left_chunk + span, threads);
            auto const right_end = (end_chunk == threads) ? last : first + end_chunk * chunk_size;
            inplace_merge_rotation(left_begin, runs_middle, right_end, comp);
        });
    }
}

/**
 * @brief Exercises `sort` on reverse-sorted data across many (size, thread count) pairs.
 *
 * One pool is spawned per thread count (not per size/thread pair) to keep the test fast.
 */
template <typename make_pool_type_>
static bool test_sort_edge_cases() noexcept {
    auto maker = make_pool_type_ {};
    auto pool = maker.construct();
    if (!pool.try_spawn(maker.scope())) return false;

    // Problematic size/thread combinations around powers of two and off-by-ones.
    std::vector<std::size_t> thread_counts = {1, 2, 3, 4, 5, 7, 8, 15, 16, 31, 32};
    std::vector<std::size_t> sizes = {
        0,   1,   2,   3,   7,    8,    15,   16,   31,   32,   63,   64,   127,  128,
        255, 256, 511, 512, 1023, 1024, 1025, 2047, 2048, 4095, 4096, 8191, 8192,
    };

    for (auto threads : thread_counts) {
        if (threads > pool.threads_count()) continue;

        // One pool limited to this specific thread count, reused for every size.
        auto test_pool = maker.construct();
        if (!test_pool.try_spawn(static_cast<std::size_t>(threads))) return false;

        for (auto size : sizes) {
            // Reverse-sorted input stresses the per-chunk quicksort and the merges alike.
            std::vector<int> data(size);
            std::iota(data.rbegin(), data.rend(), 0);

            std::vector<int> expected = data;
            std::sort(expected.begin(), expected.end());

            sort(test_pool, data.begin(), data.end());
            if (data != expected) return false;
        }
    }
    return true;
}

/**
 * @brief Exercises `sort` on heavily duplicated data, stressing the three-way partitioning.
 */
template <typename make_pool_type_>
static bool test_sort_duplicates() noexcept {
    auto maker = make_pool_type_ {};
    auto pool = maker.construct();
    if (!pool.try_spawn(maker.scope())) return false;

    std::vector<std::size_t> sizes = {100, 1000, 10000};
    std::vector<std::size_t> unique_counts = {1, 5, 10, 50}; // Number of distinct values

    std::mt19937 random_generator(7u); // ? Fixed seed: failures must be reproducible
    for (auto size : sizes) {
        for (auto unique_count : unique_counts) {
            std::vector<int> data(size);
            for (std::size_t i = 0; i < size; ++i) data[i] = static_cast<int>(i % unique_count);
            std::shuffle(data.begin(), data.end(), random_generator);

            std::vector<int> expected = data;
            std::sort(expected.begin(), expected.end());

            sort(pool, data.begin(), data.end());
            if (data != expected) return false;
        }
    }
    return true;
}

/**
 * @brief Exercises `sort` on inputs notoriously hostile to naive quicksort pivot choices.
 */
template <typename make_pool_type_>
static bool test_sort_pathological() noexcept {
    auto maker = make_pool_type_ {};
    auto pool = maker.construct();
    if (!pool.try_spawn(maker.scope())) return false;

    std::vector<std::vector<int>> test_cases {
        {1, 2, 3, 4, 5, 6, 7, 8, 9, 10},          // already sorted
        {10, 9, 8, 7, 6, 5, 4, 3, 2, 1},          // reverse sorted
        {5, 5, 5, 5, 5, 5, 5, 5, 5, 5},           // all same elements
        {1, 10, 2, 9, 3, 8, 4, 7, 5, 6},          // alternating pattern
        {1, 1, 1, 1, 1000, 1, 1, 1, 1, 1},        // single outlier
        {1, 1, 1, 1, 1, 100, 100, 100, 100, 100}, // two groups
    };

    for (auto const &test_case : test_cases) {
        // Replicate each pattern to probe both the sequential and the parallel paths.
        for (std::size_t multiplier : {1, 10, 100}) {
            std::vector<int> data;
            data.reserve(test_case.size() * multiplier);
            for (std::size_t i = 0; i < multiplier; ++i) data.insert(data.end(), test_case.begin(), test_case.end());

            std::vector<int> expected = data;
            std::sort(expected.begin(), expected.end());

            sort(pool, data.begin(), data.end());
            if (data != expected) return false;
        }
    }
    return true;
}

/**
 * @brief Exercises `sort` at sizes that create edge cases in the per-thread chunk split.
 */
template <typename make_pool_type_>
static bool test_sort_chunk_boundaries() noexcept {
    auto maker = make_pool_type_ {};
    auto pool = maker.construct();
    if (!pool.try_spawn(maker.scope())) return false;

    std::size_t const threads = pool.threads_count();
    std::vector<std::size_t> critical_sizes = {
        threads - 1,       // Fewer elements than threads
        threads,           // Equal to thread count
        threads + 1,       // One more than thread count
        threads * 2 - 1,   // Almost 2 per thread
        threads * 2,       // Exactly 2 per thread
        threads * 2 + 1,   // Just over 2 per thread
        threads * 100 - 1, // Large with uneven distribution
        threads * 100,     // Large with even distribution
        threads * 100 + 1, // Large with slight imbalance
    };

    std::mt19937 random_generator(1234u); // ? Fixed seed: failures must be reproducible
    for (auto size : critical_sizes) {
        std::vector<int> data(size);
        std::iota(data.rbegin(), data.rend(), 0);
        std::shuffle(data.begin(), data.end(), random_generator); // ? Avoid best-case layouts

        std::vector<int> expected = data;
        std::sort(expected.begin(), expected.end());

        sort(pool, data.begin(), data.end());
        if (data != expected) return false;
    }
    return true;
}