Skip to content

Commit 4828663

Browse files
authored
Merge pull request #9422 from WiltonLoch/master
Adding a new algorithm for allgather and allgatherv
2 parents 18579b7 + 7d9aede commit 4828663

File tree

5 files changed

+304
-2
lines changed

5 files changed

+304
-2
lines changed

ompi/mca/coll/base/coll_base_allgather.c

Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323

2424
#include "ompi_config.h"
2525

26+
#include "math.h"
27+
2628
#include "mpi.h"
2729
#include "opal/util/bit_ops.h"
2830
#include "ompi/constants.h"
@@ -338,7 +340,145 @@ ompi_coll_base_allgather_intra_recursivedoubling(const void *sbuf, int scount,
338340
return err;
339341
}
340342

343+
/*
344+
* ompi_coll_base_allgather_intra_sparbit
345+
*
346+
* Function: allgather using O(log(N)) steps.
347+
* Accepts: Same arguments as MPI_Allgather
348+
* Returns: MPI_SUCCESS or error code
349+
*
350+
* Description: Proposal of an allgather algorithm similar to Bruck but with inverted distances
351+
* and non-decreasing exchanged data sizes. Described in "Sparbit: a new
352+
* logarithmic-cost and data locality-aware MPI Allgather algorithm".
353+
*
354+
* Memory requirements:
355+
* Additional memory for N requests.
356+
*
357+
* Example on 6 nodes, with l representing the highest power of two smaller than N, in this case l =
358+
* 4 (more details can be found on the paper):
359+
* Initial state
360+
* # 0 1 2 3 4 5
361+
* [0] [ ] [ ] [ ] [ ] [ ]
362+
* [ ] [1] [ ] [ ] [ ] [ ]
363+
* [ ] [ ] [2] [ ] [ ] [ ]
364+
* [ ] [ ] [ ] [3] [ ] [ ]
365+
* [ ] [ ] [ ] [ ] [4] [ ]
366+
* [ ] [ ] [ ] [ ] [ ] [5]
367+
* Step 0: Each process sends its own block to process r + l and receives another from r - l.
368+
* # 0 1 2 3 4 5
369+
* [0] [ ] [ ] [ ] [0] [ ]
370+
* [ ] [1] [ ] [ ] [ ] [1]
371+
* [2] [ ] [2] [ ] [ ] [ ]
372+
* [ ] [3] [ ] [3] [ ] [ ]
373+
* [ ] [ ] [4] [ ] [4] [ ]
374+
* [ ] [ ] [ ] [5] [ ] [5]
375+
* Step 1: Each process sends its own block to process r + l/2 and receives another from r - l/2.
376+
* The block received on the previous step is ignored to avoid a future double-write.
377+
* # 0 1 2 3 4 5
378+
* [0] [ ] [0] [ ] [0] [ ]
379+
* [ ] [1] [ ] [1] [ ] [1]
380+
* [2] [ ] [2] [ ] [2] [ ]
381+
* [ ] [3] [ ] [3] [ ] [3]
382+
* [4] [ ] [4] [ ] [4] [ ]
383+
* [ ] [5] [ ] [5] [ ] [5]
384+
* Step 1: Each process sends all the data it has (3 blocks) to process r + l/4 and similarly
385+
* receives all the data from process r - l/4.
386+
* # 0 1 2 3 4 5
387+
* [0] [0] [0] [0] [0] [0]
388+
* [1] [1] [1] [1] [1] [1]
389+
* [2] [2] [2] [2] [2] [2]
390+
* [3] [3] [3] [3] [3] [3]
391+
* [4] [4] [4] [4] [4] [4]
392+
* [5] [5] [5] [5] [5] [5]
393+
*/
341394

395+
int ompi_coll_base_allgather_intra_sparbit(const void *sbuf, int scount,
396+
struct ompi_datatype_t *sdtype,
397+
void* rbuf, int rcount,
398+
struct ompi_datatype_t *rdtype,
399+
struct ompi_communicator_t *comm,
400+
mca_coll_base_module_t *module)
401+
{
402+
/* ################# VARIABLE DECLARATION, BUFFER CREATION AND PREPARATION FOR THE ALGORITHM ######################## */
403+
404+
/* list of variable declaration */
405+
int rank = 0, comm_size = 0, comm_log = 0, exclusion = 0, data_expected = 1, transfer_count = 0;
406+
int sendto, recvfrom, send_disp, recv_disp;
407+
uint32_t last_ignore, ignore_steps, distance = 1;
408+
409+
int err = 0;
410+
int line = -1;
411+
412+
ptrdiff_t rlb, rext;
413+
414+
char *tmpsend = NULL, *tmprecv = NULL;
415+
416+
MPI_Request *requests = NULL;
417+
418+
/* algorithm choice information printing */
419+
OPAL_OUTPUT((ompi_coll_base_framework.framework_output,
420+
"coll:base:allgather_intra_sparbit rank %d", rank));
421+
422+
comm_size = ompi_comm_size(comm);
423+
rank = ompi_comm_rank(comm);
424+
425+
err = ompi_datatype_get_extent(rdtype, &rlb, &rext);
426+
if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }
427+
428+
/* if the MPI_IN_PLACE condition is not set, copy the send buffer to the receive buffer to perform the sends (all the data is extracted and forwarded from the recv buffer)*/
429+
/* tmprecv and tmpsend are used as abstract pointers to simplify send and receive buffer choice */
430+
tmprecv = (char *) rbuf;
431+
if(MPI_IN_PLACE != sbuf){
432+
tmpsend = (char *) sbuf;
433+
err = ompi_datatype_sndrcv(tmpsend, scount, sdtype, tmprecv + (ptrdiff_t) rank * rcount * rext, rcount, rdtype);
434+
if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }
435+
}
436+
tmpsend = tmprecv;
437+
438+
requests = (MPI_Request *) malloc(comm_size * sizeof(MPI_Request));
439+
440+
/* ################# ALGORITHM LOGIC ######################## */
441+
442+
/* calculate log2 of the total process count */
443+
comm_log = ceil(log(comm_size)/log(2));
444+
distance <<= comm_log - 1;
445+
446+
last_ignore = __builtin_ctz(comm_size);
447+
ignore_steps = (~((uint32_t) comm_size >> last_ignore) | 1) << last_ignore;
448+
449+
/* perform the parallel binomial tree distribution steps */
450+
for (int i = 0; i < comm_log; ++i) {
451+
sendto = (rank + distance) % comm_size;
452+
recvfrom = (rank - distance + comm_size) % comm_size;
453+
exclusion = (distance & ignore_steps) == distance;
454+
455+
for (transfer_count = 0; transfer_count < data_expected - exclusion; transfer_count++) {
456+
send_disp = (rank - 2 * transfer_count * distance + comm_size) % comm_size;
457+
recv_disp = (rank - (2 * transfer_count + 1) * distance + comm_size) % comm_size;
458+
459+
/* Since each process sends several non-contiguos blocks of data, each block sent (and therefore each send and recv call) needs a different tag. */
460+
/* As base OpenMPI only provides one tag for allgather, we are forced to use a tag space from other components in the send and recv calls */
461+
MCA_PML_CALL(isend(tmpsend + (ptrdiff_t) send_disp * scount * rext, scount, rdtype, sendto, MCA_COLL_BASE_TAG_HCOLL_BASE - send_disp, MCA_PML_BASE_SEND_STANDARD, comm, requests + transfer_count));
462+
MCA_PML_CALL(irecv(tmprecv + (ptrdiff_t) recv_disp * rcount * rext, rcount, rdtype, recvfrom, MCA_COLL_BASE_TAG_HCOLL_BASE - recv_disp, comm, requests + data_expected - exclusion + transfer_count));
463+
}
464+
ompi_request_wait_all(transfer_count * 2, requests, MPI_STATUSES_IGNORE);
465+
466+
distance >>= 1;
467+
/* calculates the data expected for the next step, based on the current number of blocks and eventual exclusions */
468+
data_expected = (data_expected << 1) - exclusion;
469+
exclusion = 0;
470+
}
471+
472+
free(requests);
473+
474+
return OMPI_SUCCESS;
475+
476+
err_hndl:
477+
OPAL_OUTPUT((ompi_coll_base_framework.framework_output, "%s:%4d\tError occurred %d, rank %2d",
478+
__FILE__, line, err, rank));
479+
(void)line; // silence compiler warning
480+
return err;
481+
}
342482

343483
/*
344484
* ompi_coll_base_allgather_intra_ring

ompi/mca/coll/base/coll_base_allgatherv.c

Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525

2626
#include "ompi_config.h"
2727

28+
#include "math.h"
29+
2830
#include "mpi.h"
2931
#include "ompi/constants.h"
3032
#include "ompi/datatype/ompi_datatype.h"
@@ -202,6 +204,154 @@ int ompi_coll_base_allgatherv_intra_bruck(const void *sbuf, int scount,
202204
return err;
203205
}
204206

207+
/*
208+
* ompi_coll_base_allgather_intra_sparbit
209+
*
210+
* Function: allgather using O(log(N)) steps.
211+
* Accepts: Same arguments as MPI_Allgather
212+
* Returns: MPI_SUCCESS or error code
213+
*
214+
* Description: Proposal of an allgather algorithm similar to Bruck but with inverted distances
215+
* and non-decreasing exchanged data sizes. Described in "Sparbit: a new
216+
* logarithmic-cost and data locality-aware MPI Allgather algorithm".
217+
*
218+
* Memory requirements:
219+
* Additional memory for N requests.
220+
*
221+
* Example on 6 nodes, with l representing the highest power of two smaller than N, in this case l =
222+
* 4 (more details can be found on the paper):
223+
* Initial state
224+
* # 0 1 2 3 4 5
225+
* [0] [ ] [ ] [ ] [ ] [ ]
226+
* [ ] [1] [ ] [ ] [ ] [ ]
227+
* [ ] [ ] [2] [ ] [ ] [ ]
228+
* [ ] [ ] [ ] [3] [ ] [ ]
229+
* [ ] [ ] [ ] [ ] [4] [ ]
230+
* [ ] [ ] [ ] [ ] [ ] [5]
231+
* Step 0: Each process sends its own block to process r + l and receives another from r - l.
232+
* # 0 1 2 3 4 5
233+
* [0] [ ] [ ] [ ] [0] [ ]
234+
* [ ] [1] [ ] [ ] [ ] [1]
235+
* [2] [ ] [2] [ ] [ ] [ ]
236+
* [ ] [3] [ ] [3] [ ] [ ]
237+
* [ ] [ ] [4] [ ] [4] [ ]
238+
* [ ] [ ] [ ] [5] [ ] [5]
239+
* Step 1: Each process sends its own block to process r + l/2 and receives another from r - l/2.
240+
* The block received on the previous step is ignored to avoid a future double-write.
241+
* # 0 1 2 3 4 5
242+
* [0] [ ] [0] [ ] [0] [ ]
243+
* [ ] [1] [ ] [1] [ ] [1]
244+
* [2] [ ] [2] [ ] [2] [ ]
245+
* [ ] [3] [ ] [3] [ ] [3]
246+
* [4] [ ] [4] [ ] [4] [ ]
247+
* [ ] [5] [ ] [5] [ ] [5]
248+
* Step 1: Each process sends all the data it has (3 blocks) to process r + l/4 and similarly
249+
* receives all the data from process r - l/4.
250+
* # 0 1 2 3 4 5
251+
* [0] [0] [0] [0] [0] [0]
252+
* [1] [1] [1] [1] [1] [1]
253+
* [2] [2] [2] [2] [2] [2]
254+
* [3] [3] [3] [3] [3] [3]
255+
* [4] [4] [4] [4] [4] [4]
256+
* [5] [5] [5] [5] [5] [5]
257+
*/
258+
259+
int ompi_coll_base_allgatherv_intra_sparbit(const void *sbuf, int scount,
260+
struct ompi_datatype_t *sdtype,
261+
void* rbuf, const int *rcounts,
262+
const int *rdispls,
263+
struct ompi_datatype_t *rdtype,
264+
struct ompi_communicator_t *comm,
265+
mca_coll_base_module_t *module)
266+
{
267+
/* ################# VARIABLE DECLARATION, BUFFER CREATION AND PREPARATION FOR THE ALGORITHM ######################## */
268+
269+
/* list of variable declaration */
270+
int rank = 0, comm_size = 0, comm_log = 0, exclusion = 0;
271+
int data_expected = 1, transfer_count = 0, step_requests = 0;
272+
int sendto, recvfrom, send_disp, recv_disp;
273+
uint32_t last_ignore, ignore_steps, distance = 1;
274+
275+
int err = 0;
276+
int line = -1;
277+
278+
ptrdiff_t rlb, rext;
279+
280+
char *tmpsend = NULL, *tmprecv = NULL;
281+
282+
MPI_Request *requests = NULL;
283+
284+
/* printf("utilizando o allgatherv novo!!\n"); */
285+
286+
/* algorithm choice information printing */
287+
OPAL_OUTPUT((ompi_coll_base_framework.framework_output,
288+
"coll:sparbit:allgather_sync_intra rank %d", rank));
289+
290+
comm_size = ompi_comm_size(comm);
291+
rank = ompi_comm_rank(comm);
292+
293+
err = ompi_datatype_get_extent(rdtype, &rlb, &rext);
294+
if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }
295+
296+
/* if the MPI_IN_PLACE condition is not set, copy the send buffer to the receive buffer to perform the sends (all the data is extracted and forwarded from the recv buffer)*/
297+
/* tmprecv and tmpsend are used as abstract pointers to simplify send and receive buffer choice */
298+
tmprecv = (char *) rbuf;
299+
if(MPI_IN_PLACE != sbuf){
300+
tmpsend = (char *) sbuf;
301+
err = ompi_datatype_sndrcv(tmpsend, scount, sdtype, tmprecv + (ptrdiff_t) rdispls[rank] * rext, scount, rdtype);
302+
if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }
303+
}
304+
tmpsend = tmprecv;
305+
306+
requests = (MPI_Request *) malloc(comm_size * sizeof(MPI_Request));
307+
308+
/* ################# ALGORITHM LOGIC ######################## */
309+
310+
/* calculate log2 of the total process count */
311+
comm_log = ceil(log(comm_size)/log(2));
312+
distance <<= comm_log - 1;
313+
314+
last_ignore = __builtin_ctz(comm_size);
315+
ignore_steps = (~((uint32_t) comm_size >> last_ignore) | 1) << last_ignore;
316+
317+
/* perform the parallel binomial tree distribution steps */
318+
for (int i = 0; i < comm_log; ++i) {
319+
sendto = (rank + distance) % comm_size;
320+
recvfrom = (rank - distance + comm_size) % comm_size;
321+
exclusion = (distance & ignore_steps) == distance;
322+
323+
for (transfer_count = 0; transfer_count < data_expected - exclusion; transfer_count++) {
324+
send_disp = (rank - 2 * transfer_count * distance + comm_size) % comm_size;
325+
recv_disp = (rank - (2 * transfer_count + 1) * distance + comm_size) % comm_size;
326+
327+
/* Since each process sends several non-contiguos blocks of data to the same destination,
328+
* each block sent (and therefore each send and recv call) needs a different tag. */
329+
/* As base OpenMPI only provides one tag for allgather, we are forced to use a tag space
330+
* from other components in the send and recv calls */
331+
if(rcounts[send_disp] > 0)
332+
MCA_PML_CALL(isend(tmpsend + (ptrdiff_t) rdispls[send_disp] * rext, rcounts[send_disp], rdtype, sendto, MCA_COLL_BASE_TAG_HCOLL_BASE - send_disp, MCA_PML_BASE_SEND_STANDARD, comm, requests + step_requests++));
333+
if(rcounts[recv_disp] > 0)
334+
MCA_PML_CALL(irecv(tmprecv + (ptrdiff_t) rdispls[recv_disp] * rext, rcounts[recv_disp], rdtype, recvfrom, MCA_COLL_BASE_TAG_HCOLL_BASE - recv_disp, comm, requests + step_requests++));
335+
}
336+
ompi_request_wait_all(step_requests, requests, MPI_STATUSES_IGNORE);
337+
338+
distance >>= 1;
339+
/* calculates the data expected for the next step, based on the current number of blocks and eventual exclusions */
340+
data_expected = (data_expected << 1) - exclusion;
341+
exclusion = step_requests = 0;
342+
}
343+
344+
free(requests);
345+
346+
return OMPI_SUCCESS;
347+
348+
err_hndl:
349+
OPAL_OUTPUT((ompi_coll_base_framework.framework_output, "%s:%4d\tError occurred %d, rank %2d",
350+
__FILE__, line, err, rank));
351+
(void)line; // silence compiler warning
352+
return err;
353+
354+
}
205355

206356
/*
207357
* ompi_coll_base_allgatherv_intra_ring

ompi/mca/coll/base/coll_base_functions.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,13 +189,15 @@ BEGIN_C_DECLS
189189
/* All Gather */
190190
int ompi_coll_base_allgather_intra_bruck(ALLGATHER_ARGS);
191191
int ompi_coll_base_allgather_intra_recursivedoubling(ALLGATHER_ARGS);
192+
int ompi_coll_base_allgather_intra_sparbit(ALLGATHER_ARGS);
192193
int ompi_coll_base_allgather_intra_ring(ALLGATHER_ARGS);
193194
int ompi_coll_base_allgather_intra_neighborexchange(ALLGATHER_ARGS);
194195
int ompi_coll_base_allgather_intra_basic_linear(ALLGATHER_ARGS);
195196
int ompi_coll_base_allgather_intra_two_procs(ALLGATHER_ARGS);
196197

197198
/* All GatherV */
198199
int ompi_coll_base_allgatherv_intra_bruck(ALLGATHERV_ARGS);
200+
int ompi_coll_base_allgatherv_intra_sparbit(ALLGATHERV_ARGS);
199201
int ompi_coll_base_allgatherv_intra_ring(ALLGATHERV_ARGS);
200202
int ompi_coll_base_allgatherv_intra_neighborexchange(ALLGATHERV_ARGS);
201203
int ompi_coll_base_allgatherv_intra_basic_default(ALLGATHERV_ARGS);

ompi/mca/coll/tuned/coll_tuned_allgather_decision.c

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ static const mca_base_var_enum_value_t allgather_algorithms[] = {
4040
{4, "ring"},
4141
{5, "neighbor"},
4242
{6, "two_proc"},
43+
{7, "sparbit"},
4344
{0, NULL}
4445
};
4546

@@ -78,7 +79,7 @@ ompi_coll_tuned_allgather_intra_check_forced_init(coll_tuned_force_algorithm_mca
7879
mca_param_indices->algorithm_param_index =
7980
mca_base_component_var_register(&mca_coll_tuned_component.super.collm_version,
8081
"allgather_algorithm",
81-
"Which allgather algorithm is used. Can be locked down to choice of: 0 ignore, 1 basic linear, 2 bruck, 3 recursive doubling, 4 ring, 5 neighbor exchange, 6: two proc only. "
82+
"Which allgather algorithm is used. Can be locked down to choice of: 0 ignore, 1 basic linear, 2 bruck, 3 recursive doubling, 4 ring, 5 neighbor exchange, 6: two proc only, 7: sparbit. "
8283
"Only relevant if coll_tuned_use_dynamic_rules is true.",
8384
MCA_BASE_VAR_TYPE_INT, new_enum, 0, MCA_BASE_VAR_FLAG_SETTABLE,
8485
OPAL_INFO_LVL_5,
@@ -163,6 +164,10 @@ int ompi_coll_tuned_allgather_intra_do_this(const void *sbuf, int scount,
163164
return ompi_coll_base_allgather_intra_two_procs(sbuf, scount, sdtype,
164165
rbuf, rcount, rdtype,
165166
comm, module);
167+
case (7):
168+
return ompi_coll_base_allgather_intra_sparbit(sbuf, scount, sdtype,
169+
rbuf, rcount, rdtype,
170+
comm, module);
166171
} /* switch */
167172
OPAL_OUTPUT((ompi_coll_tuned_stream,
168173
"coll:tuned:allgather_intra_do_this attempt to select algorithm %d when only 0-%d is valid?",

ompi/mca/coll/tuned/coll_tuned_allgatherv_decision.c

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ static const mca_base_var_enum_value_t allgatherv_algorithms[] = {
3939
{3, "ring"},
4040
{4, "neighbor"},
4141
{5, "two_proc"},
42+
{6, "sparbit"},
4243
{0, NULL}
4344
};
4445

@@ -77,7 +78,7 @@ ompi_coll_tuned_allgatherv_intra_check_forced_init(coll_tuned_force_algorithm_mc
7778
mca_param_indices->algorithm_param_index =
7879
mca_base_component_var_register(&mca_coll_tuned_component.super.collm_version,
7980
"allgatherv_algorithm",
80-
"Which allgatherv algorithm is used. Can be locked down to choice of: 0 ignore, 1 default (allgathervv + bcast), 2 bruck, 3 ring, 4 neighbor exchange, 5: two proc only. "
81+
"Which allgatherv algorithm is used. Can be locked down to choice of: 0 ignore, 1 default (allgathervv + bcast), 2 bruck, 3 ring, 4 neighbor exchange, 5: two proc only, 6: sparbit. "
8182
"Only relevant if coll_tuned_use_dynamic_rules is true.",
8283
MCA_BASE_VAR_TYPE_INT, new_enum, 0, MCA_BASE_VAR_FLAG_SETTABLE,
8384
OPAL_INFO_LVL_5,
@@ -160,6 +161,10 @@ int ompi_coll_tuned_allgatherv_intra_do_this(const void *sbuf, int scount,
160161
return ompi_coll_base_allgatherv_intra_two_procs(sbuf, scount, sdtype,
161162
rbuf, rcounts, rdispls, rdtype,
162163
comm, module);
164+
case (6):
165+
return ompi_coll_base_allgatherv_intra_sparbit(sbuf, scount, sdtype,
166+
rbuf, rcounts, rdispls, rdtype,
167+
comm, module);
163168
} /* switch */
164169
OPAL_OUTPUT((ompi_coll_tuned_stream,
165170
"coll:tuned:allgatherv_intra_do_this attempt to select algorithm %d when only 0-%d is valid?",

0 commit comments

Comments
 (0)