Skip to content

Commit 5e8c048

Browse files
authored
Support for input vectors bigger than 2GB (#728)
Use of 64-bit API from MPI 4 (or some workaround if not available during compilation)
1 parent 39e3ddc commit 5e8c048

File tree

4 files changed

+99
-47
lines changed

4 files changed

+99
-47
lines changed

benchmarks/gbench/common/sort.cpp

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,29 +15,38 @@ template <rng::forward_range X> void fill_random(X &&x) {
1515
class DRSortFixture : public benchmark::Fixture {
1616
protected:
1717
xhp::distributed_vector<T> *a;
18+
xhp::distributed_vector<T> *vec;
19+
std::vector<T> local_vec;
1820

1921
public:
2022
void SetUp(::benchmark::State &) {
2123
a = new xhp::distributed_vector<T>(default_vector_size);
22-
std::vector<T> local(default_vector_size);
23-
fill_random(local);
24-
xhp::copy(local, rng::begin(*a));
24+
vec = new xhp::distributed_vector<T>(default_vector_size);
25+
local_vec = std::vector<T>(default_vector_size);
26+
fill_random(local_vec);
27+
xhp::copy(local_vec, rng::begin(*a));
2528
}
2629

27-
void TearDown(::benchmark::State &) { delete a; }
30+
void TearDown(::benchmark::State &state) {
31+
// copy back to check if last sort really sorted
32+
xhp::copy(*vec, rng::begin(local_vec));
33+
delete a;
34+
delete vec;
35+
36+
if (!rng::is_sorted(local_vec)) {
37+
state.SkipWithError("mhp sort did not sort the vector");
38+
}
39+
}
2840
};
2941

3042
BENCHMARK_DEFINE_F(DRSortFixture, Sort_DR)(benchmark::State &state) {
3143
Stats stats(state, sizeof(T) * a->size());
32-
xhp::distributed_vector<T> vec(a->size());
3344
for (auto _ : state) {
3445
state.PauseTiming();
35-
xhp::copy(*a, rng::begin(vec));
46+
xhp::copy(*a, rng::begin(*vec));
3647
stats.rep();
3748
state.ResumeTiming();
38-
39-
// sort not implemented in mhp yet
40-
xhp::sort(vec);
49+
xhp::sort(*vec);
4150
}
4251
}
4352

include/dr/detail/communicator.hpp

Lines changed: 31 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,10 @@
44

55
#pragma once
66

7+
#define MPI_SUPPORTS_RGET_C \
8+
(MPI_VERSION >= 4) || \
9+
(defined(I_MPI_NUMVERSION) && (I_MPI_NUMVERSION > 20211200000))
10+
711
namespace dr {
812

913
class communicator {
@@ -193,10 +197,10 @@ class communicator {
193197
assert(rng::size(recvcnt) == size_);
194198
assert(rng::size(recvdsp) == size_);
195199

196-
std::vector<int> _sendcnt(size_);
197-
std::vector<int> _senddsp(size_);
198-
std::vector<int> _recvcnt(size_);
199-
std::vector<int> _recvdsp(size_);
200+
std::vector<MPI_Count> _sendcnt(size_);
201+
std::vector<MPI_Aint> _senddsp(size_);
202+
std::vector<MPI_Count> _recvcnt(size_);
203+
std::vector<MPI_Aint> _recvdsp(size_);
200204

201205
rng::transform(sendcnt, _sendcnt.begin(),
202206
[](auto e) { return e * sizeof(valT); });
@@ -207,9 +211,10 @@ class communicator {
207211
rng::transform(recvdsp, _recvdsp.begin(),
208212
[](auto e) { return e * sizeof(valT); });
209213

210-
MPI_Alltoallv(rng::data(sendbuf), rng::data(_sendcnt), rng::data(_senddsp),
211-
MPI_BYTE, rng::data(recvbuf), rng::data(_recvcnt),
212-
rng::data(_recvdsp), MPI_BYTE, mpi_comm_);
214+
MPI_Alltoallv_c(rng::data(sendbuf), rng::data(_sendcnt),
215+
rng::data(_senddsp), MPI_BYTE, rng::data(recvbuf),
216+
rng::data(_recvcnt), rng::data(_recvdsp), MPI_BYTE,
217+
mpi_comm_);
213218
}
214219

215220
bool operator==(const communicator &other) const {
@@ -254,7 +259,15 @@ class rma_window {
254259
std::size_t disp) const {
255260
DRLOG("MPI comm get:: ({}:{}:{})", rank, disp, size);
256261
MPI_Request request;
262+
#if (MPI_VERSION >= 4) || \
263+
(defined(I_MPI_NUMVERSION) && (I_MPI_NUMVERSION > 20211200000))
264+
MPI_Rget_c(dst, size, MPI_BYTE, rank, disp, size, MPI_BYTE, win_, &request);
265+
#else
266+
assert(
267+
size <= (std::size_t)INT_MAX &&
268+
"MPI API requires origin_count to be positive signed 32-bit integer");
257269
MPI_Rget(dst, size, MPI_BYTE, rank, disp, size, MPI_BYTE, win_, &request);
270+
#endif
258271
MPI_Wait(&request, MPI_STATUS_IGNORE);
259272
}
260273

@@ -266,7 +279,18 @@ class rma_window {
266279
std::size_t disp) const {
267280
DRLOG("MPI comm put:: ({}:{}:{})", rank, disp, size);
268281
MPI_Request request;
282+
283+
#if (MPI_VERSION >= 4) || \
284+
(defined(I_MPI_NUMVERSION) && (I_MPI_NUMVERSION > 20211200000))
285+
MPI_Rput_c(src, size, MPI_BYTE, rank, disp, size, MPI_BYTE, win_, &request);
286+
#else
287+
// MPI_Rput origin_count is 32-bit signed int - check range
288+
assert(
289+
size <= (std::size_t)INT_MAX &&
290+
"MPI API requires origin_count to be positive signed 32-bit integer");
269291
MPI_Rput(src, size, MPI_BYTE, rank, disp, size, MPI_BYTE, win_, &request);
292+
#endif
293+
270294
DRLOG("MPI comm wait:: ({}:{}:{})", rank, disp, size);
271295
MPI_Wait(&request, MPI_STATUS_IGNORE);
272296
DRLOG("MPI comm wait finished:: ({}:{}:{})", rank, disp, size);

include/dr/mhp/algorithms/sort.hpp

Lines changed: 23 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,6 @@ template <typename R, typename Compare> void local_sort(R &r, Compare &&comp) {
139139
template <typename Compare>
140140
void _find_split_idx(std::size_t &vidx, std::size_t &segidx, Compare &&comp,
141141
auto &ls, auto &vec_v, auto &vec_i, auto &vec_s) {
142-
143142
while (vidx < default_comm().size() && segidx < rng::size(ls)) {
144143
if (comp(vec_v[vidx - 1], ls[segidx])) {
145144
vec_i[vidx] = segidx;
@@ -231,26 +230,26 @@ void splitters(Seg &lsegment, Compare &&comp,
231230
}
232231

233232
template <typename valT>
234-
void shift_data(const int shift_left, const int shift_right,
233+
void shift_data(const int64_t shift_left, const int64_t shift_right,
235234
buffer<valT> &vec_recvdata, buffer<valT> &vec_left,
236235
buffer<valT> &vec_right) {
237-
238236
const std::size_t _comm_rank = default_comm().rank();
239237

240238
MPI_Request req_l, req_r;
241239
MPI_Status stat_l, stat_r;
242240

243-
assert(static_cast<int>(rng::size(vec_left)) == std::max(0, shift_left));
244-
assert(static_cast<int>(rng::size(vec_right)) == std::max(0, shift_right));
241+
assert(static_cast<int64_t>(rng::size(vec_left)) == std::max(0L, shift_left));
242+
assert(static_cast<int64_t>(rng::size(vec_right)) ==
243+
std::max(0L, shift_right));
245244

246-
if (static_cast<int>(rng::size(vec_recvdata)) < -shift_left) {
245+
if (static_cast<int64_t>(rng::size(vec_recvdata)) < -shift_left) {
247246
// Too little data in recv buffer to shift left - first get from right,
248247
// then send left
249248
DRLOG("Get from right first, recvdata size {} shift left {}",
250249
rng::size(vec_recvdata), shift_left);
251250
// ** This will never happen, because values eq to split go left **
252251
assert(false);
253-
} else if (static_cast<int>(rng::size(vec_recvdata)) < -shift_right) {
252+
} else if (static_cast<int64_t>(rng::size(vec_recvdata)) < -shift_right) {
254253
// Too little data in buffer to shift right - first get from left, then
255254
// send right
256255
assert(shift_left > 0);
@@ -280,26 +279,23 @@ void shift_data(const int shift_left, const int shift_right,
280279
MPI_Wait(&req_r, &stat_r);
281280
} else {
282281
// enough data in recv buffer
283-
284282
if (shift_left < 0) {
285283
default_comm().isend(rng::data(vec_recvdata), -shift_left, _comm_rank - 1,
286284
&req_l);
287285
} else if (shift_left > 0) {
288-
assert(shift_left == static_cast<int>(rng::size(vec_left)));
286+
assert(shift_left == static_cast<int64_t>(rng::size(vec_left)));
289287
default_comm().irecv(rng::data(vec_left), rng::size(vec_left),
290288
_comm_rank - 1, &req_l);
291289
}
292-
293290
if (shift_right > 0) {
294-
assert(shift_right == static_cast<int>(rng::size(vec_right)));
291+
assert(shift_right == static_cast<int64_t>(rng::size(vec_right)));
295292
default_comm().irecv(rng::data(vec_right), rng::size(vec_right),
296293
_comm_rank + 1, &req_r);
297294
} else if (shift_right < 0) {
298295
default_comm().isend(rng::data(vec_recvdata) + rng::size(vec_recvdata) +
299296
shift_right,
300297
-shift_right, _comm_rank + 1, &req_r);
301298
}
302-
303299
if (shift_left != 0)
304300
MPI_Wait(&req_l, &stat_l);
305301
if (shift_right != 0)
@@ -308,11 +304,11 @@ void shift_data(const int shift_left, const int shift_right,
308304
}
309305

310306
template <typename valT>
311-
void copy_results(auto &lsegment, const int shift_left, const int shift_right,
312-
buffer<valT> &vec_recvdata, buffer<valT> &vec_left,
313-
buffer<valT> &vec_right) {
314-
const std::size_t invalidate_left = std::max(-shift_left, 0);
315-
const std::size_t invalidate_right = std::max(-shift_right, 0);
307+
void copy_results(auto &lsegment, const int64_t shift_left,
308+
const int64_t shift_right, buffer<valT> &vec_recvdata,
309+
buffer<valT> &vec_left, buffer<valT> &vec_right) {
310+
const std::size_t invalidate_left = std::max(-shift_left, 0L);
311+
const std::size_t invalidate_right = std::max(-shift_right, 0L);
316312

317313
const std::size_t size_l = rng::size(vec_left);
318314
const std::size_t size_r = rng::size(vec_right);
@@ -355,7 +351,6 @@ void copy_results(auto &lsegment, const int shift_left, const int shift_right,
355351

356352
template <dr::distributed_range R, typename Compare>
357353
void dist_sort(R &r, Compare &&comp) {
358-
359354
using valT = typename R::value_type;
360355

361356
const std::size_t _comm_rank = default_comm().rank();
@@ -370,6 +365,8 @@ void dist_sort(R &r, Compare &&comp) {
370365
std::vector<std::size_t> vec_recv_elems(_comm_size, 0);
371366
std::size_t _total_elems = 0;
372367

368+
DRLOG("Rank {}: Dist sort, local segment size {}", default_comm().rank(),
369+
rng::size(lsegment));
373370
__detail::local_sort(lsegment, comp);
374371

375372
/* find splitting values - limits of areas to send to other processes */
@@ -383,12 +380,8 @@ void dist_sort(R &r, Compare &&comp) {
383380

384381
/* send and receive data belonging to each node, then redistribute
385382
* data to achieve size of data equal to size of local segment */
386-
387-
/* TODO: all_gather() below can be asynchronous - to be verified in CI
388-
* (currently hangs in CI unit tests, but going well when started manually)
389-
*/
383+
/* async all_gather causes problems on some systems */
390384
// MPI_Request req_recvelems;
391-
// default_comm().i_all_gather(_recv_elems, vec_recv_elems, &req_recvelems);
392385
default_comm().all_gather(_recv_elems, vec_recv_elems);
393386

394387
/* buffer for received data */
@@ -402,13 +395,12 @@ void dist_sort(R &r, Compare &&comp) {
402395
/* TODO: vec recvdata is partially sorted, implementation of merge on GPU is
403396
* desirable */
404397
__detail::local_sort(vec_recvdata, comp);
405-
406398
// MPI_Wait(&req_recvelems, MPI_STATUS_IGNORE);
407399

408400
_total_elems = std::reduce(vec_recv_elems.begin(), vec_recv_elems.end());
409401

410402
/* prepare data for shift to neighboring processes */
411-
std::vector<int> vec_shift(_comm_size - 1);
403+
std::vector<int64_t> vec_shift(_comm_size - 1);
412404

413405
const auto desired_elems_num = (_total_elems + _comm_size - 1) / _comm_size;
414406

@@ -417,12 +409,12 @@ void dist_sort(R &r, Compare &&comp) {
417409
vec_shift[_i] = vec_shift[_i - 1] + desired_elems_num - vec_recv_elems[_i];
418410
}
419411

420-
const int shift_left = _comm_rank == 0 ? 0 : -vec_shift[_comm_rank - 1];
421-
const int shift_right =
412+
const int64_t shift_left = _comm_rank == 0 ? 0 : -vec_shift[_comm_rank - 1];
413+
const int64_t shift_right =
422414
_comm_rank == _comm_size - 1 ? 0 : vec_shift[_comm_rank];
423415

424-
buffer<valT> vec_left(std::max(shift_left, 0));
425-
buffer<valT> vec_right(std::max(shift_right, 0));
416+
buffer<valT> vec_left(std::max(shift_left, 0L));
417+
buffer<valT> vec_right(std::max(shift_right, 0L));
426418

427419
/* shift data if necessary, to have exactly the number of elements equal to
428420
* lsegment size */
@@ -432,7 +424,6 @@ void dist_sort(R &r, Compare &&comp) {
432424
/* copy results to distributed vector's local segment */
433425
__detail::copy_results<valT>(lsegment, shift_left, shift_right, vec_recvdata,
434426
vec_left, vec_right);
435-
436427
} // __detail::dist_sort
437428

438429
} // namespace __detail
@@ -446,14 +437,15 @@ void sort(R &r, Compare &&comp = Compare()) {
446437
std::size_t _comm_size = default_comm().size(); // dr-style ignore
447438

448439
if (_comm_size == 1) {
440+
DRLOG("mhp::sort() - one node only");
449441
auto &&lsegment = local_segment(r);
450442
__detail::local_sort(lsegment, comp);
451443

452444
} else if (rng::size(r) <= (_comm_size - 1) * (_comm_size - 1)) {
453445
/* Distributed vector of size <= (comm_size-1) * (comm_size-1) may have
454446
* 0-size local segments. It is also small enough to prefer sequential sort
455447
*/
456-
DRLOG("mhp::sort() - local sort");
448+
DRLOG("mhp::sort() - local sort on node 0");
457449

458450
std::vector<valT> vec_recvdata(rng::size(r));
459451
dr::mhp::copy(0, r, rng::begin(vec_recvdata));

include/dr/mhp/containers/distributed_vector.hpp

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,15 +38,42 @@ class MpiBackend {
3838
"segm_offset:{}, size:{}, peer:{})",
3939
dst, offset, datalen, segment_index);
4040

41+
#if (MPI_VERSION >= 4) || \
42+
(defined(I_MPI_NUMVERSION) && (I_MPI_NUMVERSION > 20211200000))
43+
// 64-bit API inside
4144
win_.get(dst, datalen, segment_index, offset);
45+
#else
46+
for (std::size_t remainder = datalen, off = 0UL; remainder > 0;) {
47+
std::size_t s = std::min(remainder, (std::size_t)INT_MAX);
48+
DRLOG("{}:{} win_.get total {} now {} bytes at off {}, dst offset {}",
49+
default_comm().rank(), __LINE__, datalen, s, off, offset + off);
50+
win_.get((uint8_t *)dst + off, s, segment_index, offset + off);
51+
off += s;
52+
remainder -= s;
53+
}
54+
#endif
4255
}
4356

4457
void putmem(void const *src, std::size_t offset, std::size_t datalen,
4558
int segment_index) {
4659
DRLOG("calling MPI put(segm_offset:{}, "
4760
"src:{}, size:{}, peer:{})",
4861
offset, src, datalen, segment_index);
62+
63+
#if (MPI_VERSION >= 4) || \
64+
(defined(I_MPI_NUMVERSION) && (I_MPI_NUMVERSION > 20211200000))
65+
// 64-bit API inside
4966
win_.put(src, datalen, segment_index, offset);
67+
#else
68+
for (std::size_t remainder = datalen, off = 0UL; remainder > 0;) {
69+
std::size_t s = std::min(remainder, (std::size_t)INT_MAX);
70+
DRLOG("{}:{} win_.put {} bytes at off {}, dst offset {}",
71+
default_comm().rank(), __LINE__, s, off, offset + off);
72+
win_.put((uint8_t *)src + off, s, segment_index, offset + off);
73+
off += s;
74+
remainder -= s;
75+
}
76+
#endif
5077
}
5178

5279
std::size_t getrank() { return win_.communicator().rank(); }

0 commit comments

Comments
 (0)