
Commit 8bb3e3d

coll/han: disqualify hierarchical scatterv for heterogeneous communicators
Hierarchical *v collective algorithms may not work for heterogeneous communicators with different endianness, integer representation, etc., and thus require knowledge of the global communicator's homogeneity to disqualify the module. The hierarchical scatterv algorithm requires that every process have the same architecture as the Root due to the use of MPI_BYTE on node leaders. Heterogeneous communicators would need additional logic to correctly pack and unpack the data, at a cost in memory usage and performance.

Signed-off-by: Wenduo Wang <wenduwan@amazon.com>
1 parent 73001e5 commit 8bb3e3d
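To make the failure mode concrete, here is a minimal, MPI-free C sketch (not part of this commit) of why relaying packed bytes cannot survive an architecture mismatch: the same 32-bit integer has a different byte layout on little- and big-endian machines, and once the data has been reduced to MPI_BYTE the type information needed to convert it is gone.

#include <stdio.h>
#include <stdint.h>
#include <string.h>

int main(void)
{
    /* The same value, viewed as raw bytes, differs between architectures:
     * little-endian prints "01 00 00 00", big-endian prints "00 00 00 01".
     * A node leader that relays these bytes verbatim (MPI_BYTE) cannot repair
     * the ordering because the original type signature is no longer known. */
    int32_t value = 1;
    unsigned char bytes[sizeof(value)];
    memcpy(bytes, &value, sizeof(value));

    for (size_t i = 0; i < sizeof(value); ++i) {
        printf("%02x ", bytes[i]);
    }
    printf("\n");
    return 0;
}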

File tree
3 files changed: 31 additions, 7 deletions


ompi/mca/coll/han/coll_han.h

Lines changed: 1 addition & 0 deletions
@@ -331,6 +331,7 @@ typedef struct mca_coll_han_module_t {
     int *cached_topo;
     bool is_mapbycore;
     bool are_ppn_imbalanced;
+    bool is_heterogeneous;
 
     /* To be able to fallback when the cases are not supported */
     struct mca_coll_han_collectives_fallback_s fallback;

ompi/mca/coll/han/coll_han_scatterv.c

Lines changed: 14 additions & 0 deletions
@@ -55,6 +55,12 @@
  * to send the data in the correct order even if the process are NOT mapped by core.
  * 2. In the send buffer, other than the root's node, data destined to the same node are continuous
  *    - it is ok if data to different nodes has gap.
+ *
+ * Limitation:
+ * The node leader acts as a broker between the Root and node followers, but it cannot match the
+ * exact type signature of the followers; instead it forwards the intermediate data from Root in its
+ * packed form of MPI_BYTE type. This works for Gatherv but NOT for Scatterv provided that the Root
+ * has a different architecture, e.g. endianness, integer representation, etc.
  */
 int mca_coll_han_scatterv_intra(const void *sbuf, const int *scounts, const int *displs,
                                 struct ompi_datatype_t *sdtype, void *rbuf, int rcount,
@@ -94,6 +100,14 @@ int mca_coll_han_scatterv_intra(const void *sbuf, const int *scounts, const int
         return han_module->previous_scatterv(sbuf, scounts, displs, sdtype, rbuf, rcount, rdtype,
                                              root, comm, han_module->previous_scatterv_module);
     }
+    if (han_module->is_heterogeneous) {
+        OPAL_OUTPUT_VERBOSE((30, mca_coll_han_component.han_output,
+                             "han cannot handle scatterv with this communicator (heterogeneous). Fall "
+                             "back on another component\n"));
+        HAN_LOAD_FALLBACK_COLLECTIVE(han_module, comm, scatterv);
+        return han_module->previous_scatterv(sbuf, scounts, displs, sdtype, rbuf, rcount, rdtype,
+                                             root, comm, han_module->previous_scatterv_module);
+    }
 
     w_rank = ompi_comm_rank(comm);
     w_size = ompi_comm_size(comm);
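For illustration, the hypothetical sketch below (not HAN code; relay_to_followers and its parameters are invented for this example) shows the node-leader broker pattern that the limitation comment above refers to: the leader relays the root's packed bytes to its local followers as MPI_BYTE, which is only safe when both sides share the same data representation.

#include <mpi.h>

/* Hypothetical broker loop on a node leader: "packed" holds the root's data for this
 * node; byte_counts/byte_displs are in bytes.  Because the payload travels as MPI_BYTE,
 * MPI cannot perform any representation conversion for the followers; a
 * heterogeneous-safe variant would have to keep the original datatype (e.g. sdtype)
 * at extra pack/unpack cost. */
static void relay_to_followers(const char *packed, const int *byte_counts,
                               const int *byte_displs, int node_size, MPI_Comm node_comm)
{
    for (int peer = 1; peer < node_size; ++peer) {
        MPI_Send(packed + byte_displs[peer], byte_counts[peer], MPI_BYTE,
                 peer, 0, node_comm);
    }
}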

ompi/mca/coll/han/coll_han_topo.c

Lines changed: 16 additions & 7 deletions
@@ -92,12 +92,19 @@ mca_coll_han_topo_init(struct ompi_communicator_t *comm,
     }
     assert(up_comm != NULL && low_comm != NULL);
 
+    int up_rank = ompi_comm_rank(up_comm);
     int low_rank = ompi_comm_rank(low_comm);
     int low_size = ompi_comm_size(low_comm);
 
+    ompi_proc_t *up_proc = NULL;
+
     int *topo = (int *)malloc(sizeof(int) * size * num_topo_level);
-    int is_imbalanced = 1;
-    int ranks_non_consecutive = 0;
+    int is_imbalanced = 1, ranks_non_consecutive = 0, is_heterogeneous = 0;
+
+    if (0 != up_rank) {
+        up_proc = ompi_comm_peer_lookup(up_comm, 0);
+        is_heterogeneous = up_proc->super.proc_convertor->remoteArch != opal_local_arch;
+    }
 
     /* node leaders translate the node-local ranks to global ranks and check whether they are placed consecutively */
     if (0 == low_rank) {
@@ -116,15 +123,16 @@ mca_coll_han_topo_init(struct ompi_communicator_t *comm,
         }
     }
 
-    int reduce_vals[] = {ranks_non_consecutive, low_size, -low_size};
+    int reduce_vals[] = {ranks_non_consecutive, low_size, -low_size, is_heterogeneous};
 
-    up_comm->c_coll->coll_allreduce(MPI_IN_PLACE, &reduce_vals, 3,
+    up_comm->c_coll->coll_allreduce(MPI_IN_PLACE, &reduce_vals, 4,
                                     MPI_INT, MPI_MAX, up_comm,
                                     up_comm->c_coll->coll_allreduce_module);
 
     /* is the distribution of processes balanced per node? */
     is_imbalanced = (reduce_vals[1] == -reduce_vals[2]) ? 0 : 1;
     ranks_non_consecutive = reduce_vals[0];
+    is_heterogeneous = reduce_vals[3];
 
     if ( ranks_non_consecutive && !is_imbalanced ) {
         /* kick off up_comm allgather to collect non-consecutive rank information at node leaders */
@@ -136,12 +144,13 @@ mca_coll_han_topo_init(struct ompi_communicator_t *comm,
     }
 
 
-    /* broadcast balanced and consecutive properties from node leaders to remaining ranks */
-    int bcast_vals[] = {is_imbalanced, ranks_non_consecutive};
-    low_comm->c_coll->coll_bcast(bcast_vals, 2, MPI_INT, 0,
+    /* broadcast balanced, consecutive and homogeneity properties from node leaders to remaining ranks */
+    int bcast_vals[] = {is_imbalanced, ranks_non_consecutive, is_heterogeneous};
+    low_comm->c_coll->coll_bcast(bcast_vals, 3, MPI_INT, 0,
                                  low_comm, low_comm->c_coll->coll_bcast_module);
     is_imbalanced = bcast_vals[0];
     ranks_non_consecutive = bcast_vals[1];
+    han_module->is_heterogeneous = bcast_vals[2];
 
     /* error out if the rank distribution is not balanced */
     if (is_imbalanced) {
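As a rough analogue of the detection logic above, the self-contained sketch below uses only public MPI calls (the internal convertor/arch fields are replaced by a simple endianness probe, so this is illustrative rather than HAN's actual implementation): each rank compares its byte order against rank 0's, and a MAX-allreduce turns a single mismatch into a communicator-wide disqualification flag.

#include <mpi.h>
#include <stdint.h>
#include <stdio.h>

int main(int argc, char **argv)
{
    MPI_Init(&argc, &argv);

    int rank;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    /* Stand-in for opal_local_arch: probe the local byte order. */
    uint32_t probe = 1;
    int little_endian = (*(unsigned char *) &probe == 1);

    /* Compare against rank 0's byte order (MPI converts MPI_INT correctly even
     * on a heterogeneous communicator, unlike MPI_BYTE payloads). */
    int root_little_endian = little_endian;
    MPI_Bcast(&root_little_endian, 1, MPI_INT, 0, MPI_COMM_WORLD);
    int is_heterogeneous = (little_endian != root_little_endian);

    /* One mismatching rank is enough to disqualify the whole communicator. */
    MPI_Allreduce(MPI_IN_PLACE, &is_heterogeneous, 1, MPI_INT, MPI_MAX, MPI_COMM_WORLD);

    if (0 == rank) {
        printf("communicator is %s\n", is_heterogeneous ? "heterogeneous" : "homogeneous");
    }

    MPI_Finalize();
    return 0;
}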
