Merged
Changes from all commits · 25 commits
3f7047e
[dist-rg] adds distributed row-gatherer
MarcelKoch Apr 2, 2024
0c1ffaf
[dist-rg] handle copy to host buffer
MarcelKoch Apr 19, 2024
6031ddc
[dist-rg] use mpi request instead of future
MarcelKoch Apr 30, 2024
85fff10
[dist-rg] adds distributed row-gatherer tests
MarcelKoch Apr 4, 2024
5212fb2
[dist-mat] use row-gatherer
MarcelKoch Apr 30, 2024
a41bc82
[dist-rg] review update:
MarcelKoch Jul 15, 2024
bbbb253
[dist-rg] review updates:
MarcelKoch Oct 23, 2024
ce9fd72
[pgm] use row-gatherer from matrix
MarcelKoch May 28, 2024
bf09845
[dist] review updates:
MarcelKoch Feb 14, 2025
e000040
[core] allow empty diagonal mtx created from array
MarcelKoch Feb 20, 2025
b3e8d5b
[dist] allocate recv buffers only once
MarcelKoch Feb 20, 2025
86676b3
[dist-rg] don't implement apply
MarcelKoch Feb 20, 2025
895a031
[dist-rg] use device exec in tests
MarcelKoch Feb 20, 2025
36f4f94
[dist-rg] remove row gatherer from linop hierarchy
MarcelKoch Feb 20, 2025
34e67eb
[dist-rg] add comment on output vector exec
MarcelKoch Feb 20, 2025
4e5e198
[core] fix temporary clone of segmented array
MarcelKoch Feb 20, 2025
0b597d7
[dist-rg] use correct exec when constructing RG
MarcelKoch Feb 20, 2025
70980ac
[bench] update benchmark tests
MarcelKoch Mar 3, 2025
3581d6d
[dist-rg] add missing half precision dispatch
MarcelKoch Mar 17, 2025
358db33
[dist] use intermediate coll-comm creator function
MarcelKoch Mar 24, 2025
2ef6e24
review updates:
MarcelKoch Mar 24, 2025
8e43f59
[dist] revert coll-comm creator function
MarcelKoch Apr 25, 2025
011dfba
[test] use neighborhood comm only if available and reduce total numbe…
MarcelKoch May 5, 2025
0f67484
fixup! [dist-rg] add missing half precision dispatch
MarcelKoch May 12, 2025
e7178d5
[dist] accept only dist-vectors for row-gather
MarcelKoch May 12, 2025
2 changes: 0 additions & 2 deletions benchmark/test/reference/distributed_solver.profile.stderr
@@ -100,8 +100,6 @@ DEBUG: begin components::convert_idxs_to_ptrs
 DEBUG: end components::convert_idxs_to_ptrs
 DEBUG: begin components::convert_idxs_to_ptrs
 DEBUG: end components::convert_idxs_to_ptrs
-DEBUG: begin copy
-DEBUG: end copy
 DEBUG: begin components::aos_to_soa
 DEBUG: end components::aos_to_soa
 DEBUG: begin dense::fill
2 changes: 0 additions & 2 deletions benchmark/test/reference/spmv_distributed.profile.stderr
@@ -116,8 +116,6 @@ DEBUG: begin components::convert_idxs_to_ptrs
 DEBUG: end components::convert_idxs_to_ptrs
 DEBUG: begin components::convert_idxs_to_ptrs
 DEBUG: end components::convert_idxs_to_ptrs
-DEBUG: begin copy
-DEBUG: end copy
 DEBUG: begin copy(<typename>)
 DEBUG: begin dense::copy
 DEBUG: end dense::copy
1 change: 1 addition & 0 deletions core/CMakeLists.txt
@@ -154,6 +154,7 @@ if(GINKGO_BUILD_MPI)
     distributed/matrix.cpp
     distributed/neighborhood_communicator.cpp
     distributed/partition_helpers.cpp
+    distributed/row_gatherer.cpp
     distributed/vector.cpp
     distributed/preconditioner/schwarz.cpp
 )
8 changes: 4 additions & 4 deletions core/distributed/dense_communicator.cpp
@@ -111,12 +111,12 @@ request DenseCommunicator::i_all_to_all_v_impl(


 std::unique_ptr<CollectiveCommunicator>
-DenseCommunicator::create_with_same_type(
-    communicator base, const distributed::index_map_variant& imap) const
+DenseCommunicator::create_with_same_type(communicator base,
+                                         index_map_ptr imap) const
 {
     return std::visit(
-        [base](const auto& imap) {
-            return std::make_unique<DenseCommunicator>(base, imap);
+        [base](const auto* imap) {
+            return std::make_unique<DenseCommunicator>(base, *imap);
         },
         imap);
 }
266 changes: 80 additions & 186 deletions core/distributed/matrix.cpp

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions core/distributed/neighborhood_communicator.cpp
@@ -157,13 +157,13 @@ request NeighborhoodCommunicator::i_all_to_all_v_impl(


 std::unique_ptr<CollectiveCommunicator>
-NeighborhoodCommunicator::create_with_same_type(
-    communicator base, const distributed::index_map_variant& imap) const
+NeighborhoodCommunicator::create_with_same_type(communicator base,
+                                                index_map_ptr imap) const
 {
     return std::visit(
-        [base](const auto& imap) {
+        [base](const auto* imap) {
             return std::unique_ptr<CollectiveCommunicator>(
-                new NeighborhoodCommunicator(base, imap));
+                new NeighborhoodCommunicator(base, *imap));
         },
         imap);
 }
296 changes: 296 additions & 0 deletions core/distributed/row_gatherer.cpp
@@ -0,0 +1,296 @@
// SPDX-FileCopyrightText: 2017 - 2025 The Ginkgo authors
//
// SPDX-License-Identifier: BSD-3-Clause

#include "ginkgo/core/distributed/row_gatherer.hpp"

#include <ginkgo/core/base/dense_cache.hpp>
#include <ginkgo/core/base/precision_dispatch.hpp>
#include <ginkgo/core/distributed/dense_communicator.hpp>
#include <ginkgo/core/distributed/neighborhood_communicator.hpp>
#include <ginkgo/core/matrix/dense.hpp>

#include "core/base/dispatch_helper.hpp"

namespace gko {
namespace experimental {
namespace distributed {


#if GINKGO_HAVE_OPENMPI_PRE_4_1_X
using DefaultCollComm = mpi::DenseCommunicator;
#else
using DefaultCollComm = mpi::NeighborhoodCommunicator;
#endif


template <typename LocalIndexType>
mpi::request RowGatherer<LocalIndexType>::apply_async(ptr_param<const LinOp> b,
ptr_param<LinOp> x) const
{
int is_inactive;
MPI_Status status;
GKO_ASSERT_NO_MPI_ERRORS(
MPI_Request_get_status(req_listener_, &is_inactive, &status));
// This is untestable. Some processes might complete the previous request
// while others don't, so it's impossible to create a predictable behavior
// for a test.
GKO_THROW_IF_INVALID(is_inactive,
"Tried to call RowGatherer::apply_async while there "
"is already an active communication. Please use the "
"overload with a workspace to handle multiple "
"connections.");

auto req = apply_async(b, x, send_workspace_);
req_listener_ = *req.get();
return req;
}


template <typename LocalIndexType>
mpi::request RowGatherer<LocalIndexType>::apply_async(
ptr_param<const LinOp> b, ptr_param<LinOp> x, array<char>& workspace) const
{
mpi::request req;

auto exec = this->get_executor();
auto use_host_buffer =
mpi::requires_host_buffer(exec, coll_comm_->get_base_communicator());
auto mpi_exec = use_host_buffer ? exec->get_master() : exec;

GKO_THROW_IF_INVALID(
!use_host_buffer || mpi_exec->memory_accessible(x->get_executor()),
"The receive buffer uses device memory, but MPI support for device "
"memory is not available, or a host buffer was explicitly requested. "
"Please provide a host buffer or enable MPI support for device "
"memory.");

// dispatch global vector
run<Vector,
#if GINKGO_ENABLE_HALF
half, std::complex<half>,
#endif
#if GINKGO_ENABLE_BFLOAT16
bfloat16, std::complex<bfloat16>,
#endif
double, float, std::complex<double>, std::complex<float>>(
make_temporary_clone(exec, b).get(), [&](const auto* b_global) {
using ValueType =
typename std::decay_t<decltype(*b_global)>::value_type;
// dispatch local vector with the same precision as the global
// vector
distributed::precision_dispatch<ValueType>(
[&](auto* x_global) {
auto b_local = b_global->get_local_vector();

dim<2> send_size(coll_comm_->get_send_size(),
b_local->get_size()[1]);
auto send_size_in_bytes =
sizeof(ValueType) * send_size[0] * send_size[1];
if (!workspace.get_executor() ||
!mpi_exec->memory_accessible(
workspace.get_executor()) ||
send_size_in_bytes > workspace.get_size()) {
workspace = array<char>(
mpi_exec,
sizeof(ValueType) * send_size[0] * send_size[1]);
}
auto send_buffer = matrix::Dense<ValueType>::create(
mpi_exec, send_size,
make_array_view(
mpi_exec, send_size[0] * send_size[1],
reinterpret_cast<ValueType*>(workspace.get_data())),
send_size[1]);
b_local->row_gather(&send_idxs_, send_buffer);

auto recv_ptr = x_global->get_local_values();
auto send_ptr = send_buffer->get_values();

b_local->get_executor()->synchronize();
mpi::contiguous_type type(
b_local->get_size()[1],
mpi::type_impl<ValueType>::get_type());
req = coll_comm_->i_all_to_all_v(
mpi_exec, send_ptr, type.get(), recv_ptr, type.get());
Comment on lines +113 to +114

Member:
send_buffer might be on the host, but the recv_ptr (x_local) might be on the device.

Member Author:
I have a check above to ensure that the memory space of the recv buffer is accessible from the MPI executor. So if GPU-aware MPI is used, it should work (even if the send buffer is on the host and the recv buffer is on the device, or vice versa). Otherwise an exception is thrown.

},
x.get());
});
return req;
}
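
A usage note on the two apply_async overloads above: the exception in the first overload points callers with overlapping communications to the workspace overload. The following is a minimal sketch, not part of this PR, assuming rg is a RowGatherer and b1/b2, x1/x2 are suitably sized distributed vectors (all names hypothetical); the host-allocated workspaces reflect the case without GPU-aware MPI, and apply_async reallocates a workspace on the MPI executor if it is too small or not accessible.

// Hedged sketch: overlap two gathers by giving each call its own workspace
// instead of the shared internal send_workspace_.
gko::array<char> ws1(exec->get_master());  // host workspace, e.g. when MPI
gko::array<char> ws2(exec->get_master());  // lacks device memory support
auto req1 = rg->apply_async(b1, x1, ws1);
auto req2 = rg->apply_async(b2, x2, ws2);
req1.wait();  // block until the first gather has completed
req2.wait();  // block until the second gather has completed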


template <typename LocalIndexType>
dim<2> RowGatherer<LocalIndexType>::get_size() const
{
return size_;
}


template <typename LocalIndexType>
std::shared_ptr<const mpi::CollectiveCommunicator>
RowGatherer<LocalIndexType>::get_collective_communicator() const
{
return coll_comm_;
}


template <typename T>
T global_add(std::shared_ptr<const Executor> exec,
const mpi::communicator& comm, const T& value)
{
T result;
comm.all_reduce(std::move(exec), &value, &result, 1, MPI_SUM);
return result;
}


template <typename LocalIndexType>
template <typename GlobalIndexType>
RowGatherer<LocalIndexType>::RowGatherer(
std::shared_ptr<const Executor> exec,
std::shared_ptr<const mpi::CollectiveCommunicator> coll_comm,
const index_map<LocalIndexType, GlobalIndexType>& imap)
: EnablePolymorphicObject<RowGatherer>(exec),
DistributedBase(coll_comm->get_base_communicator()),
size_(dim<2>{global_add(exec, coll_comm->get_base_communicator(),
imap.get_non_local_size()),
imap.get_global_size()}),
coll_comm_(std::move(coll_comm)),
send_idxs_(exec),
send_workspace_(exec),
req_listener_(MPI_REQUEST_NULL)
{
// check that the coll_comm_ and imap have the same recv size
// the same check for the send size is not possible, since the
// imap doesn't store send indices
GKO_THROW_IF_INVALID(
coll_comm_->get_recv_size() == imap.get_non_local_size(),
"The collective communicator doesn't match the index map.");

auto comm = coll_comm_->get_base_communicator();
auto inverse_comm = coll_comm_->create_inverse();

auto mpi_exec =
mpi::requires_host_buffer(exec, coll_comm_->get_base_communicator())
? exec->get_master()
: exec;
auto temp_remote_local_idxs =
make_temporary_clone(mpi_exec, &imap.get_remote_local_idxs());

send_idxs_.set_executor(mpi_exec);
send_idxs_.resize_and_reset(coll_comm_->get_send_size());
inverse_comm
->i_all_to_all_v(exec, temp_remote_local_idxs->get_const_flat_data(),
send_idxs_.get_data())
.wait();
send_idxs_.set_executor(exec);
}


template <typename LocalIndexType>
const LocalIndexType* RowGatherer<LocalIndexType>::get_const_send_idxs() const
{
return send_idxs_.get_const_data();
}


template <typename LocalIndexType>
size_type RowGatherer<LocalIndexType>::get_num_send_idxs() const
{
return send_idxs_.get_size();
}


template <typename LocalIndexType>
std::unique_ptr<RowGatherer<LocalIndexType>>
RowGatherer<LocalIndexType>::create(std::shared_ptr<const Executor> exec,
mpi::communicator comm)
{
return std::unique_ptr<RowGatherer>(new RowGatherer(std::move(exec), comm));
}


template <typename LocalIndexType>
RowGatherer<LocalIndexType>::RowGatherer(std::shared_ptr<const Executor> exec,
mpi::communicator comm)
: EnablePolymorphicObject<RowGatherer>(exec),
DistributedBase(comm),
coll_comm_(std::make_shared<DefaultCollComm>(comm)),
send_idxs_(exec),
send_workspace_(exec),
req_listener_(MPI_REQUEST_NULL)
{}


template <typename LocalIndexType>
RowGatherer<LocalIndexType>::RowGatherer(RowGatherer&& o) noexcept
: EnablePolymorphicObject<RowGatherer>(o.get_executor()),
DistributedBase(o.get_communicator()),
send_idxs_(o.get_executor()),
send_workspace_(o.get_executor()),
req_listener_(MPI_REQUEST_NULL)
{
*this = std::move(o);
}


template <typename LocalIndexType>
RowGatherer<LocalIndexType>& RowGatherer<LocalIndexType>::operator=(
const RowGatherer& o)
{
if (this != &o) {
size_ = o.get_size();
coll_comm_ = o.coll_comm_;
send_idxs_ = o.send_idxs_;
}
return *this;
}


template <typename LocalIndexType>
RowGatherer<LocalIndexType>& RowGatherer<LocalIndexType>::operator=(
RowGatherer&& o)
{
if (this != &o) {
size_ = std::exchange(o.size_, dim<2>{});
coll_comm_ = std::exchange(
o.coll_comm_,
std::make_shared<DefaultCollComm>(o.get_communicator()));
send_idxs_ = std::move(o.send_idxs_);
send_workspace_ = std::move(o.send_workspace_);
req_listener_ = std::exchange(o.req_listener_, MPI_REQUEST_NULL);
}
return *this;
}


template <typename LocalIndexType>
RowGatherer<LocalIndexType>::RowGatherer(const RowGatherer& o)
: EnablePolymorphicObject<RowGatherer>(o.get_executor()),
DistributedBase(o.get_communicator()),
send_idxs_(o.get_executor())
{
*this = o;
}


#define GKO_DECLARE_ROW_GATHERER(_itype) class RowGatherer<_itype>

GKO_INSTANTIATE_FOR_EACH_INDEX_TYPE(GKO_DECLARE_ROW_GATHERER);

#undef GKO_DECLARE_ROW_GATHERER


#define GKO_DECLARE_ROW_GATHERER_CONSTRUCTOR(_ltype, _gtype) \
RowGatherer<_ltype>::RowGatherer( \
std::shared_ptr<const Executor> exec, \
std::shared_ptr<const mpi::CollectiveCommunicator> coll_comm, \
const index_map<_ltype, _gtype>& imap)

GKO_INSTANTIATE_FOR_EACH_LOCAL_GLOBAL_INDEX_TYPE(
GKO_DECLARE_ROW_GATHERER_CONSTRUCTOR);

#undef GKO_DECLARE_ROW_GATHERER_CONSTRUCTOR
} // namespace distributed
} // namespace experimental
} // namespace gko
2 changes: 1 addition & 1 deletion core/matrix/diagonal.cpp
@@ -376,7 +376,7 @@ Diagonal<ValueType>::Diagonal(std::shared_ptr<const Executor> exec,
     : EnableLinOp<Diagonal>(exec, dim<2>(size)),
       values_{exec, std::move(values)}
 {
-    GKO_ENSURE_IN_BOUNDS(size - 1, values_.get_size());
+    GKO_ENSURE_COMPATIBLE_BOUNDS(size, values_.get_size());
 }
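
Per the commit "[core] allow empty diagonal mtx created from array", the relaxed check also accepts size == 0: with unsigned size, the old GKO_ENSURE_IN_BOUNDS(size - 1, ...) underflowed for an empty matrix. A minimal sketch of what this permits, assuming the usual create factory forwards to this constructor (hypothetical usage, not part of the diff):

auto exec = gko::ReferenceExecutor::create();
// An empty Diagonal built from an empty value array no longer trips the
// bounds check.
auto empty_diag = gko::matrix::Diagonal<double>::create(
    exec, 0, gko::array<double>(exec, 0));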

