Skip to content

Commit 85fd1bc

Browse files
authored
Merge pull request #12439 from wenduwan/han_gatherv_noncontiguous_datatype_fix
Han gatherv noncontiguous datatype fix
2 parents 02bc0c5 + 8bb3e3d commit 85fd1bc

File tree

4 files changed

+48
-26
lines changed

4 files changed

+48
-26
lines changed

ompi/mca/coll/han/coll_han.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -331,6 +331,7 @@ typedef struct mca_coll_han_module_t {
331331
int *cached_topo;
332332
bool is_mapbycore;
333333
bool are_ppn_imbalanced;
334+
bool is_heterogeneous;
334335

335336
/* To be able to fallback when the cases are not supported */
336337
struct mca_coll_han_collectives_fallback_s fallback;

ompi/mca/coll/han/coll_han_gatherv.c

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -149,11 +149,8 @@ int mca_coll_han_gatherv_intra(const void *sbuf, int scount, struct ompi_datatyp
149149
root_low_rank, low_comm,
150150
low_comm->c_coll->coll_gatherv_module);
151151

152-
size_t rdsize;
153152
char *tmp_rbuf = rbuf;
154153

155-
ompi_datatype_type_size(rdtype, &rdsize);
156-
157154
up_rcounts = calloc(up_size, sizeof(int));
158155
up_displs = malloc(up_size * sizeof(int));
159156
up_peer_ub = calloc(up_size, sizeof(int));
@@ -210,7 +207,9 @@ int mca_coll_han_gatherv_intra(const void *sbuf, int scount, struct ompi_datatyp
210207
}
211208

212209
if (need_bounce_buf) {
213-
bounce_buf = malloc(rdsize * total_up_rcounts);
210+
ptrdiff_t rsize, rgap;
211+
rsize = opal_datatype_span(&rdtype->super, total_up_rcounts, &rgap);
212+
bounce_buf = malloc(rsize);
214213
if (!bounce_buf) {
215214
err = OMPI_ERR_OUT_OF_RESOURCE;
216215
goto root_out;
@@ -222,7 +221,7 @@ int mca_coll_han_gatherv_intra(const void *sbuf, int scount, struct ompi_datatyp
222221
: 0;
223222
}
224223

225-
tmp_rbuf = bounce_buf;
224+
tmp_rbuf = bounce_buf - rgap;
226225
}
227226

228227
/* Up Gatherv */
@@ -231,7 +230,8 @@ int mca_coll_han_gatherv_intra(const void *sbuf, int scount, struct ompi_datatyp
231230

232231
/* Use a temp buffer to reorder the output buffer if needed */
233232
if (need_bounce_buf) {
234-
ptrdiff_t offset = 0;
233+
ptrdiff_t offset = 0, rdext;
234+
ompi_datatype_type_extent(rdtype, &rdext);
235235

236236
for (int i = 0; i < w_size; ++i) {
237237
up_peer = topo[2 * i];
@@ -242,10 +242,9 @@ int mca_coll_han_gatherv_intra(const void *sbuf, int scount, struct ompi_datatyp
242242
w_peer = topo[2 * i + 1];
243243

244244
ompi_datatype_copy_content_same_ddt(rdtype, (size_t) rcounts[w_peer],
245-
(char *) rbuf
246-
+ (size_t) displs[w_peer] * rdsize,
245+
(char *) rbuf + (size_t) displs[w_peer] * rdext,
247246
bounce_buf + offset);
248-
offset += rdsize * (size_t) rcounts[w_peer];
247+
offset += rdext * (size_t) rcounts[w_peer];
249248
}
250249
}
251250

ompi/mca/coll/han/coll_han_scatterv.c

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,12 @@
5555
* to send the data in the correct order even if the process are NOT mapped by core.
5656
 * 2. In the send buffer, other than the root's node, data destined to the same node are contiguous
5757
 * - it is ok if data to different nodes have gaps.
58+
*
59+
* Limitation:
60+
* The node leader acts as a broker between the Root and node followers, but it cannot match the
61+
* exact type signature of the followers; instead it forwards the intermediate data from Root in its
62+
* packed form of MPI_BYTE type. This works for Gatherv but NOT for Scatterv provided that the Root
63+
* has a different architecture, e.g. endianness, integer representation, etc.
5864
*/
5965
int mca_coll_han_scatterv_intra(const void *sbuf, const int *scounts, const int *displs,
6066
struct ompi_datatype_t *sdtype, void *rbuf, int rcount,
@@ -94,6 +100,14 @@ int mca_coll_han_scatterv_intra(const void *sbuf, const int *scounts, const int
94100
return han_module->previous_scatterv(sbuf, scounts, displs, sdtype, rbuf, rcount, rdtype,
95101
root, comm, han_module->previous_scatterv_module);
96102
}
103+
if (han_module->is_heterogeneous) {
104+
OPAL_OUTPUT_VERBOSE((30, mca_coll_han_component.han_output,
105+
"han cannot handle scatterv with this communicator (heterogeneous). Fall "
106+
"back on another component\n"));
107+
HAN_LOAD_FALLBACK_COLLECTIVE(han_module, comm, scatterv);
108+
return han_module->previous_scatterv(sbuf, scounts, displs, sdtype, rbuf, rcount, rdtype,
109+
root, comm, han_module->previous_scatterv_module);
110+
}
97111

98112
w_rank = ompi_comm_rank(comm);
99113
w_size = ompi_comm_size(comm);
@@ -125,7 +139,6 @@ int mca_coll_han_scatterv_intra(const void *sbuf, const int *scounts, const int
125139
int need_bounce_buf = 0, total_up_scounts = 0, *up_displs = NULL, *up_scounts = NULL,
126140
*up_peer_lb = NULL, *up_peer_ub = NULL;
127141
char *reorder_sbuf = (char *) sbuf, *bounce_buf = NULL;
128-
size_t sdsize;
129142

130143
low_scounts = malloc(low_size * sizeof(int));
131144
low_displs = malloc(low_size * sizeof(int));
@@ -144,8 +157,6 @@ int mca_coll_han_scatterv_intra(const void *sbuf, const int *scounts, const int
144157
low_scounts[low_peer] = scounts[w_peer];
145158
}
146159

147-
ompi_datatype_type_size(sdtype, &sdsize);
148-
149160
up_scounts = calloc(up_size, sizeof(int));
150161
up_displs = malloc(up_size * sizeof(int));
151162
up_peer_ub = calloc(up_size, sizeof(int));
@@ -201,11 +212,14 @@ int mca_coll_han_scatterv_intra(const void *sbuf, const int *scounts, const int
201212
}
202213

203214
if (need_bounce_buf) {
204-
bounce_buf = malloc(sdsize * total_up_scounts);
215+
ptrdiff_t ssize, sgap;
216+
ssize = opal_datatype_span(&rdtype->super, total_up_scounts, &sgap);
217+
bounce_buf = malloc(ssize);
205218
if (!bounce_buf) {
206219
err = OMPI_ERR_OUT_OF_RESOURCE;
207220
goto root_out;
208221
}
222+
reorder_sbuf = bounce_buf - sgap;
209223

210224
/* Calculate displacements for the inter-node scatterv */
211225
for (up_peer = 0; up_peer < up_size; ++up_peer) {
@@ -214,7 +228,8 @@ int mca_coll_han_scatterv_intra(const void *sbuf, const int *scounts, const int
214228
}
215229

216230
/* Use a temp buffer to reorder the send buffer if needed */
217-
ptrdiff_t offset = 0;
231+
ptrdiff_t offset = 0, sdext;
232+
ompi_datatype_type_extent(sdtype, &sdext);
218233

219234
for (int i = 0; i < w_size; ++i) {
220235
up_peer = topo[2 * i];
@@ -225,13 +240,11 @@ int mca_coll_han_scatterv_intra(const void *sbuf, const int *scounts, const int
225240
w_peer = topo[2 * i + 1];
226241

227242
ompi_datatype_copy_content_same_ddt(sdtype, (size_t) scounts[w_peer],
228-
bounce_buf + offset,
243+
reorder_sbuf + offset,
229244
(char *) sbuf
230-
+ (size_t) displs[w_peer] * sdsize);
231-
offset += sdsize * (size_t) scounts[w_peer];
245+
+ (size_t) displs[w_peer] * sdext);
246+
offset += sdext * (size_t) scounts[w_peer];
232247
}
233-
234-
reorder_sbuf = bounce_buf;
235248
}
236249

237250
/* Up Iscatterv */

ompi/mca/coll/han/coll_han_topo.c

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -92,12 +92,19 @@ mca_coll_han_topo_init(struct ompi_communicator_t *comm,
9292
}
9393
assert(up_comm != NULL && low_comm != NULL);
9494

95+
int up_rank = ompi_comm_rank(up_comm);
9596
int low_rank = ompi_comm_rank(low_comm);
9697
int low_size = ompi_comm_size(low_comm);
9798

99+
ompi_proc_t *up_proc = NULL;
100+
98101
int *topo = (int *)malloc(sizeof(int) * size * num_topo_level);
99-
int is_imbalanced = 1;
100-
int ranks_non_consecutive = 0;
102+
int is_imbalanced = 1, ranks_non_consecutive = 0, is_heterogeneous = 0;
103+
104+
if (0 != up_rank) {
105+
up_proc = ompi_comm_peer_lookup(up_comm, 0);
106+
is_heterogeneous = up_proc->super.proc_convertor->remoteArch != opal_local_arch;
107+
}
101108

102109
/* node leaders translate the node-local ranks to global ranks and check whether they are placed consecutively */
103110
if (0 == low_rank) {
@@ -116,15 +123,16 @@ mca_coll_han_topo_init(struct ompi_communicator_t *comm,
116123
}
117124
}
118125

119-
int reduce_vals[] = {ranks_non_consecutive, low_size, -low_size};
126+
int reduce_vals[] = {ranks_non_consecutive, low_size, -low_size, is_heterogeneous};
120127

121-
up_comm->c_coll->coll_allreduce(MPI_IN_PLACE, &reduce_vals, 3,
128+
up_comm->c_coll->coll_allreduce(MPI_IN_PLACE, &reduce_vals, 4,
122129
MPI_INT, MPI_MAX, up_comm,
123130
up_comm->c_coll->coll_allreduce_module);
124131

125132
/* is the distribution of processes balanced per node? */
126133
is_imbalanced = (reduce_vals[1] == -reduce_vals[2]) ? 0 : 1;
127134
ranks_non_consecutive = reduce_vals[0];
135+
is_heterogeneous = reduce_vals[3];
128136

129137
if ( ranks_non_consecutive && !is_imbalanced ) {
130138
/* kick off up_comm allgather to collect non-consecutive rank information at node leaders */
@@ -136,12 +144,13 @@ mca_coll_han_topo_init(struct ompi_communicator_t *comm,
136144
}
137145

138146

139-
/* broadcast balanced and consecutive properties from node leaders to remaining ranks */
140-
int bcast_vals[] = {is_imbalanced, ranks_non_consecutive};
141-
low_comm->c_coll->coll_bcast(bcast_vals, 2, MPI_INT, 0,
147+
/* broadcast the balanced, consecutive and homogeneous properties from node leaders to remaining ranks */
148+
int bcast_vals[] = {is_imbalanced, ranks_non_consecutive, is_heterogeneous};
149+
low_comm->c_coll->coll_bcast(bcast_vals, 3, MPI_INT, 0,
142150
low_comm, low_comm->c_coll->coll_bcast_module);
143151
is_imbalanced = bcast_vals[0];
144152
ranks_non_consecutive = bcast_vals[1];
153+
han_module->is_heterogeneous = bcast_vals[2];
145154

146155
/* error out if the rank distribution is not balanced */
147156
if (is_imbalanced) {

0 commit comments

Comments
 (0)