Skip to content

Commit 7017a07

Browse files
Merge pull request #8462 from EmmanuelBRELLE/patch_Han_Bull_2020H2-master-rebased
master: Bull 2020 update of coll/han
2 parents d3cba3e + ca663de commit 7017a07

18 files changed

+574
-98
lines changed

ompi/mca/coll/han/Makefile.am

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ coll_han.h \
1414
coll_han_trigger.h \
1515
coll_han_dynamic.h \
1616
coll_han_dynamic_file.h \
17+
coll_han_barrier.c \
1718
coll_han_bcast.c \
1819
coll_han_reduce.c \
1920
coll_han_scatter.c \

ompi/mca/coll/han/coll_han.h

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,24 @@
1010
* $HEADER$
1111
*/
1212

13+
/**
14+
* @file
15+
*
16+
* This component provides hierarchical implementations of MPI collectives.
17+
* Hierarchical approach is efficient in case of too many processes wanting a remote
18+
* access to the same local or remote resource (high message rate).
19+
* Some components are also better at local scale (for example with shared memory)
20+
* where others provide scalable implementations. Hierarchical implementations
21+
* enable a fallback on other components for intermediary operations.
22+
* For example a MPI_Bcast will be divided into a sequence of bcasts from the
23+
* highest to the lowest topological level.
24+
* Some algorithms introduce more advanced features (such as noise resiliency),
25+
* some just link topological levels. The latter are called 'simple'.
26+
* To perform sub-communications, extra communicators are initialised for
27+
* each topological level.
28+
*/
29+
30+
1331
#ifndef MCA_COLL_HAN_EXPORT_H
1432
#define MCA_COLL_HAN_EXPORT_H
1533

@@ -198,7 +216,7 @@ typedef struct mca_coll_han_component_t {
198216
/* whether we need reproducible results
199217
* (but disables topological optimisations)
200218
*/
201-
uint32_t han_reproducible;
219+
bool han_reproducible;
202220
bool use_simple_algorithm[COLLCOUNT];
203221

204222
/* Dynamic configuration rules */
@@ -214,7 +232,6 @@ typedef struct mca_coll_han_component_t {
214232
int max_dynamic_errors;
215233
} mca_coll_han_component_t;
216234

217-
typedef void (*previous_dummy_fn_t) (void);
218235

219236
/*
220237
* Structure used to store what is necessary for the collective operations
@@ -225,11 +242,11 @@ typedef struct mca_coll_han_single_collective_fallback_s {
225242
mca_coll_base_module_allgather_fn_t allgather;
226243
mca_coll_base_module_allgatherv_fn_t allgatherv;
227244
mca_coll_base_module_allreduce_fn_t allreduce;
245+
mca_coll_base_module_barrier_fn_t barrier;
228246
mca_coll_base_module_bcast_fn_t bcast;
229247
mca_coll_base_module_gather_fn_t gather;
230248
mca_coll_base_module_reduce_fn_t reduce;
231249
mca_coll_base_module_scatter_fn_t scatter;
232-
previous_dummy_fn_t dummy;
233250
};
234251
mca_coll_base_module_t* module;
235252
} mca_coll_han_single_collective_fallback_t;
@@ -243,6 +260,7 @@ typedef struct mca_coll_han_collectives_fallback_s {
243260
mca_coll_han_single_collective_fallback_t allgather;
244261
mca_coll_han_single_collective_fallback_t allgatherv;
245262
mca_coll_han_single_collective_fallback_t allreduce;
263+
mca_coll_han_single_collective_fallback_t barrier;
246264
mca_coll_han_single_collective_fallback_t bcast;
247265
mca_coll_han_single_collective_fallback_t reduce;
248266
mca_coll_han_single_collective_fallback_t gather;
@@ -256,7 +274,9 @@ typedef struct mca_coll_han_module_t {
256274

257275
/* Whether this module has been lazily initialized or not yet */
258276
bool enabled;
277+
int recursive_free_depth;
259278

279+
struct ompi_communicator_t *cached_comm;
260280
struct ompi_communicator_t **cached_low_comms;
261281
struct ompi_communicator_t **cached_up_comms;
262282
int *cached_vranks;
@@ -305,6 +325,9 @@ OBJ_CLASS_DECLARATION(mca_coll_han_module_t);
305325
#define previous_allreduce fallback.allreduce.allreduce
306326
#define previous_allreduce_module fallback.allreduce.module
307327

328+
#define previous_barrier fallback.barrier.barrier
329+
#define previous_barrier_module fallback.barrier.module
330+
308331
#define previous_bcast fallback.bcast.bcast
309332
#define previous_bcast_module fallback.bcast.module
310333

@@ -333,6 +356,7 @@ OBJ_CLASS_DECLARATION(mca_coll_han_module_t);
333356
/* macro to correctly load /all/ fallback collectives */
334357
#define HAN_LOAD_FALLBACK_COLLECTIVES(HANM, COMM) \
335358
do { \
359+
HAN_LOAD_FALLBACK_COLLECTIVE(HANM, COMM, barrier); \
336360
HAN_LOAD_FALLBACK_COLLECTIVE(HANM, COMM, bcast); \
337361
HAN_LOAD_FALLBACK_COLLECTIVE(HANM, COMM, scatter); \
338362
HAN_LOAD_FALLBACK_COLLECTIVE(HANM, COMM, gather); \
@@ -404,6 +428,9 @@ int
404428
mca_coll_han_allreduce_intra_dynamic(ALLREDUCE_BASE_ARGS,
405429
mca_coll_base_module_t *module);
406430
int
431+
mca_coll_han_barrier_intra_dynamic(BARRIER_BASE_ARGS,
432+
mca_coll_base_module_t *module);
433+
int
407434
mca_coll_han_bcast_intra_dynamic(BCAST_BASE_ARGS,
408435
mca_coll_base_module_t *module);
409436
int
@@ -416,6 +443,8 @@ int
416443
mca_coll_han_scatter_intra_dynamic(SCATTER_BASE_ARGS,
417444
mca_coll_base_module_t *module);
418445

446+
int mca_coll_han_barrier_intra_simple(struct ompi_communicator_t *comm,
447+
mca_coll_base_module_t *module);
419448
/* Bcast */
420449
int mca_coll_han_bcast_intra_simple(void *buff,
421450
int count,
@@ -494,6 +523,14 @@ mca_coll_han_scatter_intra(const void *sbuf, int scount,
494523
struct ompi_datatype_t *rdtype,
495524
int root,
496525
struct ompi_communicator_t *comm, mca_coll_base_module_t * module);
526+
int
527+
mca_coll_han_scatter_intra_simple(const void *sbuf, int scount,
528+
struct ompi_datatype_t *sdtype,
529+
void *rbuf, int rcount,
530+
struct ompi_datatype_t *rdtype,
531+
int root,
532+
struct ompi_communicator_t *comm,
533+
mca_coll_base_module_t * module);
497534

498535
/* Gather */
499536
int

ompi/mca/coll/han/coll_han_allgather.c

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,12 @@
1010
* $HEADER$
1111
*/
1212

13+
/**
14+
* @file
15+
*
16+
* This file contains all the hierarchical implementations of allgather
17+
*/
18+
1319
#include "coll_han.h"
1420
#include "ompi/mca/coll/base/coll_base_functions.h"
1521
#include "ompi/mca/coll/base/coll_tags.h"
@@ -57,6 +63,10 @@ mca_coll_han_set_allgather_args(mca_coll_han_allgather_t * args,
5763
args->req = req;
5864
}
5965

66+
67+
/**
68+
* Main function for taskified allgather: calls lg task, a gather on low comm
69+
*/
6070
int
6171
mca_coll_han_allgather_intra(const void *sbuf, int scount,
6272
struct ompi_datatype_t *sdtype,
@@ -91,7 +101,7 @@ mca_coll_han_allgather_intra(const void *sbuf, int scount,
91101
comm, comm->c_coll->coll_allgather_module);
92102
}
93103

94-
ompi_request_t *temp_request = NULL;
104+
ompi_request_t *temp_request;
95105
/* Set up request */
96106
temp_request = OBJ_NEW(ompi_request_t);
97107
temp_request->req_state = OMPI_REQUEST_ACTIVE;
@@ -276,6 +286,10 @@ int mca_coll_han_allgather_lb_task(void *task_args)
276286

277287
}
278288

289+
/**
290+
* Short implementation of allgather that only does hierarchical
291+
* communications without tasks.
292+
*/
279293
int
280294
mca_coll_han_allgather_intra_simple(const void *sbuf, int scount,
281295
struct ompi_datatype_t *sdtype,

ompi/mca/coll/han/coll_han_allreduce.c

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,13 @@
1212
* $HEADER$
1313
*/
1414

15+
/**
16+
* @file
17+
*
18+
* This file contains all the hierarchical implementations of allreduce.
19+
* Only works in the regular situation (each node has an equal number of processes)
20+
*/
21+
1522
#include "coll_han.h"
1623
#include "ompi/mca/coll/base/coll_base_functions.h"
1724
#include "ompi/mca/coll/base/coll_tags.h"
@@ -172,16 +179,16 @@ mca_coll_han_allreduce_intra(const void *sbuf,
172179
issue_task(t3);
173180

174181
while (t->completed[0] != t->num_segments) {
175-
/* Create t3 tasks for the current segment */
176-
mca_coll_task_t *t3 = OBJ_NEW(mca_coll_task_t);
177-
/* Setup up t3 task arguments */
178-
t->cur_task = t3;
182+
/* Create t_next_seg tasks for the current segment */
183+
mca_coll_task_t *t_next_seg = OBJ_NEW(mca_coll_task_t);
184+
/* Set up t_next_seg task arguments */
185+
t->cur_task = t_next_seg;
179186
t->sbuf = (char *) t->sbuf + extent * t->seg_count;
180187
t->rbuf = (char *) t->rbuf + extent * t->seg_count;
181188
t->cur_seg = t->cur_seg + 1;
182-
/* Init t3 task */
183-
init_task(t3, mca_coll_han_allreduce_t3_task, (void *) t);
184-
issue_task(t3);
189+
/* Init t_next_seg task */
190+
init_task(t_next_seg, mca_coll_han_allreduce_t3_task, (void *) t);
191+
issue_task(t_next_seg);
185192
}
186193
free(t->completed);
187194
t->completed = NULL;
@@ -194,7 +201,7 @@ mca_coll_han_allreduce_intra(const void *sbuf,
194201
comm, han_module->previous_allreduce_module);
195202
}
196203

197-
/* t0 task */
204+
/* t0 task that performs a local reduction */
198205
int mca_coll_han_allreduce_t0_task(void *task_args)
199206
{
200207
mca_coll_han_allreduce_args_t *t = (mca_coll_han_allreduce_args_t *) task_args;
@@ -224,7 +231,7 @@ int mca_coll_han_allreduce_t0_task(void *task_args)
224231
return OMPI_SUCCESS;
225232
}
226233

227-
/* t1 task */
234+
/* t1 task that performs an ireduce on the top communicator */
228235
int mca_coll_han_allreduce_t1_task(void *task_args)
229236
{
230237
mca_coll_han_allreduce_args_t *t = (mca_coll_han_allreduce_args_t *) task_args;
@@ -326,7 +333,7 @@ int mca_coll_han_allreduce_t2_task(void *task_args)
326333
return OMPI_SUCCESS;
327334
}
328335

329-
/* t3 task */
336+
/* t3 task that performs broadcasts */
330337
int mca_coll_han_allreduce_t3_task(void *task_args)
331338
{
332339
mca_coll_han_allreduce_args_t *t = (mca_coll_han_allreduce_args_t *) task_args;
@@ -397,6 +404,10 @@ int mca_coll_han_allreduce_t3_task(void *task_args)
397404
return OMPI_SUCCESS;
398405
}
399406

407+
/*
408+
* Short implementation of allreduce that only does hierarchical
409+
* communications without tasks.
410+
*/
400411
int
401412
mca_coll_han_allreduce_intra_simple(const void *sbuf,
402413
void *rbuf,

ompi/mca/coll/han/coll_han_barrier.c

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Copyright (c) 2018-2020 The University of Tennessee and The University
3+
* of Tennessee Research Foundation. All rights
4+
* reserved.
5+
* Copyright (c) 2020 Bull S.A.S. All rights reserved.
6+
* $COPYRIGHT$
7+
*
8+
* Additional copyrights may follow
9+
*
10+
* $HEADER$
11+
*/
12+
13+
/**
14+
* @file
15+
*
16+
* This file contains all the hierarchical implementations of barrier
17+
*/
18+
19+
#include "coll_han.h"
20+
#include "ompi/mca/coll/base/coll_base_functions.h"
21+
#include "ompi/mca/coll/base/coll_tags.h"
22+
23+
24+
/**
25+
* Short implementation of barrier that only does hierarchical
26+
* communications without tasks.
27+
*/
28+
int
29+
mca_coll_han_barrier_intra_simple(struct ompi_communicator_t *comm,
30+
mca_coll_base_module_t *module)
31+
{
32+
mca_coll_han_module_t *han_module = (mca_coll_han_module_t *)module;
33+
ompi_communicator_t *low_comm, *up_comm;
34+
35+
/* create the subcommunicators */
36+
if( OMPI_SUCCESS != mca_coll_han_comm_create_new(comm, han_module) ) {
37+
OPAL_OUTPUT_VERBOSE((30, mca_coll_han_component.han_output,
38+
"han cannot handle barrier with this communicator. Fall back on another component\n"));
39+
/* Put back the fallback collective support and call it once. All
40+
* future calls will then be automatically redirected.
41+
*/
42+
HAN_LOAD_FALLBACK_COLLECTIVES(han_module, comm);
43+
return comm->c_coll->coll_barrier(comm, comm->c_coll->coll_bcast_module);
44+
}
45+
46+
low_comm = han_module->sub_comm[INTRA_NODE];
47+
up_comm = han_module->sub_comm[INTER_NODE];
48+
49+
int low_rank = ompi_comm_rank(low_comm);
50+
int root_low_rank = 0; /* rank leader will be 0 on each node */
51+
52+
/* TODO: extend coll interface with half barrier */
53+
low_comm->c_coll->coll_barrier(low_comm,low_comm->c_coll->coll_barrier_module);
54+
55+
if (low_rank == root_low_rank) {
56+
up_comm->c_coll->coll_barrier(up_comm, up_comm->c_coll->coll_barrier_module);
57+
}
58+
59+
low_comm->c_coll->coll_barrier(low_comm,low_comm->c_coll->coll_barrier_module);
60+
61+
return OMPI_SUCCESS;
62+
}

ompi/mca/coll/han/coll_han_bcast.c

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,12 @@
1111
* $HEADER$
1212
*/
1313

14+
/**
15+
* @file
16+
*
17+
* This file contains all the hierarchical implementations of bcast
18+
*/
19+
1420
#include "coll_han.h"
1521
#include "ompi/mca/coll/base/coll_base_functions.h"
1622
#include "ompi/mca/coll/base/coll_tags.h"
@@ -71,7 +77,7 @@ mca_coll_han_bcast_intra(void *buff,
7177

7278
/* Create the subcommunicators */
7379
err = mca_coll_han_comm_create(comm, han_module);
74-
if( OMPI_SUCCESS != err ) { /* Let's hope the error is consistently returned across the entire communicator */
80+
if( OMPI_SUCCESS != err ) {
7581
OPAL_OUTPUT_VERBOSE((30, mca_coll_han_component.han_output,
7682
"han cannot handle bcast with this communicator. Fall back on another component\n"));
7783
/* Put back the fallback collective support and call it once. All
@@ -211,6 +217,10 @@ int mca_coll_han_bcast_t1_task(void *task_args)
211217
return OMPI_SUCCESS;
212218
}
213219

220+
/*
221+
* Short implementation of bcast that only does hierarchical
222+
* communications without tasks.
223+
*/
214224
int
215225
mca_coll_han_bcast_intra_simple(void *buff,
216226
int count,
@@ -229,7 +239,7 @@ mca_coll_han_bcast_intra_simple(void *buff,
229239

230240
/* Create the subcommunicators */
231241
err = mca_coll_han_comm_create_new(comm, han_module);
232-
if( OMPI_SUCCESS != err ) { /* Let's hope the error is consistently returned across the entire communicator */
242+
if( OMPI_SUCCESS != err ) {
233243
OPAL_OUTPUT_VERBOSE((30, mca_coll_han_component.han_output,
234244
"han cannot handle bcast with this communicator. Fall back on another component\n"));
235245
/* Put back the fallback collective support and call it once. All

0 commit comments

Comments
 (0)