Skip to content

Commit 8023312

Browse files
authored
Merge pull request #13222 from amd-nithyavs/30Apr2025_shm_impl
coll/acoll: Bcast/Barrier enhancements and bug fixes.
2 parents c346328 + 07302f6 commit 8023312

11 files changed

+460
-41
lines changed

ompi/mca/coll/acoll/README

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,9 @@ $HEADER$
88

99
===========================================================================
1010

11-
The collective component, AMD Coll (“acoll”), is a high-performant MPI collective component for the OpenMPI library that is optimized for AMD "Zen"-based processors. “acoll” is optimized for communications within a single node of AMD “Zen”-based processors and provides the following commonly used collective algorithms: boardcast (MPI_Bcast), allreduce (MPI_Allreduce), reduce (MPI_Reduce), gather (MPI_Gather), allgather (MPI_Allgather), and barrier (MPI_Barrier).
11+
The collective component, AMD Coll (“acoll”), is a high-performance MPI collective component for the OpenMPI library that is optimized for AMD "Zen"-based processors. “acoll” is optimized for communications within a single node of AMD “Zen”-based processors and provides the following commonly used collective algorithms: broadcast (MPI_Bcast), allreduce (MPI_Allreduce), reduce (MPI_Reduce), gather (MPI_Gather), allgather (MPI_Allgather), alltoall (MPI_Alltoall), and barrier (MPI_Barrier).
1212

13-
At present, “acoll” has been tested with OpenMPI v5.0.2 and can be built as part of OpenMPI.
13+
At present, “acoll” has been tested with OpenMPI main branch and can be built as part of OpenMPI.
1414

1515
To run an application with acoll, use the following command line parameters
1616
- mpirun <common mpi runtime parameters> --mca coll acoll,tuned,libnbc,basic --mca coll_acoll_priority 40 <executable>

ompi/mca/coll/acoll/coll_acoll.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
#include <xpmem.h>
2727
#endif
2828

29+
#include "opal/mca/accelerator/accelerator.h"
2930
#include "opal/mca/shmem/base/base.h"
3031
#include "opal/mca/shmem/shmem.h"
3132

@@ -40,6 +41,7 @@ extern int mca_coll_acoll_sg_scale;
4041
extern int mca_coll_acoll_node_size;
4142
extern int mca_coll_acoll_force_numa;
4243
extern int mca_coll_acoll_use_dynamic_rules;
44+
extern int mca_coll_acoll_disable_shmbcast;
4345
extern int mca_coll_acoll_mnode_enable;
4446
extern int mca_coll_acoll_bcast_lin0;
4547
extern int mca_coll_acoll_bcast_lin1;
@@ -201,6 +203,7 @@ typedef struct coll_acoll_subcomms {
201203
coll_acoll_data_t *data;
202204
bool initialized_data;
203205
bool initialized_shm_data;
206+
int barrier_algo;
204207
#ifdef HAVE_XPMEM_H
205208
uint64_t xpmem_buf_size;
206209
int without_xpmem;
@@ -233,6 +236,7 @@ struct mca_coll_acoll_module_t {
233236
int log2_node_cnt;
234237
int force_numa;
235238
int use_dyn_rules;
239+
int disable_shmbcast;
236240
// Todo: Use substructure for every API related ones
237241
int use_mnode;
238242
int use_lin0;

ompi/mca/coll/acoll/coll_acoll_allgather.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ static inline int log_sg_bcast_intra(void *buff, size_t count, struct ompi_datat
2929
mca_coll_base_module_t *module, ompi_request_t **preq,
3030
int *nreqs)
3131
{
32-
int msb_pos, sub_rank, peer, err;
32+
int msb_pos, sub_rank, peer, err = MPI_SUCCESS;
3333
int i, mask;
3434
int end_sg, end_peer;
3535

@@ -92,7 +92,7 @@ static inline int lin_sg_bcast_intra(void *buff, size_t count, struct ompi_datat
9292
int *nreqs)
9393
{
9494
int peer;
95-
int err;
95+
int err = MPI_SUCCESS;
9696
int sg_end;
9797

9898
sg_end = sg_start + sg_size - 1;

ompi/mca/coll/acoll/coll_acoll_allreduce.c

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -450,7 +450,21 @@ int mca_coll_acoll_allreduce_intra(const void *sbuf, void *rbuf, size_t count,
450450
ompi_datatype_type_size(dtype, &dsize);
451451
total_dsize = dsize * count;
452452

453-
if (1 == size) {
453+
/* Disable shm/xpmem based optimizations if: */
454+
/* - datatype is not a predefined type */
455+
/* - it's a gpu buffer */
456+
uint64_t flags = 0;
457+
int dev_id;
458+
bool is_opt = true;
459+
if (!OMPI_COMM_CHECK_ASSERT_NO_ACCEL_BUF(comm)) {
460+
if (!ompi_datatype_is_predefined(dtype)
461+
|| (0 < opal_accelerator.check_addr(sbuf, &dev_id, &flags))
462+
|| (0 < opal_accelerator.check_addr(rbuf, &dev_id, &flags))) {
463+
is_opt = false;
464+
}
465+
}
466+
467+
if ((1 == size) && is_opt) {
454468
if (MPI_IN_PLACE != sbuf) {
455469
memcpy((char *) rbuf, sbuf, total_dsize);
456470
}
@@ -486,7 +500,7 @@ int mca_coll_acoll_allreduce_intra(const void *sbuf, void *rbuf, size_t count,
486500
if (total_dsize < 32) {
487501
return ompi_coll_base_allreduce_intra_recursivedoubling(sbuf, rbuf, count, dtype, op,
488502
comm, module);
489-
} else if (total_dsize < 512) {
503+
} else if ((total_dsize < 512) && is_opt) {
490504
return mca_coll_acoll_allreduce_small_msgs_h(sbuf, rbuf, count, dtype, op, comm, module,
491505
subc, 1);
492506
} else if (total_dsize <= 2048) {
@@ -505,7 +519,7 @@ int mca_coll_acoll_allreduce_intra(const void *sbuf, void *rbuf, size_t count,
505519
}
506520
} else if (total_dsize < 4194304) {
507521
#ifdef HAVE_XPMEM_H
508-
if (((subc->xpmem_use_sr_buf != 0) || (subc->xpmem_buf_size > 2 * total_dsize)) && (subc->without_xpmem != 1)) {
522+
if (((subc->xpmem_use_sr_buf != 0) || (subc->xpmem_buf_size > 2 * total_dsize)) && (subc->without_xpmem != 1) && is_opt) {
509523
return mca_coll_acoll_allreduce_xpmem_f(sbuf, rbuf, count, dtype, op, comm, module, subc);
510524
} else {
511525
return ompi_coll_base_allreduce_intra_redscat_allgather(sbuf, rbuf, count, dtype,
@@ -517,7 +531,7 @@ int mca_coll_acoll_allreduce_intra(const void *sbuf, void *rbuf, size_t count,
517531
#endif
518532
} else if (total_dsize <= 16777216) {
519533
#ifdef HAVE_XPMEM_H
520-
if (((subc->xpmem_use_sr_buf != 0) || (subc->xpmem_buf_size > 2 * total_dsize)) && (subc->without_xpmem != 1)) {
534+
if (((subc->xpmem_use_sr_buf != 0) || (subc->xpmem_buf_size > 2 * total_dsize)) && (subc->without_xpmem != 1) && is_opt) {
521535
mca_coll_acoll_reduce_xpmem_h(sbuf, rbuf, count, dtype, op, comm, module, subc);
522536
return mca_coll_acoll_bcast(rbuf, count, dtype, 0, comm, module);
523537
} else {
@@ -530,7 +544,7 @@ int mca_coll_acoll_allreduce_intra(const void *sbuf, void *rbuf, size_t count,
530544
#endif
531545
} else {
532546
#ifdef HAVE_XPMEM_H
533-
if (((subc->xpmem_use_sr_buf != 0) || (subc->xpmem_buf_size > 2 * total_dsize)) && (subc->without_xpmem != 1)) {
547+
if (((subc->xpmem_use_sr_buf != 0) || (subc->xpmem_buf_size > 2 * total_dsize)) && (subc->without_xpmem != 1) && is_opt) {
534548
return mca_coll_acoll_allreduce_xpmem_f(sbuf, rbuf, count, dtype, op, comm, module, subc);
535549
} else {
536550
return ompi_coll_base_allreduce_intra_redscat_allgather(sbuf, rbuf, count, dtype,

ompi/mca/coll/acoll/coll_acoll_alltoall.c

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -529,14 +529,8 @@ int mca_coll_acoll_alltoall
529529
struct ompi_communicator_t *split_comm;
530530

531531
/* Select the right split_comm. */
532-
int pow2_idx = -2;
533-
int tmp_grp_split_f = grp_split_f;
534-
while (tmp_grp_split_f > 0)
535-
{
536-
pow2_idx += 1;
537-
tmp_grp_split_f = tmp_grp_split_f / 2;
538-
}
539-
split_comm = subc->split_comm[pow2_idx];
532+
int comm_idx = grp_split_f > 2 ? opal_cube_dim(grp_split_f/2) : 0;
533+
split_comm = subc->split_comm[comm_idx];
540534

541535
error = mca_coll_acoll_base_alltoall_dispatcher
542536
(sbuf, (grp_split_f * scount), sdtype,

ompi/mca/coll/acoll/coll_acoll_barrier.c

Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,13 @@
2222
#include "coll_acoll.h"
2323
#include "coll_acoll_utils.h"
2424

25+
26+
27+
#define PROGRESS_COUNT 10000
28+
29+
int mca_coll_acoll_barrier_shm_h(struct ompi_communicator_t *comm, mca_coll_base_module_t *module, coll_acoll_subcomms_t *subc);
30+
int mca_coll_acoll_barrier_shm_f(struct ompi_communicator_t *comm, mca_coll_base_module_t *module, coll_acoll_subcomms_t *subc);
31+
2532
static int mca_coll_acoll_barrier_recv_subc(struct ompi_communicator_t *comm,
2633
mca_coll_base_module_t *module, ompi_request_t **reqs,
2734
int *nreqs, int root)
@@ -106,6 +113,170 @@ static int mca_coll_acoll_barrier_send_subc(struct ompi_communicator_t *comm,
106113
return err;
107114
}
108115

116+
int mca_coll_acoll_barrier_shm_h(struct ompi_communicator_t *comm, mca_coll_base_module_t *module, coll_acoll_subcomms_t *subc)
117+
{
118+
int err = MPI_SUCCESS;
119+
int root = 0;
120+
int rank = ompi_comm_rank(comm);
121+
int size = ompi_comm_size(comm);
122+
mca_coll_acoll_module_t *acoll_module = (mca_coll_acoll_module_t *) module;
123+
coll_acoll_init(module, comm, subc->data, subc, root);
124+
coll_acoll_data_t *data = subc->data;
125+
126+
if (NULL == data) {
127+
return -1;
128+
}
129+
130+
int l1_gp_size = data->l1_gp_size;
131+
int *l1_gp = data->l1_gp;
132+
int *l2_gp = data->l2_gp;
133+
int l2_gp_size = data->l2_gp_size;
134+
/* 16 * 1024 + 2 * 64 * size + 8 * 1024 * size */
135+
int offset_barrier = LEADER_SHM_SIZE + 2 * CACHE_LINE_SIZE * size + PER_RANK_SHM_SIZE * size
136+
+ CACHE_LINE_SIZE * size;
137+
138+
volatile int *root_rank_offset = (int *) ((char *) data->allshmmmap_sbuf[root] + offset_barrier
139+
+ CACHE_LINE_SIZE * rank);
140+
volatile int *l1_rank_offset = (int *) ((char *) data->allshmmmap_sbuf[l1_gp[0]]
141+
+ offset_barrier + CACHE_LINE_SIZE * rank);
142+
143+
volatile int *leader_shm;
144+
volatile int *my_leader_shm;
145+
leader_shm = (int *) ((char *) data->allshmmmap_sbuf[root] + offset_barrier
146+
+ CACHE_LINE_SIZE * root);
147+
my_leader_shm = (int *) ((char *) data->allshmmmap_sbuf[l1_gp[0]] + offset_barrier
148+
+ CACHE_LINE_SIZE * l1_gp[0]);
149+
int ready;
150+
int count = 0;
151+
if (rank == root) {
152+
ready = *leader_shm;
153+
for (int i = 0; i < l2_gp_size; i++) {
154+
if (l2_gp[i] == root)
155+
continue;
156+
volatile int *val = (int *) ((char *) data->allshmmmap_sbuf[root] + offset_barrier
157+
+ CACHE_LINE_SIZE * l2_gp[i]);
158+
while (*val != ready + 1) {
159+
count++;
160+
if (count == PROGRESS_COUNT) {
161+
count = 0;
162+
opal_progress();
163+
}
164+
}
165+
}
166+
ready++;
167+
for (int i = 0; i < l1_gp_size; i++) {
168+
if (l1_gp[i] == root)
169+
continue;
170+
volatile int *val = (int *) ((char *) data->allshmmmap_sbuf[root] + offset_barrier
171+
+ CACHE_LINE_SIZE * l1_gp[i]);
172+
while (*val != ready) {
173+
count++;
174+
if (count == PROGRESS_COUNT) {
175+
count = 0;
176+
opal_progress();
177+
}
178+
}
179+
}
180+
*leader_shm = ready;
181+
} else if (rank == l1_gp[0]) {
182+
int val = *l1_rank_offset;
183+
for (int i = 0; i < l1_gp_size; i++) {
184+
if (l1_gp[i] == l1_gp[0])
185+
continue;
186+
volatile int *vali = (int *) ((char *) data->allshmmmap_sbuf[l1_gp[0]] + offset_barrier
187+
+ CACHE_LINE_SIZE
188+
* l1_gp[i]); // do we need atomic_load here?
189+
while (*vali != val + 1) {
190+
count++;
191+
if (PROGRESS_COUNT == count) {
192+
count = 0;
193+
opal_progress();
194+
}
195+
}
196+
}
197+
val++;
198+
*root_rank_offset = val;
199+
while (*leader_shm != val) {
200+
count++;
201+
if (PROGRESS_COUNT == count) {
202+
count = 0;
203+
opal_progress();
204+
}
205+
}
206+
*l1_rank_offset = val;
207+
} else {
208+
209+
int done = *l1_rank_offset;
210+
done++;
211+
*l1_rank_offset = done;
212+
while (done != *my_leader_shm) {
213+
count++;
214+
if (10000 == count) {
215+
count = 0;
216+
opal_progress();
217+
}
218+
}
219+
}
220+
return err;
221+
}
222+
223+
224+
/*
 * mca_coll_acoll_barrier_shm_f
 *
 * Flat shared-memory barrier for a single-node communicator: every
 * non-root rank increments its per-rank flag in the root's shared
 * segment and spins on the root's release flag; the root waits until
 * all flags reach the next epoch and then increments the release flag.
 * Each flag occupies its own cache line to avoid false sharing, and
 * spin loops call opal_progress() every PROGRESS_COUNT iterations.
 *
 * Returns MPI_SUCCESS on success, -1 if the shared-memory data for the
 * subcommunicator is not initialized.
 */
int mca_coll_acoll_barrier_shm_f(struct ompi_communicator_t *comm, mca_coll_base_module_t *module, coll_acoll_subcomms_t *subc)
{
    int err = MPI_SUCCESS;
    int root = 0;
    int rank = ompi_comm_rank(comm);
    int size = ompi_comm_size(comm);

    coll_acoll_init(module, comm, subc->data, subc, root);
    coll_acoll_data_t *data = subc->data;

    if (NULL == data) {
        return -1;
    }

    /* Barrier flags live past the bcast staging areas in the shm segment:
     * 16 * 1024 + 2 * 64 * size + 8 * 1024 * size */
    int offset_barrier = LEADER_SHM_SIZE + 2 * CACHE_LINE_SIZE * size + PER_RANK_SHM_SIZE * size
                         + CACHE_LINE_SIZE * size;

    /* This rank's check-in flag in the root's segment. */
    volatile int *root_rank_offset = (int *) ((char *) data->allshmmmap_sbuf[root] + offset_barrier
                                              + CACHE_LINE_SIZE * rank);

    /* Release flag published by the root. */
    volatile int *leader_shm;
    leader_shm = (int *) ((char *) data->allshmmmap_sbuf[root] + offset_barrier
                          + CACHE_LINE_SIZE * root);

    int count = 0;
    if (rank == root) {
        /* Only the root consumes the current epoch value; read it here
         * instead of on every rank (avoids a needless shared-line read). */
        int ready = *leader_shm;
        for (int i = 0; i < size; i++) {
            if (i == root)
                continue;
            volatile int *val = (int *) ((char *) data->allshmmmap_sbuf[root] + offset_barrier
                                         + CACHE_LINE_SIZE * i);
            while (*val != ready + 1) {
                count++;
                if (count == PROGRESS_COUNT) {
                    count = 0;
                    opal_progress();
                }
            }
        }
        /* Release all spinning ranks by advancing the epoch. */
        (*leader_shm)++;
    } else {
        /* Check in, then wait for the root's release. */
        int val = ++(*root_rank_offset);
        while (*leader_shm != val) {
            count++;
            if (PROGRESS_COUNT == count) {
                count = 0;
                opal_progress();
            }
        }
    }
    return err;
}
279+
109280
/*
110281
* mca_coll_acoll_barrier_intra
111282
*
@@ -152,6 +323,16 @@ int mca_coll_acoll_barrier_intra(struct ompi_communicator_t *comm, mca_coll_base
152323
}
153324
num_nodes = size > 1 ? subc->num_nodes : 1;
154325

326+
/* Default barrier for intra-node case - shared memory hierarchical */
327+
/* ToDo: Need to check how this works with inter-case */
328+
if (1 == num_nodes) {
329+
if (0 == subc->barrier_algo) {
330+
return mca_coll_acoll_barrier_shm_h(comm, module, subc);
331+
} else if (1 == subc->barrier_algo) {
332+
return mca_coll_acoll_barrier_shm_f(comm, module, subc);
333+
}
334+
}
335+
155336
reqs = ompi_coll_base_comm_get_reqs(module->base_data, size);
156337
if (NULL == reqs) {
157338
return OMPI_ERR_OUT_OF_RESOURCE;

0 commit comments

Comments
 (0)