
Conversation

@adamreichold

No description provided.

@ashvardanian
Owner

Hi @adamreichold! The proposed variant doesn't cover the synchronization between threads. Other threads need a way to know that the work has stopped.

There is probably a way to reduce the contention on the stop and generation, but it seems like this is not the way to achieve that 🤷

@adamreichold
Author

The proposed variant doesn't cover the synchronization between threads.

The synchronization happens with the same atomics that are used to submit any work item to the workers.

Other threads need a way to know that the work has stopped.

By broadcasting the stop_trampoline, each thread will execute it once and thereby set its local stop variable. As written above, this relies on the existing synchronization used during operation of the pool.
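For concreteness, here is a minimal sketch of the worker side of that idea; the names WorkItem and receive_work are hypothetical stand-ins for the pool's actual queue, not this crate's code. The only "stop" state is a local variable on the worker's stack, and the broadcast shutdown item is the only work item that ever sets it:

    // Hypothetical sketch: `WorkItem` and `receive_work` stand in for the
    // pool's real work queue. No shared atomic flag is read per work item;
    // the stop flag lives on the worker's own stack.
    struct WorkItem {
        run: Box<dyn FnOnce(usize, &mut bool) + Send>,
    }

    fn worker_loop(thread_index: usize, receive_work: impl Fn() -> WorkItem) {
        let mut stop = false;
        while !stop {
            let work = receive_work(); // relies on the pool's existing synchronization
            (work.run)(thread_index, &mut stop); // only the shutdown item sets `stop = true`
        }
    }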

@ashvardanian
Owner

I am not sure what "broadcasting" means? What kind of logic will be executed on the CPU - which instructions?

@ashvardanian changed the title from "Avoid one atomic access per work item by stopping workers from the inside out." to "Reduce stop Contention?" Jun 1, 2025
@ashvardanian changed the base branch from main to main-dev June 1, 2025 13:42
@ashvardanian marked this pull request as draft June 1, 2025 13:42
@adamreichold
Author

adamreichold commented Jun 1, 2025

The same as when any other work item/trampoline is submitted to the thread pool: the atomic instructions used to modify threads_to_sync and generation. I also think it might be best if you try the changes out to convince yourself that this works.

Maybe think of it this way: If you already have a method to run arbitrary closures on a given set of threads, do you really need additional shared state to tell these threads to exit?

Or if a reference to an authority helps: I did not come up with this. I think the first time I read about this method to handle shutting down a thread pool was in an old blog post by Herb Sutter. (Of course, I can't find it now...)

@adamreichold marked this pull request as ready for review June 1, 2025 13:42
@adamreichold changed the title from "Reduce stop Contention?" to "Avoid one atomic access per work item by stopping workers from the inside out." Jun 1, 2025
@adamreichold
Author

adamreichold commented Jun 1, 2025

I am not sure what "broadcasting" means?

It means what the existing method called "broadcast" does: ensure that a given closure is called exactly once by each worker thread in the pool. The only minor technicality is that we are not interested in the result and do not need to run the closure on the current thread, which is why the actual broadcast method isn't called here.

@ashvardanian
Owner

I think an example may help. We are tackling two different use-cases:

  • Having each thread decide its own completion time individually, vs.
  • Shutting down the thread pool.

The stop was used for the latter. And we don't need to pass an additional boolean into the function call, as any arbitrary lambda/closure context is propagated. So you can pass the thread_pool_t & into the lambda, and stop it from the inside. You can also pass std::atomic_flag or std::stop_token or any other primitive by reference - synchronized or not. Standard libraries often have too many pieces of functionality for that, and none of them are explicit about the underlying hardware features they leverage, so using them here is hard to justify 🤷
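In Rust terms, the first use-case already needs no pool-level support: the caller can own the flag and capture it in the closure. A hedged sketch with plain scoped threads standing in for the pool (none of these names are this library's API):

    use std::sync::atomic::{AtomicBool, Ordering};
    use std::thread;

    // Each thread decides its own completion time via a caller-owned flag
    // captured in the closure; no shutdown machinery in the pool is involved.
    fn main() {
        let found = AtomicBool::new(false);
        thread::scope(|scope| {
            let found = &found;
            for thread_index in 0..4 {
                scope.spawn(move || {
                    while !found.load(Ordering::Relaxed) {
                        // ... do a slice of the search on this thread ...
                        if thread_index == 0 {
                            // Placeholder: pretend worker 0 found the answer.
                            found.store(true, Ordering::Relaxed);
                        }
                    }
                });
            }
        });
    }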

@adamreichold
Author

The stop was used for the latter.

And the stop: &mut bool argument is also used for the latter. Only the internal work items submitted by the pool itself see this argument, and they use it to tell the worker that is currently executing them to stop. It affects only the current worker, but since the work item is broadcast, all workers will execute it eventually and exit. So this is a method to shut down the whole pool, just as with the stop atomic flag.

But the "message to stop", so to speak, is packaged into a closure and transferred using the existing work submission mechanism instead of using the out-of-band stop flag.

(This is also why the extra trampoline argument is required. The point of this is to avoid shared global state. It could also be a return value, but I figured this might be more costly if every work item has to produce it, compared to ignoring the argument. It could also be an array with one (non-atomic) flag per worker which is then indexed using the thread index, but that is just much more complicated than the on-stack state for no good reason.)

I think an example may help.

Every test case in this repository runs this code when it shuts down its thread pool. There is nothing extra to see here; this is just a more efficient mechanism to implement the existing shutdown semantics.
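To make that concrete, here is a self-contained Rust sketch of the shutdown mechanism under discussion, with plain std channels standing in for the pool's internal submission machinery (every name here is a stand-in, not this library's API). Ordinary work items ignore the stop argument; shutdown is just one more broadcast work item:

    use std::sync::mpsc;
    use std::thread;

    type Work = Box<dyn FnOnce(usize, &mut bool) + Send>;

    fn main() {
        // One channel per worker stands in for the pool's submission mechanism.
        let (senders, handles): (Vec<_>, Vec<_>) = (0..4usize)
            .map(|thread_index| {
                let (tx, rx) = mpsc::channel::<Work>();
                let handle = thread::spawn(move || {
                    let mut stop = false;
                    while !stop {
                        let work = rx.recv().unwrap();
                        work(thread_index, &mut stop);
                    }
                });
                (tx, handle)
            })
            .unzip();

        // Ordinary work items simply ignore the `stop` argument.
        for tx in &senders {
            tx.send(Box::new(|i: usize, _stop: &mut bool| {
                println!("hello from worker {}", i)
            }))
            .unwrap();
        }

        // "Broadcasting" the stop trampoline: every worker runs it once and exits.
        for tx in &senders {
            tx.send(Box::new(|_: usize, stop: &mut bool| *stop = true)).unwrap();
        }
        for handle in handles {
            handle.join().unwrap();
        }
    }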

@ashvardanian
Owner

Thanks for the explanations! We can have such logic in the lambda, but we also need a way to propagate the signal handlers and other interruptions originating outside of the loop. I'll come back to this in a couple of weeks and will try to incorporate your suggestions 🤗

@ashvardanian self-assigned this Jun 3, 2025
let context = inner.context();
unsafe {
-   trampoline(context, thread_index);
+   trampoline(context, thread_index, &mut stop);


Why use an out pointer instead of a return value?

Author


My initial idea was that the overhead might be lower as those work items which do not want to stop the pool (so basically all but one) can just ignore the additional argument (instead of producing a useless false as a return value). I should pull the initialization out of the loop though so that the worker loop does not pay the price of it for each work item...

I think there is an even nicer way which does not need to touch the trampoline signature at all but instead uses the identity of the stop_trampoline, cf. adamreichold/fork-join-scope@cdb63bf. But transplanting it here is not trivial because of the rules around function items/pointers and their comparison. So personally, I would prefer to first switch to using dyn Fn(usize) + Sync as the work items instead of manually splitting context and trampoline, before implementing that.
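For reference, a hedged sketch of the two signatures being weighed here; Context is a stand-in for the pool's erased context type, not the actual definition used by the crate:

    // Stand-in for the pool's type-erased context; not the actual definition.
    struct Context;

    // Out-pointer variant (as in this change): ordinary work items never touch `stop`.
    type TrampolineWithOutParam = unsafe fn(*const Context, usize, &mut bool);

    // Return-value alternative: every work item has to produce a bool,
    // even though only the shutdown item would ever return `true`.
    type TrampolineWithReturn = unsafe fn(*const Context, usize) -> bool;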

@chengts95

chengts95 commented Oct 15, 2025

I have tested the stop_trampoline solution in the new C++ thread pool. It works as intended. However, the improvement is hard to observe; maybe the benefit is small since the atomic read won't occur when the worker is busy.
By the way, I just wanted to say—I really appreciate the quality of your work. This library brings a refreshing clarity to how parallel computation can be done in Rust. Beautifully designed and very inspiring!

@ashvardanian changed the title from "Avoid one atomic access per work item by stopping workers from the inside out." to "Stoppable Submissions in Rust" Oct 19, 2025
@ashvardanian
Owner

Thanks for the suggestions, @adamreichold! And for the kind words, @chengts95 😊

I'll be adding early-stoppable parallel iterators in the upcoming v3, together with Zig support and other features. Let me know if you have any other ideas worth exploring 🤗

@chengts95

Hi. Maybe we can have an async parallel for and allow manually joining the threads.
