Skip to content

Commit f8deca0

Browse files
amd-nithyavs and MithunMohanKadavil
authored and committed
coll/acoll: Add shared memory based barrier, bcast
Shared memory based implementations for bcast and barrier are enabled for within a node communication. Signed-off-by: Nithya V S <Nithya.VS@amd.com>
1 parent c346328 commit f8deca0

File tree

6 files changed

+395
-12
lines changed

6 files changed

+395
-12
lines changed

ompi/mca/coll/acoll/coll_acoll.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
#include <xpmem.h>
2727
#endif
2828

29+
#include "opal/mca/accelerator/accelerator.h"
2930
#include "opal/mca/shmem/base/base.h"
3031
#include "opal/mca/shmem/shmem.h"
3132

@@ -40,6 +41,7 @@ extern int mca_coll_acoll_sg_scale;
4041
extern int mca_coll_acoll_node_size;
4142
extern int mca_coll_acoll_force_numa;
4243
extern int mca_coll_acoll_use_dynamic_rules;
44+
extern int mca_coll_acoll_disable_shmbcast;
4345
extern int mca_coll_acoll_mnode_enable;
4446
extern int mca_coll_acoll_bcast_lin0;
4547
extern int mca_coll_acoll_bcast_lin1;
@@ -201,6 +203,7 @@ typedef struct coll_acoll_subcomms {
201203
coll_acoll_data_t *data;
202204
bool initialized_data;
203205
bool initialized_shm_data;
206+
int barrier_algo;
204207
#ifdef HAVE_XPMEM_H
205208
uint64_t xpmem_buf_size;
206209
int without_xpmem;
@@ -233,6 +236,7 @@ struct mca_coll_acoll_module_t {
233236
int log2_node_cnt;
234237
int force_numa;
235238
int use_dyn_rules;
239+
int disable_shmbcast;
236240
// Todo: Use substructure for every API related ones
237241
int use_mnode;
238242
int use_lin0;

ompi/mca/coll/acoll/coll_acoll_barrier.c

Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,13 @@
2222
#include "coll_acoll.h"
2323
#include "coll_acoll_utils.h"
2424

25+
26+
27+
#define PROGRESS_COUNT 10000
28+
29+
int mca_coll_acoll_barrier_shm_h(struct ompi_communicator_t *comm, mca_coll_base_module_t *module, coll_acoll_subcomms_t *subc);
30+
int mca_coll_acoll_barrier_shm_f(struct ompi_communicator_t *comm, mca_coll_base_module_t *module, coll_acoll_subcomms_t *subc);
31+
2532
static int mca_coll_acoll_barrier_recv_subc(struct ompi_communicator_t *comm,
2633
mca_coll_base_module_t *module, ompi_request_t **reqs,
2734
int *nreqs, int root)
@@ -106,6 +113,170 @@ static int mca_coll_acoll_barrier_send_subc(struct ompi_communicator_t *comm,
106113
return err;
107114
}
108115

116+
/*
 * mca_coll_acoll_barrier_shm_h
 *
 * Hierarchical shared-memory barrier for intra-node communicators.
 * Ranks are split into level-1 groups (l1_gp, led by l1_gp[0]) whose
 * leaders form a level-2 group (l2_gp) reporting to a fixed root (rank 0).
 * Each rank owns one cache-line-sized flag per mapped segment; a barrier
 * round is a +1 increment of these flags, so flags act as monotonically
 * increasing round counters.  Ranks spin on the flags, calling
 * opal_progress() every PROGRESS_COUNT iterations to keep the runtime
 * progressing.
 *
 * NOTE(review): the flags are plain `volatile int` accesses, not C11
 * atomics; correctness presumably relies on aligned int loads/stores
 * being atomic on the target platforms — confirm (same question as the
 * original "do we need atomic_load here?" comment).
 *
 * @param comm    intra-node communicator being synchronized
 * @param module  acoll module (passed through to coll_acoll_init)
 * @param subc    subcommunicator state holding the shared-memory mapping
 * @return MPI_SUCCESS on success, -1 if shared data is unavailable
 */
int mca_coll_acoll_barrier_shm_h(struct ompi_communicator_t *comm, mca_coll_base_module_t *module, coll_acoll_subcomms_t *subc)
{
    int err = MPI_SUCCESS;
    int root = 0;
    int rank = ompi_comm_rank(comm);
    int size = ompi_comm_size(comm);

    /* Lazily set up the shared-memory segments and group layout. */
    coll_acoll_init(module, comm, subc->data, subc, root);
    coll_acoll_data_t *data = subc->data;

    if (NULL == data) {
        return -1;
    }

    int l1_gp_size = data->l1_gp_size;
    int *l1_gp = data->l1_gp;
    int *l2_gp = data->l2_gp;
    int l2_gp_size = data->l2_gp_size;

    /* Barrier flags live past the areas used by bcast:
     * 16 * 1024 + 2 * 64 * size + 8 * 1024 * size, one cache line per
     * rank after that.  Assumes this matches the segment layout created
     * by coll_acoll_init — TODO confirm against coll_acoll_utils. */
    int offset_barrier = LEADER_SHM_SIZE + 2 * CACHE_LINE_SIZE * size + PER_RANK_SHM_SIZE * size
                         + CACHE_LINE_SIZE * size;

    /* This rank's flag inside the root's segment (polled by the root). */
    volatile int *root_rank_offset = (int *) ((char *) data->allshmmmap_sbuf[root] + offset_barrier
                                              + CACHE_LINE_SIZE * rank);
    /* This rank's flag inside its l1 leader's segment (polled by the leader). */
    volatile int *l1_rank_offset = (int *) ((char *) data->allshmmmap_sbuf[l1_gp[0]]
                                            + offset_barrier + CACHE_LINE_SIZE * rank);

    /* Round counter published by the root / by this rank's l1 leader. */
    volatile int *leader_shm = (int *) ((char *) data->allshmmmap_sbuf[root] + offset_barrier
                                        + CACHE_LINE_SIZE * root);
    volatile int *my_leader_shm = (int *) ((char *) data->allshmmmap_sbuf[l1_gp[0]] + offset_barrier
                                           + CACHE_LINE_SIZE * l1_gp[0]);
    int ready;
    int count = 0;
    if (rank == root) {
        /* Root: wait for every l2 (group) leader to reach round ready+1 ... */
        ready = *leader_shm;
        for (int i = 0; i < l2_gp_size; i++) {
            if (l2_gp[i] == root)
                continue;
            volatile int *val = (int *) ((char *) data->allshmmmap_sbuf[root] + offset_barrier
                                         + CACHE_LINE_SIZE * l2_gp[i]);
            while (*val != ready + 1) {
                count++;
                if (count == PROGRESS_COUNT) {
                    count = 0;
                    opal_progress();
                }
            }
        }
        ready++;
        /* ... then for the members of the root's own l1 group ... */
        for (int i = 0; i < l1_gp_size; i++) {
            if (l1_gp[i] == root)
                continue;
            volatile int *val = (int *) ((char *) data->allshmmmap_sbuf[root] + offset_barrier
                                         + CACHE_LINE_SIZE * l1_gp[i]);
            while (*val != ready) {
                count++;
                if (count == PROGRESS_COUNT) {
                    count = 0;
                    opal_progress();
                }
            }
        }
        /* ... and release everyone by publishing the new round number. */
        *leader_shm = ready;
    } else if (rank == l1_gp[0]) {
        /* l1 leader: gather own group, report to root, wait for release. */
        int val = *l1_rank_offset;
        for (int i = 0; i < l1_gp_size; i++) {
            if (l1_gp[i] == l1_gp[0])
                continue;
            volatile int *vali = (int *) ((char *) data->allshmmmap_sbuf[l1_gp[0]] + offset_barrier
                                          + CACHE_LINE_SIZE
                                          * l1_gp[i]); /* do we need atomic_load here? */
            while (*vali != val + 1) {
                count++;
                if (PROGRESS_COUNT == count) {
                    count = 0;
                    opal_progress();
                }
            }
        }
        val++;
        /* Tell the root this group has arrived, then wait for its release. */
        *root_rank_offset = val;
        while (*leader_shm != val) {
            count++;
            if (PROGRESS_COUNT == count) {
                count = 0;
                opal_progress();
            }
        }
        /* Publish the new round so this leader's group members unblock. */
        *l1_rank_offset = val;
    } else {
        /* Plain member: signal arrival to the l1 leader, spin on release. */
        int done = *l1_rank_offset;
        done++;
        *l1_rank_offset = done;
        while (done != *my_leader_shm) {
            count++;
            /* was a hard-coded 10000; use the shared PROGRESS_COUNT knob */
            if (PROGRESS_COUNT == count) {
                count = 0;
                opal_progress();
            }
        }
    }
    return err;
}
222+
223+
224+
/*
 * mca_coll_acoll_barrier_shm_f
 *
 * Flat shared-memory barrier: every non-root rank increments its own
 * cache-line-sized flag in the root's shared segment, then spins until
 * the root publishes the matching round number.  The root waits for all
 * flags to reach the next round, then increments its own counter to
 * release everyone.  Spins call opal_progress() every PROGRESS_COUNT
 * iterations so the runtime keeps progressing.
 *
 * NOTE(review): flags are plain `volatile int`, not C11 atomics; this
 * presumably relies on aligned int loads/stores being atomic on the
 * target platforms — confirm.
 *
 * @param comm    intra-node communicator being synchronized
 * @param module  acoll module (passed through to coll_acoll_init)
 * @param subc    subcommunicator state holding the shared-memory mapping
 * @return MPI_SUCCESS on success, -1 if shared data is unavailable
 */
int mca_coll_acoll_barrier_shm_f(struct ompi_communicator_t *comm, mca_coll_base_module_t *module, coll_acoll_subcomms_t *subc)
{
    int err = MPI_SUCCESS;
    int root = 0;
    int rank = ompi_comm_rank(comm);
    int size = ompi_comm_size(comm);

    /* Lazily set up the shared-memory segments. */
    coll_acoll_init(module, comm, subc->data, subc, root);
    coll_acoll_data_t *data = subc->data;

    if (NULL == data) {
        return -1;
    }

    /* Barrier flags live past the areas used by bcast:
     * 16 * 1024 + 2 * 64 * size + 8 * 1024 * size, one cache line per
     * rank after that. */
    int offset_barrier = LEADER_SHM_SIZE + 2 * CACHE_LINE_SIZE * size + PER_RANK_SHM_SIZE * size
                         + CACHE_LINE_SIZE * size;

    /* This rank's flag inside the root's segment (polled by the root). */
    volatile int *root_rank_offset = (int *) ((char *) data->allshmmmap_sbuf[root] + offset_barrier
                                              + CACHE_LINE_SIZE * rank);

    /* Round counter published by the root. */
    volatile int *leader_shm = (int *) ((char *) data->allshmmmap_sbuf[root] + offset_barrier
                                        + CACHE_LINE_SIZE * root);

    int count = 0;
    if (rank == root) {
        /* Current round number; only the root needs it, so the read is
         * done here rather than unconditionally on every rank. */
        int ready = *leader_shm;
        /* Wait for every other rank's flag to reach the next round ... */
        for (int i = 0; i < size; i++) {
            if (i == root)
                continue;
            volatile int *val = (int *) ((char *) data->allshmmmap_sbuf[root] + offset_barrier
                                         + CACHE_LINE_SIZE * i);
            while (*val != ready + 1) {
                count++;
                if (count == PROGRESS_COUNT) {
                    count = 0;
                    opal_progress();
                }
            }
        }
        /* ... then release everyone by advancing the round counter. */
        (*leader_shm)++;
    } else {
        /* Signal arrival, then spin until the root publishes this round. */
        int val = ++(*root_rank_offset);
        while (*leader_shm != val) {
            count++;
            if (PROGRESS_COUNT == count) {
                count = 0;
                opal_progress();
            }
        }
    }
    return err;
}
279+
109280
/*
110281
* mca_coll_acoll_barrier_intra
111282
*
@@ -152,6 +323,16 @@ int mca_coll_acoll_barrier_intra(struct ompi_communicator_t *comm, mca_coll_base
152323
}
153324
num_nodes = size > 1 ? subc->num_nodes : 1;
154325

326+
/* Default barrier for intra-node case - shared memory hierarchical */
327+
/* ToDo: Need to check how this works with inter-case */
328+
if (1 == num_nodes) {
329+
if (0 == subc->barrier_algo) {
330+
return mca_coll_acoll_barrier_shm_h(comm, module, subc);
331+
} else if (1 == subc->barrier_algo) {
332+
return mca_coll_acoll_barrier_shm_f(comm, module, subc);
333+
}
334+
}
335+
155336
reqs = ompi_coll_base_comm_get_reqs(module->base_data, size);
156337
if (NULL == reqs) {
157338
return OMPI_ERR_OUT_OF_RESOURCE;

0 commit comments

Comments
 (0)