
Commit 3d033e5

Docs: Refresh NUMA example
1 parent 4e480e6 commit 3d033e5

1 file changed: +28 -27 lines


README.md

Lines changed: 28 additions & 27 deletions
````diff
@@ -146,7 +146,7 @@ int main() {
 
     // Dispatch a callback to each thread in the pool
     pool.for_threads([&](std::size_t thread_index) noexcept {
-        std::printf("Hello from thread # %zu (of %zu)\n", thread_index + 1, pool.count_threads());
+        std::printf("Hello from thread # %zu (of %zu)\n", thread_index + 1, pool.threads_count());
     });
 
     // Execute 1000 tasks in parallel, expecting them to have comparable runtimes
@@ -299,49 +299,50 @@ inline search_result_t pick_best(search_result_t const& a, search_result_t const
 
 /// Uses all CPU threads to search for the closest vector to the @p query.
 search_result_t search(std::span<float, dimensions> query) {
-
-    bool const need_to_spawn_threads = !distributed_pool.count_threads();
+
+    bool const need_to_spawn_threads = distributed_pool.threads_count() == 0;
     if (need_to_spawn_threads) {
         assert(numa_topology.try_harvest() && "Failed to harvest NUMA topology");
-        assert(numa_topology.count_nodes() == 2 && "Expected exactly 2 NUMA nodes");
+        assert(numa_topology.nodes_count() == 2 && "Expected exactly 2 NUMA nodes");
         assert(distributed_pool.try_spawn(numa_topology, sizeof(search_result_t)) && "Failed to spawn NUMA pools");
     }
-
+
     search_result_t result;
     fu::spin_mutex_t result_update; // ? Lighter `std::mutex` alternative w/out system calls
-
-    auto concurrent_searcher = [&](auto first_prong, std::size_t count) noexcept {
-        auto [first_index, _, colocation] = first_prong;
-        auto& vectors = colocation == 0 ? first_half : second_half;
+
+    std::size_t const total_vectors =
+        (first_half.size() + second_half.size()) / dimensions;
+
+    auto slices = distributed_pool.for_slices(total_vectors,
+        [&](fu::colocated_prong<> first, std::size_t count) noexcept {
+
+        bool const in_second = first.colocation != 0;
+        auto const &shard = in_second ? second_half : first_half;
+        std::size_t const shard_base = in_second ? first_half.size() / dimensions : 0;
+        std::size_t const local_begin = first.task - shard_base;
+
         search_result_t thread_local_result;
-        for (std::size_t task_index = first_index; task_index < first_index + count; ++task_index) {
+        for (std::size_t i = 0; i < count; ++i) {
+            std::size_t const local_index = local_begin + i;
+            std::size_t const global_index = shard_base + local_index;
+
             simsimd_distance_t distance;
-            simsimd_f32_cos(query.data(), vectors.data() + task_index * dimensions, dimensions, &distance);
-            thread_local_result = pick_best(thread_local_result, {distance, task_index});
+            simsimd_f32_cos(query.data(), shard.data() + local_index * dimensions, dimensions, &distance);
+            thread_local_result = pick_best(thread_local_result, {distance, global_index});
         }
-
-        // ! We are spinning on a remote cache line... for simplicity.
+
+        // ! Still synchronizing over a shared mutex for brevity.
         std::lock_guard<fu::spin_mutex_t> lock(result_update);
         result = pick_best(result, thread_local_result);
-    };
-
-    auto _ = distributed_pool[0].for_slices(first_half.size() / dimensions, concurrent_searcher);
-    auto _ = distributed_pool[1].for_slices(second_half.size() / dimensions, concurrent_searcher);
+    });
+    slices.join();
     return result;
 }
 ```
 
 In a dream world, we would call `distributed_pool.for_n`, but there is no clean way to make the scheduling processes aware of the data distribution in an arbitrary application, so that's left to the user.
-Calling `linux_colocated_pool::for_slices` on individual NUMA-node-specific colocated pools is the cheapest general-purpose recipe for Big Data applications.
+The `for_slices` helper provides colocated metadata (`fu::colocated_prong`) that lets you pick the right shard of data based on the NUMA node, while keeping scheduling inside the distributed pool.
 For more flexibility around building higher-level low-latency systems, there are unsafe APIs expecting you to manually "join" the broadcasted calls, like `unsafe_for_threads` and `unsafe_join`.
-Instead of hard-coding the `distributed_pool[0]` and `distributed_pool[1]`, we can iterate through them without keeping the lifetime-preserving handle to the passed `concurrent_searcher`:
-
-```cpp
-for (std::size_t colocation = 0; colocation < distributed_pool.colocations_count(); ++colocation)
-    distributed_pool[colocation].unsafe_for_threads(..., concurrent_searcher);
-for (std::size_t colocation = 0; colocation < distributed_pool.colocations_count(); ++colocation)
-    distributed_pool[colocation].unsafe_join();
-```
 
 ### Efficient Busy Waiting
 
````
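The only subtle part of the refreshed example is the shard/index arithmetic inside the `for_slices` callback. Below is a minimal, self-contained sketch of that mapping; the shard sizes (1000 and 500 vectors) are hypothetical, and the shard is derived from the index purely for illustration, whereas the example above picks it via `first.colocation`.

```cpp
#include <cassert>
#include <cstddef>

int main() {
    // Hypothetical shard sizes; in the README they would be
    // first_half.size() / dimensions and second_half.size() / dimensions.
    std::size_t const first_shard_vectors = 1000;
    std::size_t const second_shard_vectors = 500;
    std::size_t const total_vectors = first_shard_vectors + second_shard_vectors;

    // A prong's `task` is a global vector index in [0, total_vectors).
    std::size_t const global_task = 1200;
    bool const in_second = global_task >= first_shard_vectors;      // illustration only
    std::size_t const shard_base = in_second ? first_shard_vectors : 0;
    std::size_t const local_index = global_task - shard_base;       // offset within the shard
    std::size_t const global_index = shard_base + local_index;      // reported back globally

    assert(global_task < total_vectors);
    assert(local_index == 200);          // 1200 falls 200 vectors into the second shard
    assert(global_index == global_task); // the round-trip is lossless
    return 0;
}
```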