Async Gloo APIs RFC
Overview
This RFC proposes a few key revisions to Gloo's design. It introduces async interfaces to Gloo, to be easily consumed by ML frameworks such as PyTorch. It shifts key responsibilities from the application to Gloo, enabling better decision making at runtime. Finally, it proposes a new transport-agnostic interface to be used by Gloo's algorithms.
The RFC introduces its goals, how Gloo's current APIs make achieving the goals difficult/impossible, the proposed solution, and alternatives considered.
Goals
This RFC proposes that Gloo should have first-class support for:
- having a single CCL operation use multiple NIC ports
- deciding which NIC port(s) to use for a given CCL operation
- fully utilizing allocated CPU cores
- network topologies which do not have a physical path between all end-points (NIC ports) in the network
- transports which cannot scale to all-to-all connectivity between all NIC ports
"First-class support" means achieving these goals with Gloo is easy and intuitive. A user/application/ML framework should not resort to work-arounds to achieve these goals.
Problem Statement
Gloo's current APIs either lack or conflict with first-class support for the above goals. These APIs are public, so changing their behavior will be difficult. This will be demonstrated for each goal.
Challenges enabling Gloo to use multiple NIC ports for a single CCL operation
Gloo's gloo::transport::Device provides a transport-agnostic abstraction over a single NIC port. Derived classes implement transport-specific behavior to utilize the NIC.
Note: we have considered a gloo::transport::Device which abstracts over multiple physical NIC ports. This is discussed in Multi-Port gloo::transport::Device (see Alternatives Considered). The remainder of this section discusses a gloo::transport::Device which abstracts over a single NIC port.
A given gloo::Context contains exactly one gloo::transport::Device.
Each CCL operation (ex: gloo::allreduce) operates on a single gloo::Context, and uses the context's gloo::transport::Device for all communication for that operation.
Today, this results in a key challenge: a single CCL operation can only utilize a single NIC port.
Since gloo::Context is part of the public API, it cannot be easily changed to leverage multiple gloo::transport::Devices.
Challenges enabling Gloo to choose which NIC ports to utilize for a given CCL operation
Currently, the application/ML framework chooses which NIC port is used to run a given CCL operation (op).
However, the application/ML framework has limited insight into NIC port load or into how much NIC bandwidth a given CCL op will require.
As a result, any scheduling algorithm it could employ has limited insight into CCL performance and cannot respond dynamically to network conditions.
This results in another key challenge: Gloo relies on the application/ML framework for key decisions around NIC ports, including choosing which NIC port to use & how to load balance across them.
Gloo is responsible for decomposing collective operations into point-to-point data transfers.
By tracking the point-to-point transfers currently underway and/or leveraging NIC statistics, Gloo can understand the current load on each NIC port.
Gloo can also know ahead of time how much load any given CCL operation will generate.
Gloo can also most effectively utilize physical network topology information in a load balancing algorithm.
For example, Gloo may choose to send a point-to-point transfer to a more highly loaded NIC if it results in fewer network hops.
Additionally, Gloo could leverage multiple transports within a single CCL operation, if that resulted in optimal CCL performance.
For example, Gloo could communicate over shared memory with a peer on the same machine, but over RDMA or TCP to a remote peer.
For all these reasons, Gloo is best positioned to load balance across NIC ports, rather than the application/ML framework.
Challenges in Gloo's CPU core utilization
Gloo's current approach to thread management leaves room for Gloo to do more.
First, Gloo provides synchronous interfaces, and forces applications/ML frameworks to decide which thread processes a given CCL operation.
This means the choice of worker thread for a given CCL op is outside of Gloo's control.
Secondly, by leaving the choice of worker thread up to the application/ML framework, the application/ML framework must also handle any NUMA awareness around input/output data, NIC port, and worker thread.
Finally, Gloo is wholly dependent on the application/ML framework for deciding how worker threads are launched, if they are pinned to specific CPUs, etc.
Each of these characteristics can have major performance implications for Gloo.
With Gloo's current APIs, the key challenge of fully utilizing CPU cores to process CCL ops is left to the application/ML framework.
In PyTorch's ProcessGroupGloo, each instance spawns two worker threads that round-robin processing of CCL ops.
Since the ProcessGroupGloo worker threads are scoped to a single ProcessGroupGloo instance, this can result in uneven load distribution, where CCL ops on ProcessGroupGloo instance A are waiting to be executed even though ProcessGroupGloo instance B has idle threads.
Additionally, a given PyTorch application may only need N worker threads to process CCL ops, but has to spawn many more threads than N with the current ProcessGroupGloo worker thread model.
Since scale-up parallelisms (e.g., tensor parallelism) are typically sensitive to CCL operation latency, Gloo must achieve the lowest possible latency while saturating the available NIC bandwidth.
Other high-performance networking stacks (e.g., DPDK, VPP) have found that pinning worker threads to dedicated & isolated CPUs is crucial to obtaining the best performance.
Avoiding kernel-space context switches, scheduling ticks, and thread scheduling overhead are the key performance benefits from pinning worker threads to isolated CPUs.
Gloo and any applications/ML frameworks should leverage thread pinning to realize these performance benefits.
If worker threads are pinned, then avoiding uneven load distribution becomes important so that CPU cycles are not wasted while a thread could be executing useful work.
As detailed above, thread pinning, NUMA affinity, and thread work allocation are all left to the application/ML framework to do correctly.
However, PyTorch does not currently pin threads, handle NUMA affinity, or implement a complex work scheduler.
Gloo is best positioned to solve these challenges, as it can solve them for all applications/frameworks which utilize Gloo, rather than relying on each app/framework to implement this logic.
Gloo can understand the system hardware topology (NIC <-> NUMA node affinity) to ensure NIC port utilization is NUMA-aware.
Gloo can also leverage the remote peer's system hardware topology (remote NIC <-> NUMA affinity) and optionally traverse a PCI bus to a remote NUMA node before transmitting to the remote peer.
This follows a similar optimization performed by NCCL called PXN (PCI x NVLink) [1].
Gloo is also the best component to utilize dynamic feedback from NIC ports regarding network performance, congestion, etc. within the work scheduling algorithm.
By putting worker thread management (both thread pinning and CCL op assignment) inside of Gloo, Gloo can ensure worker thread utilization is optimal.
Network topologies without all-to-all connectivity between all NICs
Currently, Gloo requires rendezvous to create an all-to-all mesh between all NIC ports on all machines used for a single job.
This requirement comes from the gloo::rendezvous::Context::connectFullMesh API.
Physical network topologies such as rail-optimized, torus, etc. do not have a physical path between all NICs in the network.
This is a key challenge: Gloo requires all-to-all physical network connectivity.
Only Gloo can solve this challenge - the application/ML framework cannot prevent Gloo from attempting to make use of connections between Devices which don't exist.
Additionally, this consumes more resources (sockets, QPs, etc.) than may be necessary.
For example, if rank A and rank B each have 2 NICs, NIC 1 on rank A only needs to connect to NIC 1 on rank B, and likewise for NIC 2; there is no requirement to connect NIC 1 on one rank to NIC 2 on the other.
Challenges with scaling all-to-all connectivity
The same gloo::rendezvous::Context::connectFullMesh API that causes the previous challenge also causes this one.
Gloo's full mesh connectivity poses a scaling limit for non-TCP transports, such as RDMA.
Compared to the number of TCP sockets that can be connected, RDMA NICs have a limited number of QPs.
Above a certain scale, Gloo's full mesh assumption will exhaust the number of available QPs.
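As a rough illustration (the numbers here are hypothetical, not measurements): if each of R ranks exposes P NIC ports, a full mesh between all ports requires each port to hold on the order of R × P queue pairs; at R = 4096 and P = 8 that is roughly 32,000 QPs per port, which can approach or exceed what an RDMA NIC can efficiently support.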
Summary
In summary, Gloo is best positioned to make key decisions around NIC utilization, thread pinning, CCL op scheduling, NUMA awareness, and network topology awareness, but currently leaves these responsibilities to the application/ML framework. Current Gloo APIs limit how much can be solved by modifying the implementation.
Proposed Solution
The proposed solution has changes in both the initialization & the steady state operations. All new APIs will be added under a new gloo::async namespace.

Gloo initialization
Rather than rely on the application/ML framework to identify devices and create them, we propose adding a gloo::async::init API which will perform all necessary initialization. This will be called once, and will handle various setup tasks, including:
- initialize logging
- spawn worker threads
- identify available NICs
- understand physical network topology
The behavior of gloo::async::init can be customized via environment variables to allow users to tweak important aspects.
For example, environment variables may specify which NICs are (dis)allowed, or how many worker threads to spawn & which CPUs to pin the worker threads to.
NICs and worker threads will be global, rather than tied to a specific Context.
By hiding the details behind a stable API, Gloo can make changes/improvements to initialization code without needing to enable them in every application/framework. For example, the recent work of enabling PyTorch to utilize Gloo's ibverbs backend could have been handled entirely within gloo::async::init - without needing to update PyTorch.
Future Gloo features, such as network topology discovery/awareness, can also be added behind this API with minimal friction.
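As a rough sketch of what this could look like from an application's point of view, the snippet below shows a hypothetical process start-up; the environment variable names are illustrative placeholders, not names defined by this RFC.
// Hypothetical configuration via environment variables (names are placeholders):
//   GLOO_ASYNC_WORKER_THREADS=4          # how many worker threads to spawn
//   GLOO_ASYNC_WORKER_CPUS=8,9,10,11     # CPUs to pin those threads to
//   GLOO_ASYNC_NIC_ALLOWLIST=eth2,eth3   # NIC ports Gloo may use
int main(int argc, char** argv) {
  gloo::async::init();  // logging, worker threads, NIC discovery, topology discovery
  // ... application / ML framework work ...
  return 0;
}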
Gloo datapath/steady state operations
The proposed solution can be broken up into the following sections:
- APIs to be consumed by applications/ML frameworks
- Implementation of these APIs:
- Top-level async server
- Middle-level algorithm layer which implements allreduce/allgather/broadcast/etc. in a transport-agnostic manner
- Transport Mapper - a new transport-agnostic interface to solve challenges in UnboundBuffer
- Bottom-level transport-specific implementation
APIs
All CCL datapath APIs will be asynchronous, with the execution of the CCL operation performed by worker threads internal to Gloo.
This enables Gloo to decide how to best utilize the available CPUs to drive the CCL ops to completion.
NUMA awareness & affinity is handled in one place - within Gloo - and applies for all frameworks/apps which utilize these new APIs.
The CCL APIs will look like:
WorkItem* gloo::async::<Op>(<op-specific-args>);
For example, broadcast will look like:
WorkItem* gloo::async::broadcast(const gloo::async::Context& context, std::byte* buffer, size_t buffer_len, rank_t root);
where buffer is broadcasted to all other ranks if root == context.rank, or filled with the broadcasted data from the root rank otherwise.
The gloo::async::Context object is conceptually similar to MPI's communicator, and informs the CCL operation what the current rank is, and what other ranks to include in the CCL op.
There will be one gloo::async::Context for each ProcessGroupGloo object in PyTorch.
Importantly - there is no relationship between gloo::async::Context and any device/NIC ports or worker threads.
As mentioned in the Gloo Initialization section, NIC ports and worker threads are available for use by all Contexts, rather than dedicated to specific ones.
A CCL op for a context may be executed by any worker thread and utilize any/all NICs.
The async functions will return a WorkItem.
This supports a bool is_complete() method to enable the application to check if the operation has finished.
It also supports a wait() method to block the calling thread until the corresponding CCL operation is complete.
Implementation of each of these APIs will focus on packing the details of the operation into a struct and enqueuing it for a worker thread to execute, using the async server layer.
The calling thread will spend minimal time in these functions, enabling it to get back to executing useful compute within the ML framework/application.
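As an illustration, a hypothetical call site could look like the following; the WorkItem ownership model and the exact signatures are not finalized by this RFC.
// Hypothetical usage from an application thread (sketch only).
void broadcastExample(const gloo::async::Context& context,
                      std::byte* buffer, size_t buffer_len) {
  WorkItem* work = gloo::async::broadcast(context, buffer, buffer_len, /*root=*/0);
  // ... overlap other application/framework compute here ...
  if (!work->is_complete()) {
    work->wait();  // block until the broadcast has finished on this rank
  }
}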
Implementation
The goal of this section is to sketch out the software architecture of how these APIs are implemented.
Specific implementation details such as default values, thread scheduling algorithms, etc. can be discussed in the relevant implementation PR(s).
Top-level Async Server
This layer is responsible for work orchestration/scheduling across worker threads.
When an async API is called, it will create a struct with all the provided parameters and enqueue it onto a work queue.
The worker threads (launched by gloo::async::init) will poll these work queue(s) and execute the operations they dequeue.
The load balancing logic around which threads will execute which CCL ops is determined solely by this layer within Gloo.
Additionally, the worker thread selection can be influenced/dictated by the NUMA node(s) of the data buffer(s).
The top-level server can also consume runtime information from lower layers to make more informed decisions around the load distribution on the various threads.
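A minimal sketch of the queue/worker split is shown below, assuming std::function-wrapped work items and a single blocking queue; the actual scheduler, NUMA-aware queue selection, and CPU pinning described above are richer than this.
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <utility>

// Sketch only: a single global work queue drained by the worker threads
// spawned in gloo::async::init(). Names and structure are illustrative.
struct CclWorkItem {
  std::function<void()> run;  // CCL op parameters bound at enqueue time
};

class AsyncServer {
 public:
  // Called from the async APIs on the application thread.
  void enqueue(CclWorkItem item) {
    {
      std::lock_guard<std::mutex> lock(mu_);
      queue_.push(std::move(item));
    }
    cv_.notify_one();
  }

  // Body of each worker thread.
  void workerLoop() {
    for (;;) {
      std::unique_lock<std::mutex> lock(mu_);
      cv_.wait(lock, [&] { return !queue_.empty(); });
      CclWorkItem item = std::move(queue_.front());
      queue_.pop();
      lock.unlock();
      item.run();  // execute the CCL op via the algorithm layer
    }
  }

 private:
  std::mutex mu_;
  std::condition_variable cv_;
  std::queue<CclWorkItem> queue_;
};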
Middle-level Algorithm Layer
This layer decomposes the CCL operation into point-to-point operations.
It will use the participating rank information from the provided gloo::async::Context to decide which logical topology to use (ring vs bcube vs tree, etc.).
This layer can utilize the physical topology of the participating ranks to determine the most efficient algorithm to run.
It can also use this information when constructing the chosen logical topology out of participating rank IDs.
These steps can be cached to speed up subsequent CCL ops.
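For concreteness, the sketch below shows one way ring neighbours could be derived from the ordered participant list of a context; the rank_t alias and the helper name are assumptions for illustration.
#include <algorithm>
#include <cstddef>
#include <vector>

using rank_t = int;  // stand-in for Gloo's rank type

struct RingNeighbours {
  rank_t left;
  rank_t right;
};

// Sketch only: derive ring neighbours from the ordered participant list of a
// context. Assumes `self` appears exactly once in `ranks`.
RingNeighbours ringNeighbours(const std::vector<rank_t>& ranks, rank_t self) {
  const std::size_t n = ranks.size();
  const std::size_t idx = static_cast<std::size_t>(
      std::find(ranks.begin(), ranks.end(), self) - ranks.begin());
  return RingNeighbours{ranks[(idx + n - 1) % n], ranks[(idx + 1) % n]};
}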
Transport Mapper
The middle layer will utilize a new transport-agnostic interface called TransportMapper.
This mapper is similar to the mapper in NCCLX [2].
It can decide at runtime which available transport to utilize.
A single CCL op may utilize multiple transports if that is advantageous.
One example could be a ring topology where the connection to the left peer is over shared memory, while the connection to the right peer is over TCP or RDMA.
The TransportMapper provides APIs to perform the following operations in order:
- establish connectivity with peer ranks (prior connections are re-used)
- pre-process memory buffers
- exchange metadata with peers
- exchange data with peers (can be done multiple times, as needed)
- post-process memory buffers
The APIs which communicate directly with a remote peer are assumed to be asynchronous, and provide a TransportRequest object to track their completion.
For example, one rank can be sending (meta)data to multiple peers at once.
The full TransportMapper APIs look like:
#include <cstddef>
#include <memory>
#include <unordered_set>

// class to track an asynchronous transport operation
class TransportRequest;

class TransportMapper {
 public:
  TransportMapper(rank_t thisRank);
  ~TransportMapper();
  TransportMapper(const TransportMapper&) = delete;
  TransportMapper& operator=(const TransportMapper&) = delete;

  /**
   * Fallible initialization
   */
  Status init();

  /**
   * @brief set up connections between thisRank and the destination ranks
   * @param dRanks list of destination ranks to create connections with
   * @return Status
   */
  Status setupConnection(std::unordered_set<rank_t>& dRanks);

  /**
   * @brief pre-process a buffer to be used for data transfer
   * @param buf the local buffer to be used during data transfer
   * @param len number of bytes of 'buf'
   * @param hdl a handle object returned by this function for future reference
   * @return Status
   */
  Status preProcessMemory(const std::byte* buf, std::size_t len, void*& hdl);

  /**
   * @brief post-process a buffer that was used for data transfer, if needed
   * @param hdl a handle object returned previously when pre-processing the buffer
   * @return Status
   */
  Status postProcessMemory(void* hdl);

  /**
   * @brief Asynchronous call to send metadata of the local buffer to the
   * destination rank. User must call APIs of TransportRequest to ensure
   * success/failure
   * @param buf the local buffer to be remotely accessed by dRank
   * @param mData a handle object returned previously when pre-processing the
   * buffer
   * @param dRank the rank of the destination in the current communication
   * @param req the request object returned to track the progress
   * @return Status
   */
  Status sendMetaData(
      std::byte* buf,
      void* mData,
      const rank_t dRank,
      std::unique_ptr<TransportRequest>& req);

  /**
   * @brief Asynchronous call to receive metadata of the remote buffer from the
   * destination rank. User must call APIs of TransportRequest to ensure
   * success/failure
   * @param buf the remote buffer address received from dRank
   * @param mData metadata to access the remote buffer, if required
   * @param dRank the rank of the destination in the current communication
   * @param req the request object returned to track the progress
   * @return Status
   */
  Status recvMetaData(
      std::byte*& buf,
      void*& mData,
      const rank_t dRank,
      std::unique_ptr<TransportRequest>& req);

  /**
   * @brief Asynchronous call to transfer data to a remote rank.
   * User must call APIs of TransportRequest to ensure success/failure
   * @param sbuf local buffer to read data from
   * @param dbuf address of the remote buffer to receive data
   * @param len number of bytes
   * @param dRank the rank of the destination in the current communication
   * @param shdl the handle of the source buffer
   * @param mData metadata to access the remote buffer, if required
   * @param req the request object returned to track the progress
   * @return Status
   */
  Status transferData(
      const std::byte* sbuf,
      const std::byte* dbuf,
      std::size_t len,
      const rank_t dRank,
      void* shdl,
      void* mData,
      std::unique_ptr<TransportRequest>& req);

  /**
   * @brief Wait for data transfer from a remote rank
   * @param dRank the rank of the remote peer
   * @return Status
   */
  Status waitDataTransfer(const rank_t dRank);
};
The pre/post-processing of data maps nicely to some transports, such as ibv_[de]reg_mr from RDMA or pre/post-message synchronizations in shared memory.
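To make the call ordering concrete, the sketch below walks through one point-to-point exchange using the interface above; error handling is elided, and the completion API on TransportRequest is assumed rather than specified by this RFC.
// Sketch only: one point-to-point exchange with `peer` using the ordering
// described above. Return values and TransportRequest completion calls are
// elided/assumed.
void exchangeWithPeer(TransportMapper& mapper, rank_t peer,
                      std::byte* sendBuf, std::size_t len) {
  std::unordered_set<rank_t> peers{peer};
  mapper.setupConnection(peers);  // prior connections are re-used

  void* sendHdl = nullptr;
  mapper.preProcessMemory(sendBuf, len, sendHdl);  // e.g. memory registration

  // Advertise the local buffer to the peer and learn about the peer's buffer.
  std::unique_ptr<TransportRequest> sendReq;
  std::unique_ptr<TransportRequest> recvReq;
  mapper.sendMetaData(sendBuf, sendHdl, peer, sendReq);

  std::byte* remoteBuf = nullptr;
  void* remoteMeta = nullptr;
  mapper.recvMetaData(remoteBuf, remoteMeta, peer, recvReq);
  // ... wait on sendReq/recvReq via the (unspecified) TransportRequest API ...

  // Write the payload directly into the peer's buffer.
  std::unique_ptr<TransportRequest> dataReq;
  mapper.transferData(sendBuf, remoteBuf, len, peer, sendHdl, remoteMeta, dataReq);
  // ... wait on dataReq ...

  mapper.waitDataTransfer(peer);  // block until inbound data has landed
  mapper.postProcessMemory(sendHdl);
}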
Bottom-layer Transport-specific Implementation
This layer is solely focused on how to best transfer (meta) data to a peer.
It is responsible for enumerating & opening all relevant devices, and deciding how to best utilize them.
It implements all of the underlying TransportMapper APIs with any transport-specific details.
For example, TransportMapper::preProcessMemory going through the RoCE transport backend will invoke ibv_reg_mr.
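A sketch of what this could look like in a RoCE backend follows; the backend class name, its pd_ protection-domain member, and the Status values are assumptions for illustration, not part of the RFC.
#include <cstddef>
#include <infiniband/verbs.h>

// Sketch only: register the buffer with the NIC so it can be used for RDMA.
// RoceTransportMapper, pd_, and the Status values are hypothetical.
Status RoceTransportMapper::preProcessMemory(const std::byte* buf,
                                             std::size_t len, void*& hdl) {
  ibv_mr* mr = ibv_reg_mr(pd_, const_cast<std::byte*>(buf), len,
                          IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
  if (mr == nullptr) {
    return Status::ERROR;  // hypothetical failure value
  }
  hdl = mr;  // handle handed back to the middle layer for later transfers
  return Status::OK;  // hypothetical success value
}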
By separating metadata and normal data transfers, the backend can give these different message types different QoS.
This allows prioritizing small metadata messages, which has been shown to be advantageous when experiencing network congestion [3].
When it comes time to transfer data, it is up to this transport backend to determine how to optimally utilize the available NICs.
This can include creating and load balancing across multiple connections to the same peer.
This load balancing is distinct from the load balancing of the top layer.
A few examples of decisions a given transport backend will make:
- If multiple NIC ports connect this rank to another, which NIC port(s) should be used?
- just one? some, but not all? all of them?
- if multiple CCL ops are happening concurrently, how should those ops share the available bandwidth/NIC ports?
- If the input data to transfer is large, should it be chunked into smaller pieces?
- does the NIC HW require this chunking? e.g., an RDMA port's max message size
- Should the transport backend spawn any additional background threads?
- example: a thread to poll RDMA completions via ibv_poll_cq
- If a single NIC port has multiple physical paths between a source and destination, how can traffic pass through those physical paths in parallel?
- example: create multiple RDMA QPs so that each QP has a unique UDP port, enabling ECMP to load balance the traffic across the physical paths
- If a given NIC can only reach one (or some) of the NICs on a remote rank, how should these connections be created?
- If a NIC has limited resources, how should those be utilized?
Alternatives Considered
1. Multi-Port gloo::transport::Device
This approach would hide multiple NIC ports behind a single gloo::transport::Device object.
The benefit of this approach is that it works well with the existing Gloo codebase, and is a minimal extension that achieves some of the goals. Of the goals above, this approach would enable a single CCL operation to utilize multiple NIC ports and would allow Gloo to support more physical network topologies.
This approach has a number of appealing qualities, but still has some downsides compared to the proposed approach.
This alternative will still force the application/ML framework to know about Gloo devices.
The framework has to create them, connect them, etc.
This can be a source of confusion for applications - how many multi-port devices should be created? If there are multiple of them, how do they interact? Should the application still attempt to load balance CCL ops over multi-port devices?
It also does not prevent an application from mixing usage of a new multi-port device and a traditional single-port device which share the same underlying NIC port.
This will complicate any load balancing logic within the multi-port device.
In general, we believe the applications/ML frameworks should not concern themselves with the details of underlying NIC hardware.
All knowledge & decisions related to NIC devices should be handled within Gloo, with some inputs taken from either parameters to gloo::async::init or environment variables.
Additionally, the new TransportMapper interface avoids a key bottleneck present in the UnboundBuffer/Buffer send/recv APIs: the TransportMapper forces the sender to provide the destination virtual address.
This is crucial to avoiding bottlenecks over RDMA, as the destination address is needed to populate an RDMA work request.
Since the UnboundBuffer/Buffer APIs are missing this, each send/recv operation needs to coordinate the destination address before transferring the data via RDMA Write.
The multi-port device approach does not attempt to solve NUMA affinity with respect to worker threads or load balancing of CCL ops across available worker threads.
2. Modifying existing Gloo APIs
This approach is appealing, as it avoids introducing the entire gloo::async namespace.
However, the desired end state of Gloo is different enough from the current state that attempting to support both in the same APIs/objects would be extremely challenging.
Implementation of the proposed solution will be faster if all new APIs are utilized, with frameworks/applications migrating once.
Backwards Compatibility
The proposed solution defines entirely new application level APIs, so there is no risk of breaking existing applications.
If an application migrates to the new APIs, it must migrate fully - using a mixture of the new APIs and the existing APIs may work, but won't be supported.
Plan To Integrate The Proposed Solution Into Gloo
Once the key objectives and approaches of this RFC reach agreement, the solution can be upstreamed in a phased process.
First, the new async APIs (gloo::async::init and the per-op APIs) can be added, alongside the async server.
The datapath APIs can leverage the existing synchronous APIs (ex: gloo::allreduce) to be executed by Gloo's new worker threads.
This will provide a stable base for applications & frameworks to start migrating to the new APIs while preserving existing behavior in the datapath.
Simultaneously, the TransportMapper internal APIs can be finalized to ensure they are appropriate for all transports Gloo wishes to support.
Once the TransportMapper interface is finalized, the middle layer can start migrating from the UnboundBuffer APIs to the TransportMapper interface.
Simultaneously, transport-specific implementation(s) of the TransportMapper interface can be added, including TCP and RoCE backends.
Once an operation has adopted the TransportMapper interface, and there is a suitable backend for it, the top-level implementation of that operation can migrate to the new implementation.
In summary, the plan is to start with multiple concurrent work streams:
Stream 1:
- upstream new async APIs which leverage an internal worker thread pool. The worker threads invoke the existing Gloo implementations.
- update the frameworks/applications of interest to utilize the new async APIs.
Stream 2:
- upstream and finalize the TransportMapper interface.
- convert CCL ops to the new interface and upstream TCP & RDMA backends.
Once the above are both done, then:
- convert the worker threads to invoke the new TransportMapper-based CCL ops.