diff --git a/benchmark/test/reference/distributed_solver.profile.stderr b/benchmark/test/reference/distributed_solver.profile.stderr index dca62ddff33..64782f7fd05 100644 --- a/benchmark/test/reference/distributed_solver.profile.stderr +++ b/benchmark/test/reference/distributed_solver.profile.stderr @@ -100,8 +100,6 @@ DEBUG: begin components::convert_idxs_to_ptrs DEBUG: end components::convert_idxs_to_ptrs DEBUG: begin components::convert_idxs_to_ptrs DEBUG: end components::convert_idxs_to_ptrs -DEBUG: begin copy -DEBUG: end copy DEBUG: begin components::aos_to_soa DEBUG: end components::aos_to_soa DEBUG: begin dense::fill diff --git a/benchmark/test/reference/spmv_distributed.profile.stderr b/benchmark/test/reference/spmv_distributed.profile.stderr index 1931cd2030e..09492d6d9c5 100644 --- a/benchmark/test/reference/spmv_distributed.profile.stderr +++ b/benchmark/test/reference/spmv_distributed.profile.stderr @@ -116,8 +116,6 @@ DEBUG: begin components::convert_idxs_to_ptrs DEBUG: end components::convert_idxs_to_ptrs DEBUG: begin components::convert_idxs_to_ptrs DEBUG: end components::convert_idxs_to_ptrs -DEBUG: begin copy -DEBUG: end copy DEBUG: begin copy() DEBUG: begin dense::copy DEBUG: end dense::copy diff --git a/core/CMakeLists.txt b/core/CMakeLists.txt index 8c915c1f7ff..b7342bb03d8 100644 --- a/core/CMakeLists.txt +++ b/core/CMakeLists.txt @@ -154,6 +154,7 @@ if(GINKGO_BUILD_MPI) distributed/matrix.cpp distributed/neighborhood_communicator.cpp distributed/partition_helpers.cpp + distributed/row_gatherer.cpp distributed/vector.cpp distributed/preconditioner/schwarz.cpp ) diff --git a/core/distributed/dense_communicator.cpp b/core/distributed/dense_communicator.cpp index 2957bed0d46..5424a1ac9c7 100644 --- a/core/distributed/dense_communicator.cpp +++ b/core/distributed/dense_communicator.cpp @@ -111,12 +111,12 @@ request DenseCommunicator::i_all_to_all_v_impl( std::unique_ptr -DenseCommunicator::create_with_same_type( - communicator base, const distributed::index_map_variant& imap) const +DenseCommunicator::create_with_same_type(communicator base, + index_map_ptr imap) const { return std::visit( - [base](const auto& imap) { - return std::make_unique(base, imap); + [base](const auto* imap) { + return std::make_unique(base, *imap); }, imap); } diff --git a/core/distributed/matrix.cpp b/core/distributed/matrix.cpp index 6e264c1b765..84079ec6c2e 100644 --- a/core/distributed/matrix.cpp +++ b/core/distributed/matrix.cpp @@ -7,6 +7,7 @@ #include #include #include +#include #include #include #include @@ -31,62 +32,6 @@ GKO_REGISTER_OPERATION(separate_local_nonlocal, } // namespace matrix -template -void initialize_communication_pattern( - std::shared_ptr exec, mpi::communicator comm, - const index_map& imap, - std::vector& recv_sizes, - std::vector& recv_offsets, - std::vector& send_sizes, - std::vector& send_offsets, - array& gather_idxs) -{ - // exchange step 1: determine recv_sizes, send_sizes, send_offsets - auto host_recv_targets = - make_temporary_clone(exec->get_master(), &imap.get_remote_target_ids()); - auto host_offsets = make_temporary_clone( - exec->get_master(), &imap.get_remote_global_idxs().get_offsets()); - auto compute_recv_sizes = [](const auto* recv_targets, size_type size, - const auto* offsets, auto& recv_sizes) { - for (size_type i = 0; i < size; ++i) { - recv_sizes[recv_targets[i]] = offsets[i + 1] - offsets[i]; - } - }; - std::fill(recv_sizes.begin(), recv_sizes.end(), 0); - compute_recv_sizes(host_recv_targets->get_const_data(), - host_recv_targets->get_size(), 
- host_offsets->get_const_data(), recv_sizes); - std::partial_sum(recv_sizes.begin(), recv_sizes.end(), - recv_offsets.begin() + 1); - comm.all_to_all(exec, recv_sizes.data(), 1, send_sizes.data(), 1); - std::partial_sum(send_sizes.begin(), send_sizes.end(), - send_offsets.begin() + 1); - send_offsets[0] = 0; - recv_offsets[0] = 0; - - // exchange step 2: exchange gather_idxs from receivers to senders - auto recv_gather_idxs = - make_const_array_view( - imap.get_executor(), imap.get_non_local_size(), - imap.get_remote_local_idxs().get_const_flat_data()) - .copy_to_array(); - auto use_host_buffer = mpi::requires_host_buffer(exec, comm); - if (use_host_buffer) { - recv_gather_idxs.set_executor(exec->get_master()); - gather_idxs.clear(); - gather_idxs.set_executor(exec->get_master()); - } - gather_idxs.resize_and_reset(send_offsets.back()); - comm.all_to_all_v(use_host_buffer ? exec->get_master() : exec, - recv_gather_idxs.get_const_data(), recv_sizes.data(), - recv_offsets.data(), gather_idxs.get_data(), - send_sizes.data(), send_offsets.data()); - if (use_host_buffer) { - gather_idxs.set_executor(exec); - } -} - - template Matrix::Matrix( std::shared_ptr exec, mpi::communicator comm) @@ -103,12 +48,8 @@ Matrix::Matrix( ptr_param non_local_matrix_template) : EnableLinOp{exec}, DistributedBase{comm}, - imap_(exec), - send_offsets_(comm.size() + 1), - send_sizes_(comm.size()), - recv_offsets_(comm.size() + 1), - recv_sizes_(comm.size()), - gather_idxs_{exec}, + row_gatherer_{RowGatherer::create(exec, comm)}, + imap_{exec}, one_scalar_{}, local_mtx_{local_matrix_template->clone(exec)}, non_local_mtx_{non_local_matrix_template->clone(exec)} @@ -129,12 +70,8 @@ Matrix::Matrix( std::shared_ptr local_linop) : EnableLinOp{exec}, DistributedBase{comm}, - imap_(exec), - send_offsets_(comm.size() + 1), - send_sizes_(comm.size()), - recv_offsets_(comm.size() + 1), - recv_sizes_(comm.size()), - gather_idxs_{exec}, + row_gatherer_{RowGatherer::create(exec, comm)}, + imap_{exec}, one_scalar_{}, non_local_mtx_(::gko::matrix::Coo::create( exec, dim<2>{local_linop->get_size()[0], 0})) @@ -152,12 +89,8 @@ Matrix::Matrix( std::shared_ptr local_linop, std::shared_ptr non_local_linop) : EnableLinOp{exec}, DistributedBase{comm}, + row_gatherer_(RowGatherer::create(exec, comm)), imap_(std::move(imap)), - send_offsets_(comm.size() + 1), - send_sizes_(comm.size()), - recv_offsets_(comm.size() + 1), - recv_sizes_(comm.size()), - gather_idxs_{exec}, one_scalar_{} { this->set_size({imap_.get_global_size(), imap_.get_global_size()}); @@ -166,9 +99,11 @@ Matrix::Matrix( one_scalar_.init(exec, dim<2>{1, 1}); one_scalar_->fill(one()); - initialize_communication_pattern( - this->get_executor(), this->get_communicator(), imap_, recv_sizes_, - recv_offsets_, send_sizes_, send_offsets_, gather_idxs_); + row_gatherer_ = RowGatherer::create( + row_gatherer_->get_executor(), + row_gatherer_->get_collective_communicator()->create_with_same_type( + comm, &imap_), + imap_); } @@ -274,12 +209,8 @@ void Matrix::convert_to( result->get_communicator().size()); result->local_mtx_->copy_from(this->local_mtx_); result->non_local_mtx_->copy_from(this->non_local_mtx_); + result->row_gatherer_->copy_from(this->row_gatherer_); result->imap_ = this->imap_; - result->gather_idxs_ = this->gather_idxs_; - result->send_offsets_ = this->send_offsets_; - result->recv_offsets_ = this->recv_offsets_; - result->recv_sizes_ = this->recv_sizes_; - result->send_sizes_ = this->send_sizes_; result->set_size(this->get_size()); } @@ -293,12 +224,8 @@ void 
Matrix::move_to( result->get_communicator().size()); result->local_mtx_->move_from(this->local_mtx_); result->non_local_mtx_->move_from(this->non_local_mtx_); + result->row_gatherer_->move_from(this->row_gatherer_); result->imap_ = std::move(this->imap_); - result->gather_idxs_ = std::move(this->gather_idxs_); - result->send_offsets_ = std::move(this->send_offsets_); - result->recv_offsets_ = std::move(this->recv_offsets_); - result->recv_sizes_ = std::move(this->recv_sizes_); - result->send_sizes_ = std::move(this->send_sizes_); result->set_size(this->get_size()); this->set_size({}); } @@ -314,11 +241,7 @@ void Matrix::convert_to( result->get_communicator().size()); result->local_mtx_->copy_from(this->local_mtx_.get()); result->non_local_mtx_->copy_from(this->non_local_mtx_.get()); - result->gather_idxs_ = this->gather_idxs_; - result->send_offsets_ = this->send_offsets_; - result->recv_offsets_ = this->recv_offsets_; - result->recv_sizes_ = this->recv_sizes_; - result->send_sizes_ = this->send_sizes_; + result->row_gatherer_->copy_from(this->row_gatherer_); result->imap_ = this->imap_; result->set_size(this->get_size()); } @@ -333,11 +256,7 @@ void Matrix::move_to( result->get_communicator().size()); result->local_mtx_->move_from(this->local_mtx_.get()); result->non_local_mtx_->move_from(this->non_local_mtx_.get()); - result->gather_idxs_ = std::move(this->gather_idxs_); - result->send_offsets_ = std::move(this->send_offsets_); - result->recv_offsets_ = std::move(this->recv_offsets_); - result->recv_sizes_ = std::move(this->recv_sizes_); - result->send_sizes_ = std::move(this->send_sizes_); + result->row_gatherer_->move_from(this->row_gatherer_); result->imap_ = std::move(this->imap_); result->set_size(this->get_size()); this->set_size({}); @@ -355,11 +274,7 @@ void Matrix::convert_to( result->get_communicator().size()); result->local_mtx_->copy_from(this->local_mtx_.get()); result->non_local_mtx_->copy_from(this->non_local_mtx_.get()); - result->gather_idxs_ = this->gather_idxs_; - result->send_offsets_ = this->send_offsets_; - result->recv_offsets_ = this->recv_offsets_; - result->recv_sizes_ = this->recv_sizes_; - result->send_sizes_ = this->send_sizes_; + result->row_gatherer_->copy_from(this->row_gatherer_); result->imap_ = this->imap_; result->set_size(this->get_size()); } @@ -374,11 +289,7 @@ void Matrix::move_to( result->get_communicator().size()); result->local_mtx_->move_from(this->local_mtx_.get()); result->non_local_mtx_->move_from(this->non_local_mtx_.get()); - result->gather_idxs_ = std::move(this->gather_idxs_); - result->send_offsets_ = std::move(this->send_offsets_); - result->recv_offsets_ = std::move(this->recv_offsets_); - result->recv_sizes_ = std::move(this->recv_sizes_); - result->send_sizes_ = std::move(this->send_sizes_); + result->row_gatherer_->move_from(this->row_gatherer_); result->imap_ = std::move(this->imap_); result->set_size(this->get_size()); this->set_size({}); @@ -462,9 +373,11 @@ void Matrix::read_distributed( as>(this->non_local_mtx_) ->read(std::move(non_local_data)); - initialize_communication_pattern(exec, comm, imap_, recv_sizes_, - recv_offsets_, send_sizes_, send_offsets_, - gather_idxs_); + row_gatherer_ = RowGatherer::create( + row_gatherer_->get_executor(), + row_gatherer_->get_collective_communicator()->create_with_same_type( + comm, &imap_), + imap_); } @@ -509,52 +422,23 @@ void Matrix::read_distributed( } -template -mpi::request Matrix::communicate( - const local_vector_type* local_b) const +template +void 
init_recv_buffers(std::shared_ptr exec, + const RowGatherer* row_gatherer, + size_type num_cols, + const detail::VectorCache& buffer, + const detail::VectorCache& host_buffer) { - // This function can never return early! - // Even if the non-local part is empty, i.e. this process doesn't need - // any data from other processes, the used MPI calls are collective - // operations. They need to be called on all processes, even if a process - // might not communicate any data. - auto exec = this->get_executor(); - const auto comm = this->get_communicator(); - auto num_cols = local_b->get_size()[1]; - auto send_size = send_offsets_.back(); - auto recv_size = recv_offsets_.back(); - auto send_dim = dim<2>{static_cast(send_size), num_cols}; - auto recv_dim = dim<2>{static_cast(recv_size), num_cols}; - recv_buffer_.init(exec, recv_dim); - send_buffer_.init(exec, send_dim); - - local_b->row_gather(&gather_idxs_, send_buffer_.get()); - - auto use_host_buffer = mpi::requires_host_buffer(exec, comm); - if (use_host_buffer) { - host_recv_buffer_.init(exec->get_master(), recv_dim); - host_send_buffer_.init(exec->get_master(), send_dim); - host_send_buffer_->copy_from(send_buffer_.get()); - } - - mpi::contiguous_type type(num_cols, mpi::type_impl::get_type()); - auto send_ptr = use_host_buffer ? host_send_buffer_->get_const_values() - : send_buffer_->get_const_values(); - auto recv_ptr = use_host_buffer ? host_recv_buffer_->get_values() - : recv_buffer_->get_values(); - exec->synchronize(); -#ifdef GINKGO_HAVE_OPENMPI_PRE_4_1_X - comm.all_to_all_v(use_host_buffer ? exec->get_master() : exec, send_ptr, - send_sizes_.data(), send_offsets_.data(), type.get(), - recv_ptr, recv_sizes_.data(), recv_offsets_.data(), - type.get()); - return {}; -#else - return comm.i_all_to_all_v( - use_host_buffer ? exec->get_master() : exec, send_ptr, - send_sizes_.data(), send_offsets_.data(), type.get(), recv_ptr, - recv_sizes_.data(), recv_offsets_.data(), type.get()); -#endif + auto comm = + row_gatherer->get_collective_communicator()->get_base_communicator(); + auto global_recv_dim = + dim<2>{static_cast(row_gatherer->get_size()[0]), num_cols}; + auto local_recv_dim = dim<2>{ + static_cast( + row_gatherer->get_collective_communicator()->get_recv_size()), + num_cols}; + buffer.init(exec, comm, global_recv_dim, local_recv_dim); + host_buffer.init(exec->get_master(), comm, global_recv_dim, local_recv_dim); } @@ -573,17 +457,22 @@ void Matrix::apply_impl( dense_x->get_local_values()), dense_x->get_local_vector()->get_stride()); + auto exec = this->get_executor(); auto comm = this->get_communicator(); - auto req = this->communicate(dense_b->get_local_vector()); + init_recv_buffers(exec, row_gatherer_.get(), dense_b->get_size()[1], + recv_buffer_, host_recv_buffer_); + auto recv_ptr = mpi::requires_host_buffer(exec, comm) + ? 
host_recv_buffer_.get() + : recv_buffer_.get(); + auto req = this->row_gatherer_->apply_async(dense_b, recv_ptr); local_mtx_->apply(dense_b->get_local_vector(), local_x); req.wait(); - auto exec = this->get_executor(); - auto use_host_buffer = mpi::requires_host_buffer(exec, comm); - if (use_host_buffer) { + if (recv_ptr != recv_buffer_.get()) { recv_buffer_->copy_from(host_recv_buffer_.get()); } - non_local_mtx_->apply(one_scalar_.get(), recv_buffer_.get(), + non_local_mtx_->apply(one_scalar_.get(), + recv_buffer_->get_local_vector(), one_scalar_.get(), local_x); }, b, x); @@ -606,18 +495,22 @@ void Matrix::apply_impl( dense_x->get_local_values()), dense_x->get_local_vector()->get_stride()); + auto exec = this->get_executor(); auto comm = this->get_communicator(); - auto req = this->communicate(dense_b->get_local_vector()); + init_recv_buffers(exec, row_gatherer_.get(), dense_b->get_size()[1], + recv_buffer_, host_recv_buffer_); + auto recv_ptr = mpi::requires_host_buffer(exec, comm) + ? host_recv_buffer_.get() + : recv_buffer_.get(); + auto req = this->row_gatherer_->apply_async(dense_b, recv_ptr); local_mtx_->apply(local_alpha, dense_b->get_local_vector(), local_beta, local_x); req.wait(); - auto exec = this->get_executor(); - auto use_host_buffer = mpi::requires_host_buffer(exec, comm); - if (use_host_buffer) { + if (recv_ptr != recv_buffer_.get()) { recv_buffer_->copy_from(host_recv_buffer_.get()); } - non_local_mtx_->apply(local_alpha, recv_buffer_.get(), + non_local_mtx_->apply(local_alpha, recv_buffer_->get_local_vector(), one_scalar_.get(), local_x); }, alpha, b, beta, x); @@ -634,34 +527,39 @@ void Matrix::col_scale( auto comm = this->get_communicator(); size_type n_local_cols = local_mtx_->get_size()[1]; size_type n_non_local_cols = non_local_mtx_->get_size()[1]; + std::unique_ptr scaling_factors_single_stride; - auto stride = scaling_factors->get_stride(); - if (stride != 1) { + auto scaling_stride = scaling_factors->get_stride(); + if (scaling_stride != 1) { scaling_factors_single_stride = global_vector_type::create(exec, comm); scaling_factors_single_stride->copy_from(scaling_factors.get()); } - const auto scale_values = - stride == 1 ? scaling_factors->get_const_local_values() - : scaling_factors_single_stride->get_const_local_values(); + const global_vector_type* scaling_factors_ptr = + scaling_stride == 1 ? scaling_factors.get() + : scaling_factors_single_stride.get(); const auto scale_diag = gko::matrix::Diagonal::create_const( exec, n_local_cols, - make_const_array_view(exec, n_local_cols, scale_values)); + make_const_array_view(exec, n_local_cols, + scaling_factors_ptr->get_const_local_values())); + + init_recv_buffers(exec, row_gatherer_.get(), scaling_factors->get_size()[1], + recv_buffer_, host_recv_buffer_); + auto recv_ptr = mpi::requires_host_buffer(exec, comm) + ? host_recv_buffer_.get() + : recv_buffer_.get(); - auto req = this->communicate( - stride == 1 ? 
scaling_factors->get_local_vector() - : scaling_factors_single_stride->get_local_vector()); + auto req = row_gatherer_->apply_async(scaling_factors_ptr, recv_ptr); scale_diag->rapply(local_mtx_, local_mtx_); req.wait(); if (n_non_local_cols > 0) { - auto use_host_buffer = mpi::requires_host_buffer(exec, comm); - if (use_host_buffer) { + if (recv_ptr != recv_buffer_.get()) { recv_buffer_->copy_from(host_recv_buffer_.get()); } const auto non_local_scale_diag = gko::matrix::Diagonal::create_const( exec, n_non_local_cols, make_const_array_view(exec, n_non_local_cols, - recv_buffer_->get_const_values())); + recv_buffer_->get_const_local_values())); non_local_scale_diag->rapply(non_local_mtx_, non_local_mtx_); } } @@ -699,6 +597,8 @@ Matrix::Matrix(const Matrix& other) : EnableLinOp>{other.get_executor()}, DistributedBase{other.get_communicator()}, + row_gatherer_{RowGatherer::create( + other.get_executor(), other.get_communicator())}, imap_(other.get_executor()) { *this = other; @@ -711,6 +611,8 @@ Matrix::Matrix( : EnableLinOp>{other.get_executor()}, DistributedBase{other.get_communicator()}, + row_gatherer_{RowGatherer::create( + other.get_executor(), other.get_communicator())}, imap_(other.get_executor()) { *this = std::move(other); @@ -728,12 +630,8 @@ Matrix::operator=( this->set_size(other.get_size()); local_mtx_->copy_from(other.local_mtx_); non_local_mtx_->copy_from(other.non_local_mtx_); + row_gatherer_->copy_from(other.row_gatherer_); imap_ = other.imap_; - gather_idxs_ = other.gather_idxs_; - send_offsets_ = other.send_offsets_; - recv_offsets_ = other.recv_offsets_; - send_sizes_ = other.send_sizes_; - recv_sizes_ = other.recv_sizes_; one_scalar_.init(this->get_executor(), dim<2>{1, 1}); one_scalar_->fill(one()); } @@ -752,12 +650,8 @@ Matrix::operator=(Matrix&& other) other.set_size({}); local_mtx_->move_from(other.local_mtx_); non_local_mtx_->move_from(other.non_local_mtx_); + row_gatherer_->move_from(other.row_gatherer_); imap_ = std::move(other.imap_); - gather_idxs_ = std::move(other.gather_idxs_); - send_offsets_ = std::move(other.send_offsets_); - recv_offsets_ = std::move(other.recv_offsets_); - send_sizes_ = std::move(other.send_sizes_); - recv_sizes_ = std::move(other.recv_sizes_); one_scalar_.init(this->get_executor(), dim<2>{1, 1}); one_scalar_->fill(one()); } diff --git a/core/distributed/neighborhood_communicator.cpp b/core/distributed/neighborhood_communicator.cpp index c3ba9155e8b..d5afb7217e5 100644 --- a/core/distributed/neighborhood_communicator.cpp +++ b/core/distributed/neighborhood_communicator.cpp @@ -157,13 +157,13 @@ request NeighborhoodCommunicator::i_all_to_all_v_impl( std::unique_ptr -NeighborhoodCommunicator::create_with_same_type( - communicator base, const distributed::index_map_variant& imap) const +NeighborhoodCommunicator::create_with_same_type(communicator base, + index_map_ptr imap) const { return std::visit( - [base](const auto& imap) { + [base](const auto* imap) { return std::unique_ptr( - new NeighborhoodCommunicator(base, imap)); + new NeighborhoodCommunicator(base, *imap)); }, imap); } diff --git a/core/distributed/row_gatherer.cpp b/core/distributed/row_gatherer.cpp new file mode 100644 index 00000000000..6a3f873e2c1 --- /dev/null +++ b/core/distributed/row_gatherer.cpp @@ -0,0 +1,296 @@ +// SPDX-FileCopyrightText: 2017 - 2025 The Ginkgo authors +// +// SPDX-License-Identifier: BSD-3-Clause + +#include "ginkgo/core/distributed/row_gatherer.hpp" + +#include +#include +#include +#include +#include + +#include "core/base/dispatch_helper.hpp" + 
+namespace gko { +namespace experimental { +namespace distributed { + + +#if GINKGO_HAVE_OPENMPI_PRE_4_1_X +using DefaultCollComm = mpi::DenseCommunicator; +#else +using DefaultCollComm = mpi::NeighborhoodCommunicator; +#endif + + +template +mpi::request RowGatherer::apply_async(ptr_param b, + ptr_param x) const +{ + int is_inactive; + MPI_Status status; + GKO_ASSERT_NO_MPI_ERRORS( + MPI_Request_get_status(req_listener_, &is_inactive, &status)); + // This is untestable. Some processes might complete the previous request + // while others don't, so it's impossible to create a predictable behavior + // for a test. + GKO_THROW_IF_INVALID(is_inactive, + "Tried to call RowGatherer::apply_async while there " + "is already an active communication. Please use the " + "overload with a workspace to handle multiple " + "connections."); + + auto req = apply_async(b, x, send_workspace_); + req_listener_ = *req.get(); + return req; +} + + +template +mpi::request RowGatherer::apply_async( + ptr_param b, ptr_param x, array& workspace) const +{ + mpi::request req; + + auto exec = this->get_executor(); + auto use_host_buffer = + mpi::requires_host_buffer(exec, coll_comm_->get_base_communicator()); + auto mpi_exec = use_host_buffer ? exec->get_master() : exec; + + GKO_THROW_IF_INVALID( + !use_host_buffer || mpi_exec->memory_accessible(x->get_executor()), + "The receive buffer uses device memory, but MPI support of device " + "memory is not available or host buffer were explicitly requested. " + "Please provide a host buffer or enable MPI support for device " + "memory."); + + // dispatch global vector + run, +#endif +#if GINKGO_ENABLE_BFLOAT16 + bfloat16, std::complex, +#endif + double, float, std::complex, std::complex>( + make_temporary_clone(exec, b).get(), [&](const auto* b_global) { + using ValueType = + typename std::decay_t::value_type; + // dispatch local vector with the same precision as the global + // vector + distributed::precision_dispatch( + [&](auto* x_global) { + auto b_local = b_global->get_local_vector(); + + dim<2> send_size(coll_comm_->get_send_size(), + b_local->get_size()[1]); + auto send_size_in_bytes = + sizeof(ValueType) * send_size[0] * send_size[1]; + if (!workspace.get_executor() || + !mpi_exec->memory_accessible( + workspace.get_executor()) || + send_size_in_bytes > workspace.get_size()) { + workspace = array( + mpi_exec, + sizeof(ValueType) * send_size[0] * send_size[1]); + } + auto send_buffer = matrix::Dense::create( + mpi_exec, send_size, + make_array_view( + mpi_exec, send_size[0] * send_size[1], + reinterpret_cast(workspace.get_data())), + send_size[1]); + b_local->row_gather(&send_idxs_, send_buffer); + + auto recv_ptr = x_global->get_local_values(); + auto send_ptr = send_buffer->get_values(); + + b_local->get_executor()->synchronize(); + mpi::contiguous_type type( + b_local->get_size()[1], + mpi::type_impl::get_type()); + req = coll_comm_->i_all_to_all_v( + mpi_exec, send_ptr, type.get(), recv_ptr, type.get()); + }, + x.get()); + }); + return req; +} + + +template +dim<2> RowGatherer::get_size() const +{ + return size_; +} + + +template +std::shared_ptr +RowGatherer::get_collective_communicator() const +{ + return coll_comm_; +} + + +template +T global_add(std::shared_ptr exec, + const mpi::communicator& comm, const T& value) +{ + T result; + comm.all_reduce(std::move(exec), &value, &result, 1, MPI_SUM); + return result; +} + + +template +template +RowGatherer::RowGatherer( + std::shared_ptr exec, + std::shared_ptr coll_comm, + const index_map& imap) + : 
EnablePolymorphicObject(exec), + DistributedBase(coll_comm->get_base_communicator()), + size_(dim<2>{global_add(exec, coll_comm->get_base_communicator(), + imap.get_non_local_size()), + imap.get_global_size()}), + coll_comm_(std::move(coll_comm)), + send_idxs_(exec), + send_workspace_(exec), + req_listener_(MPI_REQUEST_NULL) +{ + // check that the coll_comm_ and imap have the same recv size + // the same check for the send size is not possible, since the + // imap doesn't store send indices + GKO_THROW_IF_INVALID( + coll_comm_->get_recv_size() == imap.get_non_local_size(), + "The collective communicator doesn't match the index map."); + + auto comm = coll_comm_->get_base_communicator(); + auto inverse_comm = coll_comm_->create_inverse(); + + auto mpi_exec = + mpi::requires_host_buffer(exec, coll_comm_->get_base_communicator()) + ? exec->get_master() + : exec; + auto temp_remote_local_idxs = + make_temporary_clone(mpi_exec, &imap.get_remote_local_idxs()); + + send_idxs_.set_executor(mpi_exec); + send_idxs_.resize_and_reset(coll_comm_->get_send_size()); + inverse_comm + ->i_all_to_all_v(exec, temp_remote_local_idxs->get_const_flat_data(), + send_idxs_.get_data()) + .wait(); + send_idxs_.set_executor(exec); +} + + +template +const LocalIndexType* RowGatherer::get_const_send_idxs() const +{ + return send_idxs_.get_const_data(); +} + + +template +size_type RowGatherer::get_num_send_idxs() const +{ + return send_idxs_.get_size(); +} + + +template +std::unique_ptr> +RowGatherer::create(std::shared_ptr exec, + mpi::communicator comm) +{ + return std::unique_ptr(new RowGatherer(std::move(exec), comm)); +} + + +template +RowGatherer::RowGatherer(std::shared_ptr exec, + mpi::communicator comm) + : EnablePolymorphicObject(exec), + DistributedBase(comm), + coll_comm_(std::make_shared(comm)), + send_idxs_(exec), + send_workspace_(exec), + req_listener_(MPI_REQUEST_NULL) +{} + + +template +RowGatherer::RowGatherer(RowGatherer&& o) noexcept + : EnablePolymorphicObject(o.get_executor()), + DistributedBase(o.get_communicator()), + send_idxs_(o.get_executor()), + send_workspace_(o.get_executor()), + req_listener_(MPI_REQUEST_NULL) +{ + *this = std::move(o); +} + + +template +RowGatherer& RowGatherer::operator=( + const RowGatherer& o) +{ + if (this != &o) { + size_ = o.get_size(); + coll_comm_ = o.coll_comm_; + send_idxs_ = o.send_idxs_; + } + return *this; +} + + +template +RowGatherer& RowGatherer::operator=( + RowGatherer&& o) +{ + if (this != &o) { + size_ = std::exchange(o.size_, dim<2>{}); + coll_comm_ = std::exchange( + o.coll_comm_, + std::make_shared(o.get_communicator())); + send_idxs_ = std::move(o.send_idxs_); + send_workspace_ = std::move(o.send_workspace_); + req_listener_ = std::exchange(o.req_listener_, MPI_REQUEST_NULL); + } + return *this; +} + + +template +RowGatherer::RowGatherer(const RowGatherer& o) + : EnablePolymorphicObject(o.get_executor()), + DistributedBase(o.get_communicator()), + send_idxs_(o.get_executor()) +{ + *this = o; +} + + +#define GKO_DECLARE_ROW_GATHERER(_itype) class RowGatherer<_itype> + +GKO_INSTANTIATE_FOR_EACH_INDEX_TYPE(GKO_DECLARE_ROW_GATHERER); + +#undef GKO_DECLARE_ROW_GATHERER + + +#define GKO_DECLARE_ROW_GATHERER_CONSTRUCTOR(_ltype, _gtype) \ + RowGatherer<_ltype>::RowGatherer( \ + std::shared_ptr exec, \ + std::shared_ptr coll_comm, \ + const index_map<_ltype, _gtype>& imap) + +GKO_INSTANTIATE_FOR_EACH_LOCAL_GLOBAL_INDEX_TYPE( + GKO_DECLARE_ROW_GATHERER_CONSTRUCTOR); + +#undef GKO_DECLARE_ROW_GATHERER_CONSTRUCTOR +} // namespace distributed +} // namespace 
experimental +} // namespace gko diff --git a/core/matrix/diagonal.cpp b/core/matrix/diagonal.cpp index 2bf11f77128..cf798cae4c6 100644 --- a/core/matrix/diagonal.cpp +++ b/core/matrix/diagonal.cpp @@ -376,7 +376,7 @@ Diagonal::Diagonal(std::shared_ptr exec, : EnableLinOp(exec, dim<2>(size)), values_{exec, std::move(values)} { - GKO_ENSURE_IN_BOUNDS(size - 1, values_.get_size()); + GKO_ENSURE_COMPATIBLE_BOUNDS(size, values_.get_size()); } diff --git a/core/multigrid/pgm.cpp b/core/multigrid/pgm.cpp index fddefd3ddf0..bbff7f67a64 100644 --- a/core/multigrid/pgm.cpp +++ b/core/multigrid/pgm.cpp @@ -279,18 +279,15 @@ array Pgm::communicate_non_local_agg( { auto exec = matrix->get_executor(); const auto comm = matrix->get_communicator(); - auto send_sizes = matrix->send_sizes_; - auto recv_sizes = matrix->recv_sizes_; - auto send_offsets = matrix->send_offsets_; - auto recv_offsets = matrix->recv_offsets_; - auto gather_idxs = matrix->gather_idxs_; - auto total_send_size = send_offsets.back(); - auto total_recv_size = recv_offsets.back(); + auto coll_comm = matrix->row_gatherer_->get_collective_communicator(); + auto total_send_size = coll_comm->get_send_size(); + auto total_recv_size = coll_comm->get_recv_size(); + auto row_gatherer = matrix->row_gatherer_; array send_agg(exec, total_send_size); exec->run(pgm::make_gather_index( send_agg.get_size(), local_agg.get_const_data(), - gather_idxs.get_const_data(), send_agg.get_data())); + row_gatherer->get_const_send_idxs(), send_agg.get_data())); // There is no index map on the coarse level yet, so map the local indices // to global indices on the coarse level manually @@ -312,16 +309,16 @@ array Pgm::communicate_non_local_agg( send_global_agg.get_data(), host_send_buffer.get_data()); } - auto type = experimental::mpi::type_impl::get_type(); const auto send_ptr = use_host_buffer ? host_send_buffer.get_const_data() : send_global_agg.get_const_data(); auto recv_ptr = use_host_buffer ? host_recv_buffer.get_data() : non_local_agg.get_data(); exec->synchronize(); - comm.all_to_all_v(use_host_buffer ? exec->get_master() : exec, send_ptr, - send_sizes.data(), send_offsets.data(), type, recv_ptr, - recv_sizes.data(), recv_offsets.data(), type); + coll_comm + ->i_all_to_all_v(use_host_buffer ? 
exec->get_master() : exec, send_ptr, + recv_ptr) + .wait(); if (use_host_buffer) { exec->copy_from(exec->get_master(), total_recv_size, recv_ptr, non_local_agg.get_data()); diff --git a/core/test/base/segmented_array.cpp b/core/test/base/segmented_array.cpp index 2741990036f..e01d21fe73c 100644 --- a/core/test/base/segmented_array.cpp +++ b/core/test/base/segmented_array.cpp @@ -1,4 +1,4 @@ -// SPDX-FileCopyrightText: 2017 - 2024 The Ginkgo authors +// SPDX-FileCopyrightText: 2017 - 2025 The Ginkgo authors // // SPDX-License-Identifier: BSD-3-Clause @@ -11,6 +11,19 @@ #include "core/test/utils.hpp" +class DummyExecutor : public gko::ReferenceExecutor { +public: + DummyExecutor() : ReferenceExecutor(std::make_shared()) + {} + +protected: + bool verify_memory_to(const ReferenceExecutor* other) const override + { + return false; + } +}; + + template gko::array get_flat_array(gko::segmented_array& arr) { @@ -143,6 +156,40 @@ TYPED_TEST(SegmentedArray, CanBeMoved) } +TYPED_TEST(SegmentedArray, CanCreateTemporaryClone) +{ + using value_type = typename TestFixture::value_type; + auto other_exec = std::make_shared(); + auto buffer = gko::array(this->exec, {1, 2, 2, 3, 3, 3}); + auto offsets = gko::array(this->exec, {0, 1, 3, 6}); + auto arr = + gko::segmented_array::create_from_offsets(buffer, offsets); + + auto copy = gko::make_temporary_clone(other_exec, &arr); + + ASSERT_EQ(copy->get_executor(), other_exec); + ASSERT_NE(copy->get_executor(), arr.get_executor()); + GKO_ASSERT_SEGMENTED_ARRAY_EQ(*copy, arr); +} + + +TYPED_TEST(SegmentedArray, TemporaryCloneIsNoopForSameExec) +{ + using value_type = typename TestFixture::value_type; + auto buffer = gko::array(this->exec, {1, 2, 2, 3, 3, 3}); + auto offsets = gko::array(this->exec, {0, 1, 3, 6}); + auto arr = + gko::segmented_array::create_from_offsets(buffer, offsets); + + auto copy = gko::make_temporary_clone(this->exec, &arr); + + ASSERT_EQ(copy->get_executor(), arr.get_executor()); + ASSERT_EQ(copy->get_flat_data(), arr.get_flat_data()); + ASSERT_EQ(copy->get_offsets().get_const_data(), + arr.get_offsets().get_const_data()); +} + + TYPED_TEST(SegmentedArray, ThrowsIfBufferSizeDoesntMatchSizes) { using value_type = typename TestFixture::value_type; diff --git a/core/test/gtest/ginkgo_mpi_main.cpp b/core/test/gtest/ginkgo_mpi_main.cpp index 07a1c2c343d..83af86b681f 100644 --- a/core/test/gtest/ginkgo_mpi_main.cpp +++ b/core/test/gtest/ginkgo_mpi_main.cpp @@ -1,4 +1,4 @@ -// SPDX-FileCopyrightText: 2017 - 2024 The Ginkgo authors +// SPDX-FileCopyrightText: 2017 - 2025 The Ginkgo authors // // SPDX-License-Identifier: BSD-3-Clause diff --git a/core/test/matrix/diagonal.cpp b/core/test/matrix/diagonal.cpp index de03a9350bb..a7b26980584 100644 --- a/core/test/matrix/diagonal.cpp +++ b/core/test/matrix/diagonal.cpp @@ -1,4 +1,4 @@ -// SPDX-FileCopyrightText: 2017 - 2024 The Ginkgo authors +// SPDX-FileCopyrightText: 2017 - 2025 The Ginkgo authors // // SPDX-License-Identifier: BSD-3-Clause @@ -71,6 +71,16 @@ TYPED_TEST(Diagonal, CanBeEmpty) } +TYPED_TEST(Diagonal, CanBeEmptyFromArray) +{ + using Diag = typename TestFixture::Diag; + using value_type = typename Diag::value_type; + auto diag = Diag::create(this->exec, 0, gko::array{this->exec}); + + this->assert_empty(diag.get()); +} + + TYPED_TEST(Diagonal, CanBeCreatedFromExistingData) { using Diag = typename TestFixture::Diag; diff --git a/core/test/mpi/distributed/CMakeLists.txt b/core/test/mpi/distributed/CMakeLists.txt index 4186a6c5617..7d0cd5c748f 100644 --- 
a/core/test/mpi/distributed/CMakeLists.txt +++ b/core/test/mpi/distributed/CMakeLists.txt @@ -1,6 +1,7 @@ ginkgo_create_test(helpers MPI_SIZE 1 LABELS distributed) ginkgo_create_test(matrix MPI_SIZE 1 LABELS distributed) ginkgo_create_test(collective_communicator MPI_SIZE 6 LABELS distributed) +ginkgo_create_test(row_gatherer MPI_SIZE 6 LABELS distributed) ginkgo_create_test(vector_cache MPI_SIZE 3 LABELS distributed) add_subdirectory(preconditioner) diff --git a/core/test/mpi/distributed/row_gatherer.cpp b/core/test/mpi/distributed/row_gatherer.cpp new file mode 100644 index 00000000000..b0908006903 --- /dev/null +++ b/core/test/mpi/distributed/row_gatherer.cpp @@ -0,0 +1,142 @@ +// SPDX-FileCopyrightText: 2017 - 2025 The Ginkgo authors +// +// SPDX-License-Identifier: BSD-3-Clause + +#include + +#include +#include +#include +#include + +#include "core/test/utils.hpp" +#include "core/test/utils/assertions.hpp" + + +using CollCommType = +#if GINKGO_HAVE_OPENMPI_PRE_4_1_X + gko::experimental::mpi::DenseCommunicator; +#else + gko::experimental::mpi::NeighborhoodCommunicator; +#endif + + +template +class RowGatherer : public ::testing::Test { +protected: + using index_type = IndexType; + using part_type = + gko::experimental::distributed::Partition; + using map_type = + gko::experimental::distributed::index_map; + using row_gatherer_type = + gko::experimental::distributed::RowGatherer; + + void SetUp() override { ASSERT_EQ(comm.size(), 6); } + + std::array, 6> create_recv_connections() + { + return {gko::array{ref, {3, 5, 10, 11}}, + gko::array{ref, {0, 1, 7, 12, 13}}, + gko::array{ref, {3, 4, 17}}, + gko::array{ref, {1, 2, 12, 14}}, + gko::array{ref, {4, 5, 9, 10, 15, 16}}, + gko::array{ref, {8, 12, 13, 14}}}; + } + + gko::size_type recv_connections_size() + { + gko::size_type size = 0; + for (auto& recv_connections : create_recv_connections()) { + size += recv_connections.get_size(); + } + return size; + } + + std::shared_ptr ref = gko::ReferenceExecutor::create(); + gko::experimental::mpi::communicator comm = MPI_COMM_WORLD; + std::shared_ptr part = part_type::build_from_global_size_uniform( + this->ref, this->comm.size(), this->comm.size() * 3); + map_type imap = map_type{ref, part, comm.rank(), + create_recv_connections()[comm.rank()]}; + std::shared_ptr coll_comm = + std::make_shared(this->comm, imap); +}; + +TYPED_TEST_SUITE(RowGatherer, gko::test::IndexTypes, TypenameNameGenerator); + + +TYPED_TEST(RowGatherer, CanDefaultConstruct) +{ + using RowGatherer = typename TestFixture::row_gatherer_type; + + auto rg = RowGatherer::create(this->ref, this->comm); + + GKO_ASSERT_EQUAL_DIMENSIONS(rg, gko::dim<2>()); +} + + +TYPED_TEST(RowGatherer, CanConstructWithEmptyCollectiveCommAndIndexMap) +{ + using RowGatherer = typename TestFixture::row_gatherer_type; + using IndexMap = typename TestFixture::map_type; + auto coll_comm = std::make_shared(this->comm); + auto map = IndexMap{this->ref}; + + auto rg = RowGatherer::create(this->ref, coll_comm, map); + + GKO_ASSERT_EQUAL_DIMENSIONS(rg, gko::dim<2>()); +} + + +TYPED_TEST(RowGatherer, CanConstructFromCollectiveCommAndIndexMap) +{ + using RowGatherer = typename TestFixture::row_gatherer_type; + + auto rg = RowGatherer::create(this->ref, this->coll_comm, this->imap); + + gko::dim<2> size{this->recv_connections_size(), 18}; + GKO_ASSERT_EQUAL_DIMENSIONS(rg, size); +} + + +TYPED_TEST(RowGatherer, CanCopy) +{ + using RowGatherer = typename TestFixture::row_gatherer_type; + auto rg = RowGatherer::create(this->ref, this->coll_comm, this->imap); + + auto 
copy = gko::clone(rg); + + GKO_ASSERT_EQUAL_DIMENSIONS(rg, copy); + auto copy_coll_comm = std::dynamic_pointer_cast( + copy->get_collective_communicator()); + ASSERT_EQ(*this->coll_comm, *copy_coll_comm); + auto send_idxs = gko::make_const_array_view( + rg->get_executor(), rg->get_num_send_idxs(), rg->get_const_send_idxs()); + auto copy_send_idxs = gko::make_const_array_view( + copy->get_executor(), copy->get_num_send_idxs(), + copy->get_const_send_idxs()); + GKO_ASSERT_ARRAY_EQ(send_idxs, copy_send_idxs); +} + + +TYPED_TEST(RowGatherer, CanMove) +{ + using RowGatherer = typename TestFixture::row_gatherer_type; + auto rg = RowGatherer::create(this->ref, this->coll_comm, this->imap); + auto orig_send_idxs = rg->get_const_send_idxs(); + auto orig_coll_comm = rg->get_collective_communicator(); + auto copy = gko::clone(rg); + + auto move = RowGatherer::create(this->ref, this->comm); + move->move_from(rg); + + GKO_ASSERT_EQUAL_DIMENSIONS(move, copy); + GKO_ASSERT_EQUAL_DIMENSIONS(rg, gko::dim<2>()); + ASSERT_EQ(orig_send_idxs, move->get_const_send_idxs()); + ASSERT_EQ(orig_coll_comm, move->get_collective_communicator()); + ASSERT_EQ(copy->get_num_send_idxs(), move->get_num_send_idxs()); + ASSERT_EQ(rg->get_const_send_idxs(), nullptr); + ASSERT_EQ(rg->get_num_send_idxs(), 0); + ASSERT_NE(rg->get_collective_communicator(), nullptr); +} diff --git a/include/ginkgo/core/base/mpi.hpp b/include/ginkgo/core/base/mpi.hpp index 5c33fca03c1..1215d58b123 100644 --- a/include/ginkgo/core/base/mpi.hpp +++ b/include/ginkgo/core/base/mpi.hpp @@ -380,7 +380,6 @@ class request { return status; } - private: MPI_Request req_; }; diff --git a/include/ginkgo/core/base/segmented_array.hpp b/include/ginkgo/core/base/segmented_array.hpp index b34605cc902..b74ab82e596 100644 --- a/include/ginkgo/core/base/segmented_array.hpp +++ b/include/ginkgo/core/base/segmented_array.hpp @@ -1,4 +1,4 @@ -// SPDX-FileCopyrightText: 2017 - 2024 The Ginkgo authors +// SPDX-FileCopyrightText: 2017 - 2025 The Ginkgo authors // // SPDX-License-Identifier: BSD-3-Clause @@ -152,12 +152,11 @@ struct temporary_clone_helper> { bool copy_data) { if (copy_data) { - return std::make_unique>( - make_array_view(exec, ptr->get_size(), ptr->get_flat_data()), - ptr->get_offsets()); + return std::make_unique>(std::move(exec), *ptr); } else { - return std::make_unique>(std::move(exec), - ptr->get_offsets()); + return std::unique_ptr>( + new segmented_array(segmented_array::create_from_offsets( + array(std::move(exec), ptr->get_offsets())))); } } }; @@ -168,9 +167,7 @@ struct temporary_clone_helper> { std::shared_ptr exec, const segmented_array* ptr, bool) { - return std::make_unique>( - make_array_view(exec, ptr->get_size(), ptr->get_const_flat_data()), - ptr->get_offsets()); + return std::make_unique>(std::move(exec), *ptr); } }; diff --git a/include/ginkgo/core/distributed/collective_communicator.hpp b/include/ginkgo/core/distributed/collective_communicator.hpp index 2dfcb893e6f..afddc96212f 100644 --- a/include/ginkgo/core/distributed/collective_communicator.hpp +++ b/include/ginkgo/core/distributed/collective_communicator.hpp @@ -11,9 +11,10 @@ #if GINKGO_BUILD_MPI +#include #include -#include +#include namespace gko { @@ -29,6 +30,14 @@ namespace mpi { */ class CollectiveCommunicator { public: + /** + * All allowed index_map types (as const *) + */ + using index_map_ptr = + std::variant*, + const distributed::index_map*, + const distributed::index_map*>; + virtual ~CollectiveCommunicator() = default; explicit CollectiveCommunicator(communicator 
base = MPI_COMM_NULL); @@ -73,8 +82,7 @@ class CollectiveCommunicator { * @return a CollectiveCommunicator with the same dynamic type */ [[nodiscard]] virtual std::unique_ptr - create_with_same_type(communicator base, - const distributed::index_map_variant& imap) const = 0; + create_with_same_type(communicator base, index_map_ptr imap) const = 0; /** * Creates a CollectiveCommunicator with the inverse communication pattern @@ -87,16 +95,16 @@ class CollectiveCommunicator { create_inverse() const = 0; /** - * Get the total number of received elements this communication patterns - * expects. + * Get the number of elements received by this process within this + * communication pattern. * * @return number of received elements. */ [[nodiscard]] virtual comm_index_type get_recv_size() const = 0; /** - * Get the total number of sent elements this communication patterns - * expects. + * Get the number of elements sent by this process within this communication + * pattern. * * @return number of sent elements. */ diff --git a/include/ginkgo/core/distributed/dense_communicator.hpp b/include/ginkgo/core/distributed/dense_communicator.hpp index 1f600a93f18..221de351752 100644 --- a/include/ginkgo/core/distributed/dense_communicator.hpp +++ b/include/ginkgo/core/distributed/dense_communicator.hpp @@ -65,8 +65,7 @@ class DenseCommunicator final : public CollectiveCommunicator { const distributed::index_map& imap); [[nodiscard]] std::unique_ptr create_with_same_type( - communicator base, - const distributed::index_map_variant& imap) const override; + communicator base, index_map_ptr imap) const override; /** * Creates the inverse DenseCommunicator by switching sources diff --git a/include/ginkgo/core/distributed/index_map.hpp b/include/ginkgo/core/distributed/index_map.hpp index 09037d303a3..093f907e494 100644 --- a/include/ginkgo/core/distributed/index_map.hpp +++ b/include/ginkgo/core/distributed/index_map.hpp @@ -90,7 +90,7 @@ class index_map { * @param index_space_v the index space in which the passed-in local * indices are defined * - * @return the mapped global indices. Any local index, that is not in the + * @return the mapped global indices. 
Any local index that is not in the * specified index space is mapped to invalid_index */ array map_to_global( @@ -98,7 +98,7 @@ class index_map { index_space index_space_v) const; /** - * \brief get size of index_space::local + * \brief get size of the global index space */ size_type get_global_size() const; diff --git a/include/ginkgo/core/distributed/index_map_fwd.hpp b/include/ginkgo/core/distributed/index_map_fwd.hpp deleted file mode 100644 index 8781fbfffd6..00000000000 --- a/include/ginkgo/core/distributed/index_map_fwd.hpp +++ /dev/null @@ -1,30 +0,0 @@ -// SPDX-FileCopyrightText: 2017 - 2025 The Ginkgo authors -// -// SPDX-License-Identifier: BSD-3-Clause - -#ifndef GKO_PUBLIC_CORE_INDEX_MAP_FWD_HPP -#define GKO_PUBLIC_CORE_INDEX_MAP_FWD_HPP - -#include - -#include - - -namespace gko { -namespace experimental { -namespace distributed { - - -template -class index_map; - -using index_map_variant = - std::variant, index_map, - index_map>; - - -} // namespace distributed -} // namespace experimental -} // namespace gko - -#endif // GKO_PUBLIC_CORE_INDEX_MAP_FWD_HPP diff --git a/include/ginkgo/core/distributed/matrix.hpp b/include/ginkgo/core/distributed/matrix.hpp index 4bb6d1881b8..68f149939ec 100644 --- a/include/ginkgo/core/distributed/matrix.hpp +++ b/include/ginkgo/core/distributed/matrix.hpp @@ -18,6 +18,8 @@ #include #include #include +#include +#include namespace gko { @@ -689,33 +691,17 @@ class Matrix std::shared_ptr local_linop, std::shared_ptr non_local_linop); - /** - * Starts a non-blocking communication of the values of b that are shared - * with other processors. - * - * @param local_b The full local vector to be communicated. The subset of - * shared values is automatically extracted. - * @return MPI request for the non-blocking communication. 
- */ - mpi::request communicate(const local_vector_type* local_b) const; - void apply_impl(const LinOp* b, LinOp* x) const override; void apply_impl(const LinOp* alpha, const LinOp* b, const LinOp* beta, LinOp* x) const override; private: + std::shared_ptr> row_gatherer_; index_map imap_; - std::vector send_offsets_; - std::vector send_sizes_; - std::vector recv_offsets_; - std::vector recv_sizes_; - array gather_idxs_; gko::detail::DenseCache one_scalar_; - gko::detail::DenseCache host_send_buffer_; - gko::detail::DenseCache host_recv_buffer_; - gko::detail::DenseCache send_buffer_; - gko::detail::DenseCache recv_buffer_; + detail::VectorCache recv_buffer_; + detail::VectorCache host_recv_buffer_; std::shared_ptr local_mtx_; std::shared_ptr non_local_mtx_; }; diff --git a/include/ginkgo/core/distributed/neighborhood_communicator.hpp b/include/ginkgo/core/distributed/neighborhood_communicator.hpp index ee8d937eba5..0e69a97383a 100644 --- a/include/ginkgo/core/distributed/neighborhood_communicator.hpp +++ b/include/ginkgo/core/distributed/neighborhood_communicator.hpp @@ -69,8 +69,7 @@ class NeighborhoodCommunicator final : public CollectiveCommunicator { const distributed::index_map& imap); std::unique_ptr create_with_same_type( - communicator base, - const distributed::index_map_variant& imap) const override; + communicator base, index_map_ptr imap) const override; /** * Creates the inverse NeighborhoodCommunicator by switching sources diff --git a/include/ginkgo/core/distributed/row_gatherer.hpp b/include/ginkgo/core/distributed/row_gatherer.hpp new file mode 100644 index 00000000000..7623875e69f --- /dev/null +++ b/include/ginkgo/core/distributed/row_gatherer.hpp @@ -0,0 +1,200 @@ +// SPDX-FileCopyrightText: 2017 - 2025 The Ginkgo authors +// +// SPDX-License-Identifier: BSD-3-Clause + +#ifndef GKO_PUBLIC_CORE_DISTRIBUTED_ROW_GATHERER_HPP_ +#define GKO_PUBLIC_CORE_DISTRIBUTED_ROW_GATHERER_HPP_ + + +#include + + +#if GINKGO_BUILD_MPI + + +#include +#include +#include +#include +#include +#include + + +namespace gko { +namespace experimental { +namespace distributed { + + +/** + * The distributed::RowGatherer gathers the rows of distributed::Vector that + * are located on other processes. + * + * Example usage: + * ```c++ + * auto coll_comm = std::make_shared(comm, + * imap); + * auto rg = distributed::RowGatherer::create(exec, coll_comm, imap); + * + * auto b = distributed::Vector::create(...); + * auto x = matrix::Dense::create(...); + * + * auto req = rg->apply_async(b, x); + * // users can do some computation that doesn't modify b, or access x + * req.wait(); + * // x now contains the gathered rows of b + * ``` + * + * @note The output vector for the apply_async functions *must* use an executor + * that is compatible with the MPI implementation. In particular, if the + * MPI implementation is not GPU aware, then the output vector *must* use + * a CPU executor. Otherwise, an exception will be thrown. + * + * @tparam LocalIndexType the index type for the stored indices + */ +template +class RowGatherer final + : public EnablePolymorphicObject>, + public EnablePolymorphicAssignment>, + public DistributedBase { + friend class EnablePolymorphicObject; + +public: + /** + * Asynchronous version of LinOp::apply. + * + * @warning Only one mpi::request can be active at any given time. This + * function will throw if another request is already active. + * + * @param b the input distributed::Vector. + * @param x the output matrix::Dense with the rows gathered from b. 
Its + * executor has to be compatible with the MPI implementation, see + * the class documentation. + * + * @return a mpi::request for this task. The task is guaranteed to + * be completed only after `.wait()` has been called on it. + */ + mpi::request apply_async(ptr_param b, + ptr_param x) const; + + /** + * Asynchronous version of LinOp::apply. + * + * @warning Calling this multiple times with the same workspace and without + * waiting on each previous request will lead to incorrect + * data transfers. + * + * @param b the input distributed::Vector. + * @param x the output matrix::Dense with the rows gathered from b. Its + * executor has to be compatible with the MPI implementation, see + * the class documentation. + * @param workspace a workspace to store temporary data for the operation. + * This might not be modified before the request is + * waited on. + * + * @return a mpi::request for this task. The task is guaranteed to + * be completed only after `.wait()` has been called on it. + */ + mpi::request apply_async(ptr_param b, ptr_param x, + array& workspace) const; + + /** + * Returns the size of the row gatherer. + */ + dim<2> get_size() const; + + /** + * Get the used collective communicator. + */ + std::shared_ptr + get_collective_communicator() const; + + /** + * Read access to the (local) rows indices + */ + const LocalIndexType* get_const_send_idxs() const; + + /** + * Returns the number of (local) row indices. + */ + size_type get_num_send_idxs() const; + + /** + * Creates a distributed::RowGatherer from a given collective communicator + * and index map. + * + * @TODO: using a segmented array instead of the imap would probably be + * more general + * + * @tparam GlobalIndexType the global index type of the index map + * + * @param exec the executor + * @param coll_comm the collective communicator + * @param imap the index map defining which rows to gather + * + * @note The coll_comm and imap have to be compatible. The coll_comm must + * send and recv exactly as many rows as the imap defines. + * @note This is a collective operation, all participating processes have + * to execute this operation. + * + * @return a shared_ptr to the created distributed::RowGatherer + */ + template = + sizeof(LocalIndexType)>> + static std::unique_ptr create( + std::shared_ptr exec, + std::shared_ptr coll_comm, + const index_map& imap) + { + return std::unique_ptr( + new RowGatherer(std::move(exec), std::move(coll_comm), imap)); + } + + /* + * Create method for an empty RowGatherer. + */ + static std::unique_ptr create( + std::shared_ptr exec, mpi::communicator comm); + + RowGatherer(const RowGatherer& o); + + RowGatherer(RowGatherer&& o) noexcept; + + RowGatherer& operator=(const RowGatherer& o); + + RowGatherer& operator=(RowGatherer&& o); + +private: + /** + * @copydoc RowGatherer::create(std::shared_ptr, std::shared_ptr, + * const index_map&) + */ + template + RowGatherer(std::shared_ptr exec, + std::shared_ptr coll_comm, + const index_map& imap); + + /** + * @copydoc RowGatherer::create(std::shared_ptr, mpi::communicator) + */ + RowGatherer(std::shared_ptr exec, mpi::communicator comm); + + dim<2> size_; + std::shared_ptr coll_comm_; + array send_idxs_; + mutable array send_workspace_; + // This object might not hold an actual MPI request, so we can't use the + // always owning mpi::request. Its destructor would otherwise make the + // program crash. 
+ mutable MPI_Request req_listener_{MPI_REQUEST_NULL}; +}; + + +} // namespace distributed +} // namespace experimental +} // namespace gko + +#endif +#endif // GKO_PUBLIC_CORE_DISTRIBUTED_ROW_GATHERER_HPP_ diff --git a/test/mpi/CMakeLists.txt b/test/mpi/CMakeLists.txt index 346aba200f7..93e6c5c451b 100644 --- a/test/mpi/CMakeLists.txt +++ b/test/mpi/CMakeLists.txt @@ -1,8 +1,4 @@ -ginkgo_create_common_and_reference_test(assembly MPI_SIZE 3 LABELS distributed) -ginkgo_create_common_and_reference_test(matrix MPI_SIZE 3 LABELS distributed) -ginkgo_create_common_and_reference_test(partition_helpers MPI_SIZE 3 LABELS distributed) -ginkgo_create_common_and_reference_test(vector MPI_SIZE 3 LABELS distributed) - +add_subdirectory(distributed) add_subdirectory(preconditioner) add_subdirectory(solver) add_subdirectory(multigrid) diff --git a/test/mpi/distributed/CMakeLists.txt b/test/mpi/distributed/CMakeLists.txt new file mode 100644 index 00000000000..73424c9757f --- /dev/null +++ b/test/mpi/distributed/CMakeLists.txt @@ -0,0 +1,6 @@ +ginkgo_create_common_and_reference_test(assembly MPI_SIZE 3 LABELS distributed) +ginkgo_create_common_and_reference_test(matrix MPI_SIZE 3 LABELS distributed) +ginkgo_create_common_and_reference_test(partition_helpers MPI_SIZE 3 LABELS distributed) +ginkgo_create_common_and_reference_test(vector MPI_SIZE 3 LABELS distributed) +# reduce the number of OpenMP threads per MPI rank to 2, so that in total 12 cores are used +ginkgo_create_common_and_reference_test(row_gatherer MPI_SIZE 6 LABELS distributed RESOURCE_LOCAL_CORES 2) diff --git a/test/mpi/assembly.cpp b/test/mpi/distributed/assembly.cpp similarity index 100% rename from test/mpi/assembly.cpp rename to test/mpi/distributed/assembly.cpp diff --git a/test/mpi/matrix.cpp b/test/mpi/distributed/matrix.cpp similarity index 94% rename from test/mpi/matrix.cpp rename to test/mpi/distributed/matrix.cpp index d1dfa362ad8..0c7457ff184 100644 --- a/test/mpi/matrix.cpp +++ b/test/mpi/distributed/matrix.cpp @@ -572,7 +572,11 @@ TYPED_TEST(Matrix, CanApplyToMultipleVectors) this->dist_mat->apply(this->x, this->y); - GKO_ASSERT_MTX_NEAR(this->y->get_local_vector(), result[rank], 0); + auto eps = std::is_same_v || + std::is_same_v> + ? 
r::value + : gko::remove_complex{0.0}; + GKO_ASSERT_MTX_NEAR(this->y->get_local_vector(), result[rank], eps); } @@ -860,6 +864,8 @@ bool needs_transfers(std::shared_ptr exec) class HostToDeviceLogger : public gko::log::Logger { public: + mutable int transfer_count = 0; + void on_copy_started(const gko::Executor* exec_from, const gko::Executor* exec_to, const gko::uintptr& loc_from, @@ -867,28 +873,27 @@ class HostToDeviceLogger : public gko::log::Logger { const gko::size_type& num_bytes) const override { if (exec_from != exec_to) { - transfer_count_++; + transfer_count++; } } +}; - int get_transfer_count() const { return transfer_count_; } - static std::unique_ptr create() - { - return std::unique_ptr(new HostToDeviceLogger()); - } +class AllocationLogger : public gko::log::Logger { +public: + mutable int count = 0; protected: - explicit HostToDeviceLogger() - : gko::log::Logger(gko::log::Logger::copy_started_mask) - {} - -private: - mutable int transfer_count_ = 0; + void on_allocation_completed(const gko::Executor* exec, + const gko::size_type& num_bytes, + const gko::uintptr& location) const override + { + ++count; + } }; -class MatrixGpuAwareCheck : public CommonMpiTestFixture { +class MatrixInternalBuffers : public CommonMpiTestFixture { public: using local_index_type = gko::int32; using global_index_type = gko::int64; @@ -898,14 +903,16 @@ class MatrixGpuAwareCheck : public CommonMpiTestFixture { using dist_vec_type = gko::experimental::distributed::Vector; using dense_vec_type = gko::matrix::Dense; - MatrixGpuAwareCheck() - : logger(gko::share(HostToDeviceLogger::create())), engine(42) + MatrixInternalBuffers() { - exec->add_logger(logger); + exec->add_logger(copy_logger); + exec->add_logger(alloc_logger); mat = dist_mtx_type::create(exec, comm); x = dist_vec_type::create(exec, comm); y = dist_vec_type::create(exec, comm); + factor = dist_vec_type::create(exec, comm, gko::dim<2>{0, 1}, + gko::dim<2>{0, 1}); alpha = dense_vec_type::create(exec, gko::dim<2>{1, 1}); beta = dense_vec_type::create(exec, gko::dim<2>{1, 1}); @@ -916,33 +923,70 @@ class MatrixGpuAwareCheck : public CommonMpiTestFixture { std::unique_ptr x; std::unique_ptr y; + std::unique_ptr factor; std::unique_ptr alpha; std::unique_ptr beta; - std::shared_ptr logger; + std::shared_ptr copy_logger = + std::make_shared(); + std::shared_ptr alloc_logger = + std::make_shared(); - std::default_random_engine engine; + std::default_random_engine engine{42}; }; -TEST_F(MatrixGpuAwareCheck, ApplyCopiesToHostOnlyIfNecessary) +TEST_F(MatrixInternalBuffers, ApplyCopiesToHostOnlyIfNecessary) { - auto transfer_count_before = logger->get_transfer_count(); + auto transfer_count_before = copy_logger->transfer_count; mat->apply(x, y); - ASSERT_EQ(logger->get_transfer_count() > transfer_count_before, + ASSERT_EQ(copy_logger->transfer_count > transfer_count_before, needs_transfers(exec)); } -TEST_F(MatrixGpuAwareCheck, AdvancedApplyCopiesToHostOnlyIfNecessary) +TEST_F(MatrixInternalBuffers, AdvancedApplyCopiesToHostOnlyIfNecessary) { - auto transfer_count_before = logger->get_transfer_count(); + auto transfer_count_before = copy_logger->transfer_count; mat->apply(alpha, x, beta, y); - ASSERT_EQ(logger->get_transfer_count() > transfer_count_before, + ASSERT_EQ(copy_logger->transfer_count > transfer_count_before, needs_transfers(exec)); } + + +TEST_F(MatrixInternalBuffers, ApplyAllocatesBuffersOnlyOnce) +{ + mat->apply(x, y); + + auto alloc_count_before = alloc_logger->count; + mat->apply(x, y); + + ASSERT_EQ(alloc_logger->count, 
alloc_count_before); +} + + +TEST_F(MatrixInternalBuffers, AdvancedApplyAllocatesBuffersOnlyOnce) +{ + mat->apply(alpha, x, beta, y); + + auto alloc_count_before = alloc_logger->count; + mat->apply(alpha, x, beta, y); + + ASSERT_EQ(alloc_logger->count, alloc_count_before); +} + + +TEST_F(MatrixInternalBuffers, ColScaleAllocatesBuffersOnlyOnce) +{ + mat->col_scale(factor); + + auto alloc_count_before = alloc_logger->count; + mat->col_scale(factor); + + ASSERT_EQ(alloc_logger->count, alloc_count_before); +} diff --git a/test/mpi/partition_helpers.cpp b/test/mpi/distributed/partition_helpers.cpp similarity index 98% rename from test/mpi/partition_helpers.cpp rename to test/mpi/distributed/partition_helpers.cpp index 43b4783d896..9a2baeabca5 100644 --- a/test/mpi/partition_helpers.cpp +++ b/test/mpi/distributed/partition_helpers.cpp @@ -1,4 +1,4 @@ -// SPDX-FileCopyrightText: 2017 - 2024 The Ginkgo authors +// SPDX-FileCopyrightText: 2017 - 2025 The Ginkgo authors // // SPDX-License-Identifier: BSD-3-Clause diff --git a/test/mpi/distributed/row_gatherer.cpp b/test/mpi/distributed/row_gatherer.cpp new file mode 100644 index 00000000000..0ddffaf6ec1 --- /dev/null +++ b/test/mpi/distributed/row_gatherer.cpp @@ -0,0 +1,242 @@ +// SPDX-FileCopyrightText: 2017 - 2025 The Ginkgo authors +// +// SPDX-License-Identifier: BSD-3-Clause + +#include +#include +#include + +#include + +#include + +#include +#include +#include +#include + +#include "core/test/utils.hpp" +#include "ginkgo/core/base/exception.hpp" +#include "test/utils/mpi/common_fixture.hpp" + + +#if GINKGO_HAVE_OPENMPI_PRE_4_1_X +using CollCommType = gko::experimental::mpi::DenseCommunicator; +#else +using CollCommType = gko::experimental::mpi::NeighborhoodCommunicator; +#endif + + +template +class RowGatherer : public CommonMpiTestFixture { +protected: + using index_type = IndexType; + using part_type = + gko::experimental::distributed::Partition; + using map_type = + gko::experimental::distributed::index_map; + using row_gatherer_type = + gko::experimental::distributed::RowGatherer; + + RowGatherer() + { + int rank = comm.rank(); + auto part = gko::share(part_type::build_from_global_size_uniform( + exec, comm.size(), comm.size() * 3)); + auto recv_connections = create_recv_connections()[rank]; + auto imap = map_type{exec, part, comm.rank(), recv_connections}; + auto coll_comm = std::make_shared(comm, imap); + rg = row_gatherer_type::create(exec, coll_comm, imap); + } + + void SetUp() override { ASSERT_EQ(comm.size(), 6); } + + template + std::array, 6> create_recv_connections() + { + return {gko::array{exec, {3, 5, 10, 11}}, + gko::array{exec, {0, 1, 7, 12, 13}}, + gko::array{exec, {3, 4, 17}}, + gko::array{exec, {1, 2, 12, 14}}, + gko::array{exec, {4, 5, 9, 10, 15, 16}}, + gko::array{exec, {8, 12, 13, 14}}}; + } + + std::shared_ptr host_exec = exec->get_master(); + std::shared_ptr mpi_exec = + gko::experimental::mpi::requires_host_buffer(exec, comm) ? 
+                                                                 : exec;
+    std::shared_ptr rg;
+};
+
+TYPED_TEST_SUITE(RowGatherer, gko::test::IndexTypes, TypenameNameGenerator);
+
+
+TYPED_TEST(RowGatherer, CanApplyAsync)
+{
+    using Dense = gko::matrix::Dense;
+    using Vector = gko::experimental::distributed::Vector;
+    int rank = this->comm.rank();
+    auto offset = static_cast(rank * 3);
+    auto b = Vector::create(
+        this->exec, this->comm, gko::dim<2>{18, 1},
+        gko::initialize({offset, offset + 1, offset + 2}, this->exec));
+    auto expected = this->template create_recv_connections()[rank];
+    auto x = Vector::create(this->mpi_exec, this->comm,
+                            gko::dim<2>{this->rg->get_size()[0], 1},
+                            gko::dim<2>{expected.get_size(), 1});
+
+    auto req = this->rg->apply_async(b, x);
+    req.wait();
+
+    auto expected_vec = Vector::create(
+        this->mpi_exec, this->comm, gko::dim<2>{this->rg->get_size()[0], 1},
+        Dense::create(this->mpi_exec, gko::dim<2>{expected.get_size(), 1},
+                      expected, 1));
+    GKO_ASSERT_MTX_NEAR(x->get_local_vector(), expected_vec->get_local_vector(),
+                        0.0);
+}
+
+
+TYPED_TEST(RowGatherer, CanApplyAsyncConsequetively)
+{
+    using Dense = gko::matrix::Dense;
+    using Vector = gko::experimental::distributed::Vector;
+    int rank = this->comm.rank();
+    auto offset = static_cast(rank * 3);
+    auto b = Vector::create(
+        this->exec, this->comm, gko::dim<2>{18, 1},
+        gko::initialize({offset, offset + 1, offset + 2}, this->exec));
+    auto expected = this->template create_recv_connections()[rank];
+    auto x = Vector::create(this->mpi_exec, this->comm,
+                            gko::dim<2>{this->rg->get_size()[0], 1},
+                            gko::dim<2>{expected.get_size(), 1});
+
+    this->rg->apply_async(b, x).wait();
+    this->rg->apply_async(b, x).wait();
+
+    auto expected_vec = Vector::create(
+        this->mpi_exec, this->comm, gko::dim<2>{this->rg->get_size()[0], 1},
+        Dense::create(this->mpi_exec, gko::dim<2>{expected.get_size(), 1},
+                      expected, 1));
+    GKO_ASSERT_MTX_NEAR(x->get_local_vector(), expected_vec->get_local_vector(),
+                        0.0);
+}
+
+
+TYPED_TEST(RowGatherer, CanApplyAsyncWithWorkspace)
+{
+    using Dense = gko::matrix::Dense;
+    using Vector = gko::experimental::distributed::Vector;
+    int rank = this->comm.rank();
+    auto offset = static_cast(rank * 3);
+    auto b = Vector::create(
+        this->exec, this->comm, gko::dim<2>{18, 1},
+        gko::initialize({offset, offset + 1, offset + 2}, this->exec));
+    auto expected = this->template create_recv_connections()[rank];
+    auto x = Vector::create(this->mpi_exec, this->comm,
+                            gko::dim<2>{this->rg->get_size()[0], 1},
+                            gko::dim<2>{expected.get_size(), 1});
+    gko::array workspace;
+
+    auto req = this->rg->apply_async(b, x, workspace);
+    req.wait();
+
+    auto expected_vec = Vector::create(
+        this->mpi_exec, this->comm, gko::dim<2>{this->rg->get_size()[0], 1},
+        Dense::create(this->mpi_exec, gko::dim<2>{expected.get_size(), 1},
+                      expected, 1));
+    GKO_ASSERT_MTX_NEAR(x->get_local_vector(), expected_vec->get_local_vector(),
+                        0.0);
+    ASSERT_GT(workspace.get_size(), 0);
+}
+
+
+TYPED_TEST(RowGatherer, CanApplyAsyncMultipleTimesWithWorkspace)
+{
+    using Dense = gko::matrix::Dense;
+    using Vector = gko::experimental::distributed::Vector;
+    int rank = this->comm.rank();
+    auto offset = static_cast(rank * 3);
+    auto b1 = Vector::create(
+        this->exec, this->comm, gko::dim<2>{18, 1},
+        gko::initialize({offset, offset + 1, offset + 2}, this->exec));
+    auto b2 = gko::clone(b1);
+    b2->scale(gko::initialize({-1}, this->exec));
+    auto expected = this->template create_recv_connections()[rank];
+    auto x1 = Vector::create(this->mpi_exec, this->comm,
+                             gko::dim<2>{this->rg->get_size()[0], 1},
+                             gko::dim<2>{expected.get_size(), 1});
+    auto x2 = gko::clone(x1);
+    gko::array workspace1;
+    gko::array workspace2;
+
+    auto req1 = this->rg->apply_async(b1, x1, workspace1);
+    auto req2 = this->rg->apply_async(b2, x2, workspace2);
+    req1.wait();
+    req2.wait();
+
+    auto expected_vec1 = Vector::create(
+        this->mpi_exec, this->comm, gko::dim<2>{this->rg->get_size()[0], 1},
+        Dense::create(this->mpi_exec, gko::dim<2>{expected.get_size(), 1},
+                      expected, 1));
+    auto expected_vec2 = gko::clone(expected_vec1);
+    expected_vec2->scale(gko::initialize({-1}, this->exec));
+    GKO_ASSERT_MTX_NEAR(x1->get_local_vector(),
+                        expected_vec1->get_local_vector(), 0.0);
+    GKO_ASSERT_MTX_NEAR(x2->get_local_vector(),
+                        expected_vec2->get_local_vector(), 0.0);
+}
+
+
+TYPED_TEST(RowGatherer, CanApplyAsyncWithMultipleColumns)
+{
+    using Dense = gko::matrix::Dense;
+    using Vector = gko::experimental::distributed::Vector;
+    int rank = this->comm.rank();
+    auto offset = static_cast(rank * 3);
+    auto b = Vector::create(
+        this->exec, this->comm, gko::dim<2>{18, 2},
+        gko::initialize({{offset, offset * offset},
+                         {offset + 1, offset * offset + 1},
+                         {offset + 2, offset * offset + 2}},
+                        this->exec));
+    gko::array expected[] = {
+        gko::array{this->mpi_exec, {3, 9, 5, 11, 10, 82, 11, 83}},
+        gko::array{this->mpi_exec,
+                   {0, 0, 1, 1, 7, 37, 12, 144, 13, 145}},
+        gko::array{this->mpi_exec, {3, 9, 4, 10, 17, 227}},
+        gko::array{this->mpi_exec, {1, 1, 2, 2, 12, 144, 14, 146}},
+        gko::array{this->mpi_exec,
+                   {4, 10, 5, 11, 9, 81, 10, 82, 15, 225, 16, 226}},
+        gko::array{this->mpi_exec, {8, 38, 12, 144, 13, 145, 14, 146}}};
+    auto x = Vector::create(this->mpi_exec, this->comm,
+                            gko::dim<2>{this->rg->get_size()[0], 2},
+                            gko::dim<2>{expected[rank].get_size() / 2, 2});
+
+    this->rg->apply_async(b, x).wait();
+
+    auto expected_vec = Vector::create(
+        this->mpi_exec, this->comm, gko::dim<2>{this->rg->get_size()[0], 2},
+        Dense::create(this->mpi_exec,
+                      gko::dim<2>{expected[rank].get_size() / 2, 2},
+                      expected[rank], 2));
+    GKO_ASSERT_MTX_NEAR(x->get_local_vector(), expected_vec->get_local_vector(),
+                        0.0);
+}
+
+
+TYPED_TEST(RowGatherer, ThrowsOnNonMatchingExecutor)
+{
+    if (this->mpi_exec == this->exec) {
+        GTEST_SKIP();
+    }
+
+    using RowGatherer = typename TestFixture::row_gatherer_type;
+    using Vector = gko::experimental::distributed::Vector;
+    auto rg = RowGatherer::create(this->exec, this->comm);
+    auto b = Vector::create(this->exec, this->comm);
+    auto x = Vector::create(this->exec, this->comm);
+
+    ASSERT_THROW(rg->apply_async(b, x).wait(), gko::InvalidStateError);
+}
diff --git a/test/mpi/vector.cpp b/test/mpi/distributed/vector.cpp
similarity index 100%
rename from test/mpi/vector.cpp
rename to test/mpi/distributed/vector.cpp