[RFC] Async Gloo APIs #463

@nlbrown2

Async Gloo APIs RFC

Overview

This RFC proposes a few key revisions to Gloo's design. It introduces async interfaces to Gloo, to be easily consumed by ML frameworks such as PyTorch. It shifts key responsibilities from the application to Gloo, enabling better decision making at runtime. Finally, it proposes a new transport-agnostic interface to be used by Gloo's algorithms.

The RFC introduces its goals, how Gloo's current APIs make achieving the goals difficult/impossible, the proposed solution, and alternatives considered.

Goals

This RFC proposes that Gloo should have first-class support for:

  • having a single CCL operation use multiple NIC ports
  • deciding which NIC port(s) to use for a given CCL operation
  • fully utilizing allocated CPU cores
  • network topologies which do not have a physical path between all end-points (NIC ports) in the network
  • transports which cannot scale to all-to-all connectivity between all NIC ports

"First-class support" means achieving these goals with Gloo is easy and intuitive. A user/application/ML framework should not resort to work-arounds to achieve these goals.

Problem Statement

Gloo's current APIs either lack or conflict with first-class support for the above goals. These APIs are public, so changing their behavior will be difficult. This is demonstrated for each goal below.

Challenges enabling Gloo to use multiple NIC ports for a single CCL operation

Gloo's gloo::transport::Device provides a transport-agnostic abstraction over a single NIC port. Derived classes implement transport specific behavior to utilize the NIC.

Note: we have considered a gloo::transport::Device which abstracts over multiple physical NIC ports.
This is discussed in Multi-Port gloo::transport::Device.
The remainder of this section discusses a gloo::transport::Device which abstracts over a single NIC port.

A given gloo::Context contains exactly one gloo::transport::Device.
Each CCL operation (ex: gloo::allreduce) operates on a single gloo::Context, and uses the context's gloo::transport::Device for all communication for that operation.
Today, this results in a key challenge: a single CCL operation can only utilize a single NIC port.

Since gloo::Context is part of the public API, it cannot be easily changed to leverage multiple gloo::transport::Devices.

Challenges enabling Gloo to choose which NIC ports to utilize for a given CCL operation

Currently, the application/ML framework chooses which NIC port is used to run a given CCL operation (op).
However, the application/ML framework has only a limited understanding of NIC port load, or of how much NIC bandwidth a given CCL op will require.
Therefore, the scheduling algorithms which could be employed have limited insight into CCL performance, and cannot respond dynamically to network conditions.
This results in another key challenge: Gloo relies on the application/ML framework for key decisions around NIC ports, including choosing which NIC port to use & how to load balance across them.

Gloo is responsible for decomposing collective operations to point-to-point data transfers.
By tracking the point-to-point transfers currently underway and/or leveraging NIC statistics, Gloo can understand the current load on each NIC port.
Gloo can also know ahead of time how much load any given CCL operation will generate.
Gloo can also most effectively utilize physical network topology information in a load balancing algorithm.
For example, Gloo may choose to send a point-to-point transfer to a more highly loaded NIC if it results in fewer network hops.
Additionally, Gloo could leverage multiple transports within a single CCL operation, if that resulted in optimal CCL performance.
For example, Gloo could communicate over shared memory with a peer on the same machine, but over RDMA or TCP to a remote peer.

For all these reasons, Gloo is best positioned to load balance across NIC ports, rather than the application/ML framework.

Challenges in Gloo's CPU core utilization

Gloo's current approach to thread management leaves room for Gloo to take on much more responsibility.

First, Gloo provides synchronous interfaces, and forces applications/ML frameworks to decide which thread processes a given CCL operation.
This means the choice of worker thread for a given CCL op is outside of Gloo's control.
Secondly, by leaving the choice of worker thread up to the application/ML framework, the application/ML framework must also handle any NUMA awareness around input/output data, NIC port, and worker thread.
Finally, Gloo is wholly dependent on the application/ML framework for deciding how worker threads are launched, if they are pinned to specific CPUs, etc.
Each of these characteristics can have major performance implications for Gloo.
With Gloo's current APIs, the key challenge of fully utilizing CPU cores to process CCL ops is left to the application/ML framework.

In PyTorch's ProcessGroupGloo, each instance spawns two worker threads that round-robin processing of CCL ops.
Since the worker threads are bound to a single ProcessGroupGloo instance, load can end up unevenly distributed: CCL ops on ProcessGroupGloo instance A may be waiting to execute even though ProcessGroupGloo instance B has idle threads.
Additionally, a given PyTorch application may only need N worker threads to process its CCL ops, yet the current ProcessGroupGloo worker thread model forces it to spawn many more than N.

Since scale-up parallelisms (e.g., tensor parallelism) are typically sensitive to CCL operation latency, Gloo must achieve the lowest possible latency while saturating the available NIC bandwidth.
Other high-performing networking stacks (e.g., DPDK, VPP) have shown that pinning worker threads to dedicated & isolated CPUs is crucial to obtaining the best performance.
The key performance benefits of pinning worker threads to isolated CPUs are avoiding kernel-space context switches, scheduler ticks, and thread scheduling overhead.
Gloo and any applications/ML frameworks should leverage thread pinning to realize these performance benefits.
If worker threads are pinned, then avoiding uneven load distribution becomes important to avoid wasting CPU cycles when a thread could be executing useful work.
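
As a point of reference, the pinning mechanism itself is small; the sketch below assumes Linux and glibc's pthread_setaffinity_np. How Gloo would select and expose the CPU set (e.g., via init parameters or environment variables) is part of this RFC's proposal and is not shown here.

#include <pthread.h>
#include <sched.h>
#include <thread>

// Pin a worker thread to a single, isolated CPU core (Linux-specific).
void pinWorkerToCpu(std::thread& worker, int cpu) {
  cpu_set_t cpuset;
  CPU_ZERO(&cpuset);
  CPU_SET(cpu, &cpuset);
  pthread_setaffinity_np(worker.native_handle(), sizeof(cpu_set_t), &cpuset);
}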

As detailed above, thread pinning, NUMA affinity, and thread work allocation are all left to the application/ML framework to do correctly.
However, PyTorch does not currently pin threads, handle NUMA affinity, or implement a complex work scheduler.
Gloo is best positioned to solve these challenges, as it can solve them for all applications/frameworks which utilize Gloo, rather than relying on each app/framework to implement this logic.
Gloo can understand the system hardware topology (NIC <-> NUMA node affinity) to ensure NIC port utilization is NUMA-aware.
Gloo can also leverage the remote peer's system hardware topology (remote NIC <-> NUMA affinity) and optionally traverse a PCI bus to a remote NUMA node before transmitting to the remote peer.
This follows a similar optimization performed by NCCL called PXN (PCI x NVLink) [1].
Gloo is also the best component to utilize dynamic feedback from NIC ports regarding network performance, congestion, etc. within the work scheduling algorithm.
By putting worker thread management (both thread pinning and CCL op assignment) inside of Gloo, Gloo can ensure worker thread utilization is optimal.

Network topologies without all-to-all connectivity between all NICs

Currently, Gloo requires rendezvous to create an all-to-all mesh between all NIC ports on all machines used for a single job.
This requirement comes from the gloo::rendezvous::Context::connectFullMesh API.

Physical network topologies such as rail-optimized, torus, etc. do not have a physical path between all NICs in the network.
This is a key challenge: Gloo requires all-to-all physical network connectivity.
Only Gloo can solve this challenge - the application/ML framework cannot prevent Gloo from attempting to use connections between Devices that do not physically exist.

Additionally, this consumes more resources (sockets, QPs, etc.) than may be necessary.
If rank A has 2 NICs and rank B has 2 NICs, then NIC 1 on each rank can connect with each other, and NIC 2 can connect with each other.
There isn't a requirement to connect NIC 1 to NIC 2 on either rank.

Challenges with scaling all-to-all connectivity

The same gloo::rendezvous::Context::connectFullMesh API that causes the previous challenge also causes this one.
Gloo's full mesh connectivity poses a scaling limit for non-TCP transports, such as RDMA.
Compared to the number of TCP sockets that can be connected, RDMA NICs have a limited number of QPs.
Above a certain scale, Gloo's full mesh assumption will exhaust the number of available QPs.

Summary

In summary, Gloo is best positioned to make key decisions around NIC utilization, thread pinning, CCL op scheduling, NUMA awareness, and network topology awareness, but currently leaves these responsibilities to the application/ML framework. Current Gloo APIs limit how much can be solved by modifying the implementation.

Proposed Solution

The proposed solution has changes in both the initialization & the steady state operations. All new APIs will be added under a new gloo::async namespace.

Gloo initialization

Rather than rely on the application/ML framework to identify devices and create them, we propose adding a gloo::async::init API which will perform all necessary initialization. This will be called once, and will handle various setup tasks, including:

  • initialize logging
  • spawn worker threads
  • identify available NICs
  • understand physical network topology

The behavior of gloo::async::init can be customized via environment variables to allow users to tweak important aspects.
For example, environment variables may specify which NICs are (dis)allowed, or how many worker threads to spawn & which CPUs to pin the worker threads to.
NICs and worker threads will be global, rather than tied to a specific Context.
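
For illustration only, a helper inside gloo::async::init might read such a variable along the following lines; the variable name GLOO_ASYNC_NUM_THREADS is a placeholder, not a committed interface.

#include <cstdlib>
#include <string>

// Placeholder environment variable; the final names are to be decided.
int workerThreadCount(int defaultCount) {
  const char* value = std::getenv("GLOO_ASYNC_NUM_THREADS");
  return value ? std::stoi(value) : defaultCount;
}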

By hiding the details behind a stable API, Gloo can make changes/improvements to initialization code without needing to enable them in every application/framework. For example, the recent work of enabling PyTorch to utilize Gloo's ibverbs backend could have been handled entirely within gloo::async::init - without needing to update PyTorch.

Additional future features, such as network topology discovery/awareness, can also be added to Gloo with minimal friction behind this API.

Gloo datapath/steady state operations

The proposed solution can be broken up into the following sections:

  • APIs to be consumed by applications/ML frameworks
  • Implementation of these APIs:
    • Top-level async server
    • Middle-level algorithm layer which implements allreduce/allgather/broadcast/etc. in a transport-agnostic manner
    • Transport Mapper - a new transport-agnostic interface to solve challenges in UnboundBuffer
    • Bottom-level transport-specific implementation

APIs

All CCL datapath APIs will be asynchronous, with the execution of the CCL operation performed by worker threads internal to Gloo.
This enables Gloo to decide how to best utilize the available CPUs to drive the CCL ops to completion.
NUMA awareness & affinity is handled in one place - within Gloo - and applies for all frameworks/apps which utilize these new APIs.
The CCL APIs will look like:

WorkItem* gloo::async::<Op>(<op-specific-args>);

For example, broadcast will look like:

WorkItem* gloo::async::broadcast(const gloo::async::Context& context, std::byte* buffer, size_t buffer_len, rank_t root);

where buffer is broadcasted to all other ranks if root == context.rank, or filled with the broadcasted data from the root rank otherwise.

The gloo::async::Context object is conceptually similar to MPI's communicator, and informs the CCL operation what the current rank is, and what other ranks to include in the CCL op.
There will be one gloo::async::Context for each ProcessGroupGloo object in PyTorch.
Importantly - there is no relationship between gloo::async::Context and any device/NIC ports or worker threads.
As mentioned in the Gloo Initialization section, NIC ports and worker threads are available for use by all Contexts, rather than dedicated to specific ones.
A CCL op for a context may be executed by any worker thread and utilize any/all NICs.

The async functions will return a WorkItem.
It supports a bool is_complete() method so the application can check whether the operation has finished, and a wait() method to block the calling thread until the corresponding CCL operation is complete.
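
A sketch of how an application thread might use these APIs, based on the broadcast signature and WorkItem methods above; doUsefulCompute() stands in for framework work, and Context construction and any gloo::async::init arguments are elided.

#include <cstddef>

void runBroadcast(const gloo::async::Context& context,
                  std::byte* buffer, std::size_t buffer_len) {
  WorkItem* work = gloo::async::broadcast(context, buffer, buffer_len, /*root=*/0);

  doUsefulCompute();            // overlap framework compute with the collective

  if (!work->is_complete()) {   // optional poll
    work->wait();               // block until the broadcast completes
  }
}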

Implementation of each of these APIs will focus on packing the details of the operation into a struct and enqueuing it for a worker thread to execute, using the async server layer.
The calling thread will spend minimal time in these functions, enabling it to get back to executing useful compute within the ML framework/application.

Implementation

The goal of this section is to sketch out the software architecture of how these APIs are implemented.
Specific implementation details such as default values, thread scheduling algorithms, etc. can be discussed in the relevant implementation PR(s).

Top-level Async Server

This layer is responsible for work orchestration/scheduling across worker threads.
When an async API is called, it will create a struct with all the provided parameters and enqueue it onto a work queue.
The worker threads (launched by gloo::async::init) will poll these work queue(s) and execute the operations they dequeue.

The load balancing logic around which threads will execute which CCL ops is determined solely by this layer within Gloo.
Additionally, worker thread selection can be influenced/dictated by the NUMA node(s) of the data buffer(s).
The top-level server can also consume runtime information from lower layers to make more informed decisions around the load distribution on the various threads.
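
A minimal sketch of this layer, assuming WorkItem carries the packed op parameters and a hypothetical executeOp() hands each item to the algorithm layer; the single queue shown here could be replaced by per-thread or NUMA-aware queues without changing the public APIs.

#include <condition_variable>
#include <deque>
#include <mutex>

class WorkItem;                  // packed CCL-op parameters (see APIs above)
void executeOp(WorkItem* item);  // hypothetical: runs the middle-level algorithm layer

class AsyncServer {
 public:
  // Called from the gloo::async::<Op> entry points on the application thread.
  void enqueue(WorkItem* item) {
    {
      std::lock_guard<std::mutex> lock(mu_);
      queue_.push_back(item);
    }
    cv_.notify_one();
  }

  // Body of each worker thread spawned by gloo::async::init.
  void workerLoop() {
    for (;;) {
      WorkItem* item = nullptr;
      {
        std::unique_lock<std::mutex> lock(mu_);
        cv_.wait(lock, [&] { return !queue_.empty(); });
        item = queue_.front();
        queue_.pop_front();
      }
      executeOp(item);  // decompose & drive the op, then mark the WorkItem complete
    }
  }

 private:
  std::mutex mu_;
  std::condition_variable cv_;
  std::deque<WorkItem*> queue_;  // single queue shown; per-thread queues are possible
};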

Middle-level Algorithm Layer

This layer decomposes the CCL operation into point-to-point operations.
It will use the participating rank information from the provided gloo::async::Context to decide which logical topology to use (ring vs bcube vs tree, etc.).
This layer can utilize the physical topology of the participating ranks to determine the most efficient algorithm to run.
It can also use this information when constructing the chosen logical topology out of participating rank IDs.
These steps can be cached to speed up subsequent CCL ops.
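
Purely for illustration, the topology choice might be keyed on group size and message size along the following lines; the enum and thresholds are invented here, and the real policy would also consult physical topology and the cache described above.

#include <cstddef>

enum class LogicalTopology { Ring, BCube, Tree };

// Placeholder heuristic; thresholds are illustrative only.
LogicalTopology chooseTopology(std::size_t numRanks, std::size_t messageBytes) {
  if (messageBytes < 64 * 1024) {
    return LogicalTopology::Tree;                // latency-bound: fewer rounds
  }
  return numRanks <= 8 ? LogicalTopology::Ring   // bandwidth-bound, small group
                       : LogicalTopology::BCube; // bandwidth-bound, larger group
}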

Transport Mapper

The middle layer will utilize a new transport-agnostic interface called TransportMapper.
This mapper is similar to the mapper in NCCLX [2].
It can decide at runtime which available transport to utilize.
A single CCL op may utilize multiple transports if that is advantageous.
One example could be a ring topology where the connection to the left peer is over shared memory, while the connection to the right peer is over TCP or RDMA.

The TransportMapper provides APIs to perform the following operations in order:

  1. establish connectivity with peer ranks (prior connections are re-used)
  2. pre-process memory buffers
  3. exchange metadata with peers
  4. exchange data with peers (can be done multiple times, as needed)
  5. post-process memory buffers

The APIs which communicate directly with a remote peer are assumed to be asynchronous, and provide a TransportRequest object to track their completion.
For example, one rank can be sending (meta)data to multiple peers at once.
The full TransportMapper APIs look like:

// class to track an asynchronous transport operation
class TransportRequest;
class TransportMapper {
 public:
  TransportMapper(rank_t thisRank);
  ~TransportMapper();

  TransportMapper(const TransportMapper&) = delete;
  TransportMapper& operator=(const TransportMapper&) = delete;

  /**
   * Fallible initialization
   */
  Status init();

  /**
   * @brief set up connections between thisRank to destination ranks
   * @param dRanks set of destination ranks to connect to
   * @return Status
   */
  Status setupConnection(std::unordered_set<rank_t>& dRanks);

  /**
   * @brief pre-process a buffer to be used for data transfer.
   * @param buf the local buffer to be used during data transfer
   * @param len number of bytes of 'buf'
   * @param hdl a handle object returned by this function for future reference
   * @return Status
   */
  Status preProcessMemory(const std::byte* buf, std::size_t len, void*& hdl);

  /**
   * @brief Postprocess a buffer that was used for data transfer if needed
   * @param hdl a handle object returned previously when preprocessing the buffer
   * @return Status
   */
  Status postProcessMemory(void* hdl);

  /**
   * @brief Asynchronous call to send metadata of local buffer to the
   * destination rank. User must call APIs of TransportRequest to ensure
   * success/failure
   * @param buf the local buffer to be remotely accessed by dRank
   * @param mData a handle object returned previously when preprocessing the
   * buffer
   * @param dRank the rank of the destination in the current communication
   * @param req the request object returned to track the progress.
   * @return Status
   */
  Status sendMetaData(
      std::byte* buf,
      void* mData,
      const rank_t dRank,
      std::unique_ptr<TransportRequest>& req);

  /**
   * @brief Asynchronous call to receive metadata of the remote buffer from the
   * destination rank. User must call APIs of TransportRequest to ensure
   * success/failure
   * @param buf the remote buffer on dRank to be accessed by this rank
   * @param mData metadata to access the remote buffer if required
   * @param dRank the rank of the destination in the current communication
   * @param req the request object returned to track the progress
   * @return Status
   */
  Status recvMetaData(
      std::byte*& buf,
      void*& mData,
      const rank_t dRank,
      std::unique_ptr<TransportRequest>& req);

  /**
   * @brief Asynchronous call to transfer data to a remote rank
   *        User must call APIs of TransportRequest to ensure success/failure
   * @param sbuf local buffer to read data from
   * @param dbuf address of the remote buffer to receive data
   * @param len number of bytes
   * @param dRank the rank of the destination in the current communication
   * @param shdl the handle of the source buffer
   * @param mData metadata to access the remote buffer if required
   * @param req the request object returned to track the progress
   * @return Status
   */
  Status transferData(
      const std::byte* sbuf,
      const std::byte* dbuf,
      std::size_t len,
      const rank_t dRank,
      void* shdl,
      void* mData,
      std::unique_ptr<TransportRequest>& req);

  /**
   * @brief Waiting for data transfer from a remote rank
   * @param dRank  the rank of the remote rank
   * @return Status
   */
  Status waitDataTransfer(const rank_t dRank);
};

The pre/post-processing of data maps nicely to some transports, such as ibv_[de]reg_mr from RDMA or pre/post-message synchronizations in shared memory.
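
To make the ordering concrete, the sketch below shows how the algorithm layer might drive these APIs for a single symmetric exchange with one peer. Status checks are omitted for brevity, and the commented-out wait is a stand-in for the TransportRequest completion API, which this RFC leaves to be defined.

#include <cstddef>
#include <memory>
#include <unordered_set>

void exchangeWithPeer(TransportMapper& mapper,
                      std::byte* sendBuf, std::byte* recvBuf,
                      std::size_t len, rank_t dst) {
  // 1. establish connectivity (re-used if already set up)
  std::unordered_set<rank_t> peers{dst};
  mapper.setupConnection(peers);

  // 2. pre-process both local buffers (e.g. ibv_reg_mr on RDMA)
  void* sendHdl = nullptr;
  void* recvHdl = nullptr;
  mapper.preProcessMemory(sendBuf, len, sendHdl);
  mapper.preProcessMemory(recvBuf, len, recvHdl);

  // 3. exchange metadata: advertise recvBuf to the peer, learn the peer's buffer
  std::unique_ptr<TransportRequest> sendMetaReq;
  std::unique_ptr<TransportRequest> recvMetaReq;
  mapper.sendMetaData(recvBuf, recvHdl, dst, sendMetaReq);
  std::byte* peerBuf = nullptr;
  void* peerMeta = nullptr;
  mapper.recvMetaData(peerBuf, peerMeta, dst, recvMetaReq);
  // ... wait on both requests via TransportRequest (API to be defined) ...

  // 4. write into the peer's buffer; the peer does the same toward recvBuf
  std::unique_ptr<TransportRequest> dataReq;
  mapper.transferData(sendBuf, peerBuf, len, dst, sendHdl, peerMeta, dataReq);
  mapper.waitDataTransfer(dst);   // block until the peer's inbound write lands

  // 5. post-process the buffers (e.g. ibv_dereg_mr on RDMA)
  mapper.postProcessMemory(sendHdl);
  mapper.postProcessMemory(recvHdl);
}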

Bottom-layer Transport-specific Implementation

This layer is solely focused on how to best transfer (meta) data to a peer.
It is responsible for enumerating & opening all relevant devices, and deciding how to best utilize them.
It implements all of the underlying TransportMapper APIs with any transport-specific details.
For example, TransportMapper::preProcessMemory going through the RoCE transport backend will invoke ibv_reg_mr.
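
As a sketch, the core of that call would look like the following; the access flags shown are illustrative, and the returned ibv_mr* would be handed back through hdl and later released with ibv_dereg_mr in postProcessMemory.

#include <cstddef>
#include <infiniband/verbs.h>

// Register a buffer with the RDMA NIC so it can be the source or target of
// RDMA operations; `pd` is the protection domain the backend opened at init.
ibv_mr* registerForRdma(ibv_pd* pd, const std::byte* buf, std::size_t len) {
  return ibv_reg_mr(pd,
                    const_cast<std::byte*>(buf),
                    len,
                    IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
}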

By separating metadata and normal data transfers, the backend can give these different messages different QoS.
This can result in prioritizing small metadata messages, which has been shown to be advantageous when experiencing network congestion [3].

When it comes time to transfer data, it is up to this transport backend to determine how to optimally utilize the available NICs.
This can include creating and load balancing across multiple connections to the same peer.
This load balancing is distinct from the load balancing of the top layer.
A few examples of decisions a given transport backend will make:

  • If multiple NIC ports connect this rank to another, which NIC port(s) should be used?
    • just one? some, but not all? all of them?
    • if multiple CCL ops are happening concurrently, how should those ops share the available bandwidth/NIC ports?
  • If the input data to transfer is large, should it be chunked into smaller pieces?
    • does the NIC HW require this chunking? e.g., an RDMA port's max message size
  • Should the transport backend spawn any additional background threads?
    • example: a thread to poll RDMA completions via ibv_poll_cq?
  • If a single NIC port has multiple physical paths between a source and destination, how can traffic pass through those physical paths in parallel?
    • example: create multiple RDMA QPs so that each QP has a unique UDP port, enabling ECMP to load balance the traffic across the physical paths
  • If a given NIC can only reach one (or some) of the NICs on a remote rank, how should these connections be created?
  • If a NIC has limited resources, how should those be utilized?

Alternatives Considered

1. Multi-Port gloo::transport::Device

This approach would hide multiple NIC ports behind a single gloo::transport::Device object.
The benefit of this approach is that it works well with the existing Gloo codebase and is a minimal extension that achieves some of the goals.

Of the goals above, this approach would address using multiple NIC ports and allowing Gloo to work with more physical network topologies.

This approach has a number of appealing qualities, but still has some downsides compared to the proposed approach.
This alternative will still force the application/ML framework to know about Gloo devices.
The framework has to create them, connect them, etc.
This can be a source of confusion for applications - how many multi-port devices should be created? If there are multiple of them, how do they interact? Should the application still attempt to load balance CCL ops over multi-port devices?

It also does not prevent an application from mixing usage of a new multi-port device and a traditional single-port device which share the same underlying NIC port.
This will complicate any load balancing logic within the multi-port device.

In general, we believe the applications/ML frameworks should not concern themselves with the details of underlying NIC hardware.
All knowledge & decisions related to NIC devices should be handled within Gloo, with some inputs taken from either parameters to gloo::async::init or environment variables.

Additionally, the new TransportMapper interface avoids a key bottleneck present in the UnboundBuffer/Buffer send/recv APIs: the TransportMapper forces the sender to provide the destination virtual address.
This is crucial to avoiding bottlenecks over RDMA, as the destination address is needed to populate an RDMA work request.
Since the UnboundBuffer/Buffer APIs are missing this, each send/recv operation needs to coordinate the destination address before transferring the data via RDMA Write.

The multi-port device approach does not attempt to solve NUMA affinity with respect to worker threads or load balancing of CCL ops across available worker threads.

2. Modifying existing Gloo APIs

This approach is appealing, as it avoids introducing the entire gloo::async namespace.
However, the desired end state of Gloo is different enough from the current state that attempting to support both in the same APIs/objects would be extremely challenging.

Implementation of the proposed solution will be faster if all new APIs are utilized, with frameworks/applications migrating once.

Backwards Compatibility

The proposed solution defines entirely new application level APIs, so there is no risk of breaking existing applications.
If an application migrates to the new APIs, it must migrate fully - mixing the new APIs with the existing APIs may work, but won't be supported.

Plan To Integrate The Proposed Solution Into Gloo

Once the key objectives and approaches of this RFC reach agreement, the solution can be upstreamed in a phased process.

First, the new async APIs (gloo::async::init and the per-OP APIs) can be added, alongside the async server.
The datapath APIs can leverage the existing synchronous APIs (ex: gloo::allreduce) to be executed by Gloo's new worker threads.

This will provide a stable base for applications & frameworks to start to migrate to the new APIs & preserve existing behavior in the datapath.

Simultaneously, the TransportMapper internal APIs can be finalized to ensure they are appropriate for all transports Gloo wishes to support.

Once the TransportMapper interface is finalized, the middle layer can start migrating from the UnboundBuffer APIs to the TransportMapper interface.
Simultaneously, transport-specific implementation(s) of the TransportMapper interface can be added, including TCP and RoCE backends.

Once an operation has adopted the TransportMapper interface, and there is a suitable backend for it, the top level implementation of that operation can migrate to the new implementation.

In summary, the plan is to start with multiple concurrent work streams:

Stream 1:

  1. upstream new async APIs which leverage an internal worker thread pool.
    The worker threads invoke the existing Gloo implementations.
  2. update the frameworks/applications of interest to utilize the new async APIs.

Stream 2:

  1. upstream and finalize the TransportMapper interface.
  2. convert CCL ops to the new interface and upstream TCP & RDMA backends

Once the above are both done, then:

  1. convert the worker threads to invoke the new TransportMapper based CCL ops

References

  1. NCCL PXN
  2. NCCLX Source Code
  3. RDMA over Ethernet for Distributed AI Training at Meta Scale
