diff --git a/include/fork_union.hpp b/include/fork_union.hpp index e1b25aa..3aaf819 100644 --- a/include/fork_union.hpp +++ b/include/fork_union.hpp @@ -38,6 +38,23 @@ * - `for_n_dynamic` - for unevenly distributed tasks, where each task may take a different time. * - `for_slices` - for iterating over a range of similar duration tasks, addressable by a slice. * + * Aside from loops, "map-reduce" and "sorting" algorithms are also provided, but they go far beyond + * the capabilities of Parallel STL, to allow users to pass SIMD-capable lambdas, and implement not + * only one level of slicing for parallel operations, but also tree-like aggregations: + * + * - `transform_filter_n[_dynamic]` - + * - `transform_filter_reduce_n[_dynamic]` - + * - TODO: `argsort_n[_dynamic]` - + * - TODO: `sort_n[_dynamic]` - + * - TODO: `transform_filter_scan_n[_dynamic]` + * + * STL Ranges-like Parallel Algorithms for many iterator types are also provided. + * + * @see `for_each`: https://en.cppreference.com/w/cpp/algorithm/for_each.html + * @see `for_each_n`: https://en.cppreference.com/w/cpp/algorithm/for_each_n.html + * @see `transform`: https://en.cppreference.com/w/cpp/algorithm/transform.html + * @see `reduce`: https://en.cppreference.com/w/cpp/algorithm/reduce.html + * @see `sort`: https://en.cppreference.com/w/cpp/algorithm/sort.html * ------------------------------------------------------------------------------------------------ * * On Linux, when NUMA and PThreads are available, the library can also leverage @b NUMA-aware @@ -1991,6 +2008,206 @@ void bubble_sort(value_type_ *array, std::size_t size, comparator_type_ comp = { if (comp(array[j + 1], array[j])) std::swap(array[j], array[j + 1]); } +namespace { +namespace fu = ashvardanian::fork_union; +} + +/** + * @brief Find the first position where value could be inserted to maintain sorted order. + */ +template > +iterator_type_ upper_bound(iterator_type_ first, iterator_type_ last, value_type_ const &value, + comparator_type_ comp = {}) noexcept { + auto count = last - first; + while (count > 0) { + auto step = count / 2; + auto it = first + step; + if (!comp(value, *it)) { + first = ++it; + count -= step + 1; + } + else { count = step; } + } + return first; +} + +/** + * @brief Rotate elements in range [first, last) such that middle becomes the new first. + */ +template +iterator_type_ rotate(iterator_type_ first, iterator_type_ middle, iterator_type_ last) noexcept { + if (first == middle) return last; + if (middle == last) return first; + + auto result = first + (last - middle); + auto next = middle; + + while (first != next) { + std::swap(*first++, *next++); + if (next == last) next = middle; + else if (first == middle) + middle = next; + } + + return result; +} + +/** + * @brief In-place merge two sorted ranges without additional memory allocations. + * @param[in] first Iterator to the beginning of the first sorted range. + * @param[in] middle Iterator to the beginning of the second sorted range. + * @param[in] last Iterator to the end of the second sorted range. + * @param[in] comp Comparator function for ordering elements. + */ +template > +void inplace_merge_rotation(iterator_type_ first, iterator_type_ middle, iterator_type_ last, + comparator_type_ comp = {}) noexcept { + if (first == middle || middle == last) return; + + auto left_size = middle - first; + auto right_size = last - middle; + + if (left_size == 0 || right_size == 0) return; + + if (left_size + right_size <= 32) { + for (auto it = middle; it != last; ++it) { + auto value = std::move(*it); + auto pos = fu::upper_bound(first, it, value, comp); + std::move_backward(pos, it, it + 1); + *pos = std::move(value); + } + return; + } + + if (left_size <= right_size) { + if (left_size == 1) { + auto insert_pos = std::lower_bound(middle, last, *first, comp); + fu::rotate(first, middle, insert_pos); + } + else { + auto mid_left = first + left_size / 2; + auto mid_right = std::lower_bound(middle, last, *mid_left, comp); + auto new_middle = fu::rotate(mid_left, middle, mid_right); + if (mid_left > first) inplace_merge_rotation(first, mid_left, new_middle, comp); + if (mid_right < last) inplace_merge_rotation(new_middle, mid_right, last, comp); + } + } + else { + auto mid_right = middle + right_size / 2; + auto mid_left = fu::upper_bound(first, middle, *mid_right, comp); + auto new_middle = fu::rotate(mid_left, middle, mid_right); + inplace_merge_rotation(first, mid_left, new_middle, comp); + inplace_merge_rotation(new_middle, mid_right, last, comp); + } +} + +/** + * @brief Three-way quicksort implementation for efficient handling of duplicate elements. + * @param[in] first Iterator to the beginning of the range to sort. + * @param[in] last Iterator to the end of the range to sort. + * @param[in] comp Comparator function for ordering elements. + */ +template > +void three_way_quicksort(iterator_type_ first, iterator_type_ last, comparator_type_ comp = {}) noexcept { + if (last - first <= 1) return; + if (last - first <= 16) { + for (auto i = first + 1; i < last; ++i) { + auto value = std::move(*i); + auto j = i; + while (j > first && comp(value, *(j - 1))) { + *j = std::move(*(j - 1)); + --j; + } + *j = std::move(value); + } + return; + } + + auto pivot = *first; + auto lt = first; + auto gt = last; + auto i = first + 1; + + while (i < gt) { + if (comp(*i, pivot)) { std::iter_swap(i++, lt++); } + else if (comp(pivot, *i)) { std::iter_swap(i, --gt); } + else { ++i; } + } + + three_way_quicksort(first, lt, comp); + three_way_quicksort(gt, last, comp); +} + +/** + * @brief Sorts elements in parallel using multiple threads from the given thread pool. + * + * Executes a parallel sorting algorithm that divides the input range into chunks, sorts each + * chunk independently using three-way quicksort, and then merges the sorted chunks using + * in-place rotation-based merging. This approach avoids dynamic memory allocations during + * execution and efficiently handles datasets with duplicate values. + * + * @code{.cpp} + * std::vector data = {3, 1, 4, 1, 5, 9, 2, 6, 5, 3, 5}; + * fu::basic_pool_t pool; + * pool.try_spawn(std::thread::hardware_concurrency()); + * + * fu::sort(pool, data.begin(), data.end()); + * // data is now sorted: {1, 1, 2, 3, 3, 4, 5, 5, 5, 6, 9} + * @endcode + * + * @tparam pool_type_ Thread pool type that provides `threads_count()`, `for_threads()`, and `for_n()` methods. + * @tparam iterator_type_ Random access iterator type. + * @tparam comparator_type_ Callable type for element comparison, defaults to `std::less<>`. + * @param[in] pool Reference to the thread pool executor to use for parallel sorting. + * @param[in] first Iterator to the beginning of the range to sort. + * @param[in] last Iterator to the end of the range to sort. + * @param[in] comp Comparator function for ordering elements. + * + * @note The function is `noexcept` and performs no dynamic memory allocations during execution. + * @note Requires the iterator type to be random access and the range to be valid. + * @note For best performance, ensure the range size is significantly larger than the thread count. + * + * @sa `bubble_sort` for a simple sequential sorting algorithm. + * @sa `for_n` for the underlying parallel execution mechanism. + */ +template > +void sort(pool_type_ &pool, iterator_type_ first, iterator_type_ last, comparator_type_ comp = {}) noexcept { + assert(first <= last && "Iterator range must be valid"); + + std::size_t const size = static_cast(last - first); + std::size_t const threads = pool.threads_count(); + constexpr std::size_t const sequential_threshold = 1024; + + if (size <= sequential_threshold || threads == 1) return three_way_quicksort(first, last, comp); + + std::size_t const chunk_size = size / threads; + if (chunk_size == 0) return three_way_quicksort(first, last, comp); + + pool.for_threads([=](std::size_t thread) noexcept { + auto start = first + thread * chunk_size; + auto finish = (thread == threads - 1) ? last : start + chunk_size; + three_way_quicksort(start, finish, comp); + }); + + for (std::size_t stride = 1; stride < threads; stride *= 2) { + pool.for_n(threads, [=](std::size_t thread_idx) noexcept { + if (thread_idx % (2 * stride) != 0) return; + + std::size_t left_chunk = thread_idx; + std::size_t right_chunk = thread_idx + stride; + if (right_chunk >= threads) return; + + auto left_start = first + left_chunk * chunk_size; + auto mid = first + right_chunk * chunk_size; + + std::size_t right_end_chunk = std::min(thread_idx + 2 * stride, threads); + auto right_end = (right_end_chunk == threads) ? last : first + right_end_chunk * chunk_size; + + inplace_merge_rotation(left_start, mid, right_end, comp); + }); + } +} + /** * @brief NUMA topology descriptor: describing memory pools and core counts next to them. * diff --git a/scripts/test.cpp b/scripts/test.cpp index 806c30b..78acf88 100644 --- a/scripts/test.cpp +++ b/scripts/test.cpp @@ -2,6 +2,8 @@ #include // `EXIT_FAILURE`, `EXIT_SUCCESS` #include // `std::vector` #include // `std::sort` +#include // `std::iota` +#include // `std::random_device`, `std::mt19937`, `std::shuffle` #include @@ -430,6 +432,157 @@ static bool stress_test_composite(std::size_t const threads_count, std::size_t c return true; } +template +static bool test_sort_edge_cases() noexcept { + auto maker = make_pool_type_ {}; + auto pool = maker.construct(); + if (!pool.try_spawn(maker.scope())) return false; + + // Test various problematic size/thread combinations + std::vector thread_counts = {1, 2, 3, 4, 5, 7, 8, 15, 16, 31, 32}; + std::vector sizes = { + 0, 1, 2, 3, 7, 8, 15, 16, 31, 32, 63, 64, 127, 128, + 255, 256, 511, 512, 1023, 1024, 1025, 2047, 2048, 4095, 4096, 8191, 8192, + }; + + for (auto size : sizes) { + for (auto threads : thread_counts) { + if (threads > pool.threads_count()) continue; + + // Create test data: reverse sorted to stress the algorithm + std::vector data(size); + std::iota(data.rbegin(), data.rend(), 0); + + std::vector expected = data; + std::sort(expected.begin(), expected.end()); + + // Test with a pool limited to specific thread count + auto test_pool = maker.construct(); + if (!test_pool.try_spawn(static_cast(threads))) return false; + + fu::sort(test_pool, data.begin(), data.end()); + + if (data != expected) return false; + } + } + + return true; +} + +template +static bool test_sort_duplicates() noexcept { + auto maker = make_pool_type_ {}; + auto pool = maker.construct(); + if (!pool.try_spawn(maker.scope())) return false; + + // Test with many duplicates - this stresses the three-way quicksort + std::vector sizes = {100, 1000, 10000}; + std::vector patterns = {1, 5, 10, 50}; // Number of unique values + + for (auto size : sizes) { + for (auto pattern : patterns) { + std::vector data(size); + for (std::size_t i = 0; i < size; ++i) { data[i] = static_cast(i % pattern); } + + // Shuffle to create random order + std::random_device random_device; + std::mt19937 random_generator(random_device()); + std::shuffle(data.begin(), data.end(), random_generator); + + std::vector expected = data; + std::sort(expected.begin(), expected.end()); + + fu::sort(pool, data.begin(), data.end()); + + if (data != expected) return false; + } + } + + return true; +} + +template +static bool test_sort_pathological() noexcept { + auto maker = make_pool_type_ {}; + auto pool = maker.construct(); + if (!pool.try_spawn(maker.scope())) return false; + + // Test cases that are problematic for quicksort variants + std::vector> test_cases { + // Already sorted + {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, + // Reverse sorted + {10, 9, 8, 7, 6, 5, 4, 3, 2, 1}, + // All same elements + {5, 5, 5, 5, 5, 5, 5, 5, 5, 5}, + // Alternating pattern + {1, 10, 2, 9, 3, 8, 4, 7, 5, 6}, + // Single outlier + {1, 1, 1, 1, 1000, 1, 1, 1, 1, 1}, + // Two groups + {1, 1, 1, 1, 1, 100, 100, 100, 100, 100}, + }; + + for (auto &test_case : test_cases) { + // Test with different sizes by replicating the pattern + for (std::size_t multiplier : {1, 10, 100}) { + + std::vector data; + for (std::size_t i = 0; i < multiplier; ++i) data.insert(data.end(), test_case.begin(), test_case.end()); + + std::vector expected = data; + std::sort(expected.begin(), expected.end()); + + fu::sort(pool, data.begin(), data.end()); + + if (data != expected) return false; + } + } + + return true; +} + +template +static bool test_sort_chunk_boundaries() noexcept { + auto maker = make_pool_type_ {}; + auto pool = maker.construct(); + if (!pool.try_spawn(maker.scope())) return false; + + std::size_t threads = pool.threads_count(); + + // Test sizes that create edge cases in chunk distribution + std::vector critical_sizes = { + threads - 1, // Fewer elements than threads + threads, // Equal to thread count + threads + 1, // One more than thread count + threads * 2 - 1, // Almost 2 per thread + threads * 2, // Exactly 2 per thread + threads * 2 + 1, // Just over 2 per thread + threads * 100 - 1, // Large with uneven distribution + threads * 100, // Large with even distribution + threads * 100 + 1 // Large with slight imbalance + }; + + for (auto size : critical_sizes) { + std::vector data(size); + std::iota(data.rbegin(), data.rend(), 0); + + // Add some randomness to avoid best-case scenarios + std::random_device random_device; + std::mt19937 random_generator(random_device()); + std::shuffle(data.begin(), data.end(), random_generator); + + std::vector expected = data; + std::sort(expected.begin(), expected.end()); + + fu::sort(pool, data.begin(), data.end()); + + if (data != expected) return false; + } + + return true; +} + /** * @brief Enhanced NUMA topology logging function using the logger class. */ @@ -480,6 +633,11 @@ int main(void) { {"`for_n_dynamic` oversubscribed threads", test_oversubscribed_threads}, // {"`terminate` avoided", test_mixed_restart}, // {"`terminate` and re-spawn", test_mixed_restart}, // + // Parallel sort tests + {"`sort` edge cases", test_sort_edge_cases}, // + {"`sort` duplicates", test_sort_duplicates}, // + {"`sort` pathological cases", test_sort_pathological}, // + {"`sort` chunk boundaries", test_sort_chunk_boundaries}, // #if FU_ENABLE_NUMA // Uniform Memory Access (UMA) tests for threads pinned to the same NUMA node {"UMA `try_spawn` normal", test_try_spawn_success},