Skip to content

Commit 48c125e

Browse files
committed
coll/han: implement hierarchical gatherv
Add a gatherv implementation to optimize large-scale communications on multiple nodes with multiple processes per node, by avoiding high-incast traffic on the root process. Because *V collectives do not have equal datatype/count on every process, they do not natively support message-size-based tuning without an additional global communication. Similar to gather and allgather, the hierarchical gatherv requires a temporary buffer and a memory copy to handle out-of-order data or non-contiguous placement in the output buffer, which results in worse performance for large messages compared to the linear implementation. Signed-off-by: Wenduo Wang <wenduwan@amazon.com>
1 parent b22e5fa commit 48c125e

10 files changed

+620
-2
lines changed

ompi/mca/coll/han/Makefile.am

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ coll_han_bcast.c \
2222
coll_han_reduce.c \
2323
coll_han_scatter.c \
2424
coll_han_gather.c \
25+
coll_han_gatherv.c \
2526
coll_han_allreduce.c \
2627
coll_han_allgather.c \
2728
coll_han_component.c \
@@ -31,7 +32,8 @@ coll_han_algorithms.c \
3132
coll_han_dynamic.c \
3233
coll_han_dynamic_file.c \
3334
coll_han_topo.c \
34-
coll_han_subcomms.c
35+
coll_han_subcomms.c \
36+
coll_han_utils.c
3537

3638
# Make the output library in this directory, and name it either
3739
# mca_<type>_<name>.la (for DSO builds) or libmca_<type>_<name>.la

ompi/mca/coll/han/coll_han.h

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,7 @@ typedef struct mca_coll_han_op_module_name_t {
191191
mca_coll_han_op_up_low_module_name_t allreduce;
192192
mca_coll_han_op_up_low_module_name_t allgather;
193193
mca_coll_han_op_up_low_module_name_t gather;
194+
mca_coll_han_op_up_low_module_name_t gatherv;
194195
mca_coll_han_op_up_low_module_name_t scatter;
195196
} mca_coll_han_op_module_name_t;
196197

@@ -235,6 +236,10 @@ typedef struct mca_coll_han_component_t {
235236
uint32_t han_gather_up_module;
236237
/* low level module for gather */
237238
uint32_t han_gather_low_module;
239+
/* up level module for gatherv */
240+
uint32_t han_gatherv_up_module;
241+
/* low level module for gatherv */
242+
uint32_t han_gatherv_low_module;
238243
/* up level module for scatter */
239244
uint32_t han_scatter_up_module;
240245
/* low level module for scatter */
@@ -279,6 +284,7 @@ typedef struct mca_coll_han_single_collective_fallback_s {
279284
mca_coll_base_module_barrier_fn_t barrier;
280285
mca_coll_base_module_bcast_fn_t bcast;
281286
mca_coll_base_module_gather_fn_t gather;
287+
mca_coll_base_module_gatherv_fn_t gatherv;
282288
mca_coll_base_module_reduce_fn_t reduce;
283289
mca_coll_base_module_scatter_fn_t scatter;
284290
} module_fn;
@@ -298,6 +304,7 @@ typedef struct mca_coll_han_collectives_fallback_s {
298304
mca_coll_han_single_collective_fallback_t bcast;
299305
mca_coll_han_single_collective_fallback_t reduce;
300306
mca_coll_han_single_collective_fallback_t gather;
307+
mca_coll_han_single_collective_fallback_t gatherv;
301308
mca_coll_han_single_collective_fallback_t scatter;
302309
} mca_coll_han_collectives_fallback_t;
303310

@@ -371,6 +378,9 @@ OBJ_CLASS_DECLARATION(mca_coll_han_module_t);
371378
#define previous_gather fallback.gather.module_fn.gather
372379
#define previous_gather_module fallback.gather.module
373380

381+
#define previous_gatherv fallback.gatherv.module_fn.gatherv
382+
#define previous_gatherv_module fallback.gatherv.module
383+
374384
#define previous_scatter fallback.scatter.module_fn.scatter
375385
#define previous_scatter_module fallback.scatter.module
376386

@@ -394,6 +404,7 @@ OBJ_CLASS_DECLARATION(mca_coll_han_module_t);
394404
HAN_LOAD_FALLBACK_COLLECTIVE(HANM, COMM, bcast); \
395405
HAN_LOAD_FALLBACK_COLLECTIVE(HANM, COMM, scatter); \
396406
HAN_LOAD_FALLBACK_COLLECTIVE(HANM, COMM, gather); \
407+
HAN_LOAD_FALLBACK_COLLECTIVE(HANM, COMM, gatherv); \
397408
HAN_LOAD_FALLBACK_COLLECTIVE(HANM, COMM, reduce); \
398409
HAN_LOAD_FALLBACK_COLLECTIVE(HANM, COMM, allreduce); \
399410
HAN_LOAD_FALLBACK_COLLECTIVE(HANM, COMM, allgather); \
@@ -476,6 +487,9 @@ int
476487
mca_coll_han_gather_intra_dynamic(GATHER_BASE_ARGS,
477488
mca_coll_base_module_t *module);
478489
int
490+
mca_coll_han_gatherv_intra_dynamic(GATHERV_BASE_ARGS,
491+
mca_coll_base_module_t *module);
492+
int
479493
mca_coll_han_reduce_intra_dynamic(REDUCE_BASE_ARGS,
480494
mca_coll_base_module_t *module);
481495
int
@@ -493,4 +507,10 @@ ompi_coll_han_reorder_gather(const void *sbuf,
493507
struct ompi_communicator_t *comm,
494508
int * topo);
495509

510+
size_t
511+
coll_han_utils_gcd(const size_t *numerators, const size_t size);
512+
513+
int
514+
coll_han_utils_create_contiguous_datatype(size_t count, const ompi_datatype_t *oldType,
515+
ompi_datatype_t **newType);
496516
#endif /* MCA_COLL_HAN_EXPORT_H */

ompi/mca/coll/han/coll_han_algorithms.c

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,10 @@ mca_coll_han_algorithm_value_t* mca_coll_han_available_algorithms[COLLCOUNT] =
6464
{"simple", (fnptr_t) &mca_coll_han_gather_intra_simple}, // 2-level
6565
{ 0 }
6666
},
67+
[GATHERV] = (mca_coll_han_algorithm_value_t[]){
68+
{"intra", (fnptr_t) &mca_coll_han_gatherv_intra}, // 2-level
69+
{ 0 }
70+
},
6771
[ALLGATHER] = (mca_coll_han_algorithm_value_t[]){
6872
{"intra", (fnptr_t)&mca_coll_han_allgather_intra}, // 2-level
6973
{"simple", (fnptr_t)&mca_coll_han_allgather_intra_simple}, // 2-level

ompi/mca/coll/han/coll_han_algorithms.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,13 @@ mca_coll_han_gather_intra_simple(const void *sbuf, int scount,
176176
struct ompi_communicator_t *comm,
177177
mca_coll_base_module_t *module);
178178

179+
/* Gatherv */
180+
int
181+
mca_coll_han_gatherv_intra(const void *sbuf, int scount, struct ompi_datatype_t *sdtype,
182+
void *rbuf, const int *rcounts, const int *displs,
183+
struct ompi_datatype_t *rdtype, int root,
184+
struct ompi_communicator_t *comm, mca_coll_base_module_t *module);
185+
179186
/* Allgather */
180187
int
181188
mca_coll_han_allgather_intra(const void *sbuf, int scount,

ompi/mca/coll/han/coll_han_component.c

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,11 @@ static int han_close(void)
146146
free(mca_coll_han_component.han_op_module_name.gather.han_op_low_module_name);
147147
mca_coll_han_component.han_op_module_name.gather.han_op_low_module_name = NULL;
148148

149+
free(mca_coll_han_component.han_op_module_name.gatherv.han_op_up_module_name);
150+
mca_coll_han_component.han_op_module_name.gatherv.han_op_up_module_name = NULL;
151+
free(mca_coll_han_component.han_op_module_name.gatherv.han_op_low_module_name);
152+
mca_coll_han_component.han_op_module_name.gatherv.han_op_low_module_name = NULL;
153+
149154
free(mca_coll_han_component.han_op_module_name.scatter.han_op_up_module_name);
150155
mca_coll_han_component.han_op_module_name.scatter.han_op_up_module_name = NULL;
151156
free(mca_coll_han_component.han_op_module_name.scatter.han_op_low_module_name);
@@ -344,6 +349,18 @@ static int han_register(void)
344349
OPAL_INFO_LVL_9, &cs->han_gather_low_module,
345350
&cs->han_op_module_name.gather.han_op_low_module_name);
346351

352+
cs->han_gatherv_up_module = 0;
353+
(void) mca_coll_han_query_module_from_mca(c, "gatherv_up_module",
354+
"up level module for gatherv, 0 basic",
355+
OPAL_INFO_LVL_9, &cs->han_gatherv_up_module,
356+
&cs->han_op_module_name.gatherv.han_op_up_module_name);
357+
358+
cs->han_gatherv_low_module = 0;
359+
(void) mca_coll_han_query_module_from_mca(c, "gatherv_low_module",
360+
"low level module for gatherv, 0 basic",
361+
OPAL_INFO_LVL_9, &cs->han_gatherv_low_module,
362+
&cs->han_op_module_name.gatherv.han_op_low_module_name);
363+
347364
cs->han_scatter_up_module = 0;
348365
(void) mca_coll_han_query_module_from_mca(c, "scatter_up_module",
349366
"up level module for scatter, 0 libnbc, 1 adapt",

ompi/mca/coll/han/coll_han_dynamic.c

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
#include "ompi/mca/coll/han/coll_han_algorithms.h"
2727
#include "ompi/mca/coll/base/coll_base_util.h"
2828

29+
#define MCA_COLL_HAN_ANY_MESSAGE_SIZE 0
30+
2931
/*
3032
* Tests if a dynamic collective is implemented
3133
* Useful for file reading warnings and MCA parameter generation
@@ -41,6 +43,7 @@ bool mca_coll_han_is_coll_dynamic_implemented(COLLTYPE_T coll_id)
4143
case BARRIER:
4244
case BCAST:
4345
case GATHER:
46+
case GATHERV:
4447
case REDUCE:
4548
case SCATTER:
4649
return true;
@@ -1045,6 +1048,103 @@ mca_coll_han_gather_intra_dynamic(const void *sbuf, int scount,
10451048
sub_module);
10461049
}
10471050

1051+
/*
1052+
* Gatherv selector:
1053+
* On a sub-communicator, checks the stored rules to find the module to use.
1054+
* On the global communicator, calls the HAN collective implementation, or
1055+
* falls back to the previous gatherv module when HAN is disabled or no usable module is found.
1056+
*/
1057+
int mca_coll_han_gatherv_intra_dynamic(const void *sbuf, int scount, struct ompi_datatype_t *sdtype,
1058+
void *rbuf, const int *rcounts, const int *displs,
1059+
struct ompi_datatype_t *rdtype, int root,
1060+
struct ompi_communicator_t *comm,
1061+
mca_coll_base_module_t *module)
1062+
{
1063+
mca_coll_han_module_t *han_module = (mca_coll_han_module_t*) module;
1064+
TOPO_LVL_T topo_lvl = han_module->topologic_level;
1065+
mca_coll_base_module_gatherv_fn_t gatherv;
1066+
mca_coll_base_module_t *sub_module;
1067+
int rank, verbosity = 0;
1068+
1069+
if (!han_module->enabled) {
1070+
return han_module->previous_gatherv(sbuf, scount, sdtype, rbuf, rcounts, displs, rdtype,
1071+
root, comm, han_module->previous_gatherv_module);
1072+
}
1073+
1074+
/* v collectives do not support message-size based dynamic rules, so the lookup uses MCA_COLL_HAN_ANY_MESSAGE_SIZE */
1075+
sub_module = get_module(GATHERV, MCA_COLL_HAN_ANY_MESSAGE_SIZE, comm, han_module);
1076+
1077+
/* First errors are always printed by rank 0 */
1078+
rank = ompi_comm_rank(comm);
1079+
if( (0 == rank) && (han_module->dynamic_errors < mca_coll_han_component.max_dynamic_errors) ) {
1080+
verbosity = 30;
1081+
}
1082+
1083+
if(NULL == sub_module) {
1084+
/*
1085+
* No valid collective module from dynamic rules
1086+
* nor from mca parameter: fall back to the previous gatherv module
1087+
*/
1088+
han_module->dynamic_errors++;
1089+
opal_output_verbose(verbosity, mca_coll_han_component.han_output,
1090+
"coll:han:mca_coll_han_gatherv_intra_dynamic "
1091+
"HAN did not find any valid module for collective %d (%s) "
1092+
"with topological level %d (%s) on communicator (%s/%s). "
1093+
"Please check dynamic file/mca parameters\n",
1094+
GATHERV, mca_coll_base_colltype_to_str(GATHERV),
1095+
topo_lvl, mca_coll_han_topo_lvl_to_str(topo_lvl),
1096+
ompi_comm_print_cid(comm), comm->c_name);
1097+
OPAL_OUTPUT_VERBOSE((30, mca_coll_han_component.han_output,
1098+
"HAN/GATHERV: No module found for the sub-communicator. "
1099+
"Falling back to another component\n"));
1100+
gatherv = han_module->previous_gatherv;
1101+
sub_module = han_module->previous_gatherv_module;
1102+
} else if (NULL == sub_module->coll_gatherv) {
1103+
/*
1104+
* A module was found, but it provides no gatherv implementation
1105+
* (coll_gatherv is NULL): fall back to the previous gatherv module
1106+
*/
1107+
han_module->dynamic_errors++;
1108+
opal_output_verbose(verbosity, mca_coll_han_component.han_output,
1109+
"coll:han:mca_coll_han_gatherv_intra_dynamic "
1110+
"HAN found valid module for collective %d (%s) "
1111+
"with topological level %d (%s) on communicator (%s/%s) "
1112+
"but this module cannot handle this collective. "
1113+
"Please check dynamic file/mca parameters\n",
1114+
GATHERV, mca_coll_base_colltype_to_str(GATHERV),
1115+
topo_lvl, mca_coll_han_topo_lvl_to_str(topo_lvl),
1116+
ompi_comm_print_cid(comm), comm->c_name);
1117+
OPAL_OUTPUT_VERBOSE((30, mca_coll_han_component.han_output,
1118+
"HAN/GATHERV: the module found for the sub-"
1119+
"communicator cannot handle the GATHERV operation. "
1120+
"Falling back to another component\n"));
1121+
gatherv = han_module->previous_gatherv;
1122+
sub_module = han_module->previous_gatherv_module;
1123+
} else if (GLOBAL_COMMUNICATOR == topo_lvl && sub_module == module) {
1124+
/*
1125+
* No fallback mechanism activated for this configuration
1126+
* sub_module is valid
1127+
* sub_module->coll_gatherv is valid and points to this function
1128+
* Call the HAN topological collective algorithm
1129+
*/
1130+
int algorithm_id = get_algorithm(GATHERV, MCA_COLL_HAN_ANY_MESSAGE_SIZE, comm, han_module);
1131+
gatherv = (mca_coll_base_module_gatherv_fn_t) mca_coll_han_algorithm_id_to_fn(GATHERV, algorithm_id);
1132+
if (NULL == gatherv) { /* default behaviour */
1133+
gatherv = mca_coll_han_gatherv_intra;
1134+
}
1135+
} else {
1136+
/*
1137+
* If we get here:
1138+
* sub_module is valid
1139+
* sub_module->coll_gatherv is valid
1140+
* They point to the collective to use, according to the dynamic rules
1141+
* Selector's job is done, call the collective
1142+
*/
1143+
gatherv = sub_module->coll_gatherv;
1144+
}
1145+
return gatherv(sbuf, scount, sdtype, rbuf, rcounts, displs, rdtype, root, comm, sub_module);
1146+
}
1147+
10481148

10491149
/*
10501150
* Reduce selector:

0 commit comments

Comments
 (0)