Skip to content

Commit e1c1ab0

Browse files
xinzhao3Tomislav Janjusic
authored andcommitted
ompi/oshmem/spml/ucx: defer clean up shmem_ctx to shmem_finalize
Signed-off-by: Tomislav Janjusic <tomislavj@mellanox.com>
1 parent 48033ac commit e1c1ab0

File tree

5 files changed

+80
-51
lines changed

5 files changed

+80
-51
lines changed

opal/mca/common/ucx/common_ucx.c

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,10 @@ void opal_common_ucx_mca_proc_added(void)
151151
}
152152
}
153153
#endif
154+
155+
OPAL_DECLSPEC int opal_common_ucx_mca_pmix_fence_nb(int *fenced)
156+
{
157+
return opal_pmix.fence_nb(NULL, 0, opal_common_ucx_mca_fence_complete_cb, (void *)fenced);
154158
}
155159

156160
OPAL_DECLSPEC int opal_common_ucx_mca_pmix_fence(ucp_worker_h worker)
@@ -182,9 +186,8 @@ static void opal_common_ucx_wait_all_requests(void **reqs, int count, ucp_worker
182186
}
183187
}
184188

185-
OPAL_DECLSPEC int opal_common_ucx_del_procs(opal_common_ucx_del_proc_t *procs, size_t count,
186-
size_t my_rank, size_t max_disconnect, ucp_worker_h worker)
187-
{
189+
OPAL_DECLSPEC int opal_common_ucx_del_procs_nofence(opal_common_ucx_del_proc_t *procs, size_t count,
190+
size_t my_rank, size_t max_disconnect, ucp_worker_h worker) {
188191
size_t num_reqs;
189192
size_t max_reqs;
190193
void *dreq, **dreqs;
@@ -232,10 +235,14 @@ OPAL_DECLSPEC int opal_common_ucx_del_procs(opal_common_ucx_del_proc_t *procs, s
232235
opal_common_ucx_wait_all_requests(dreqs, num_reqs, worker);
233236
free(dreqs);
234237

235-
if (OPAL_SUCCESS != (ret = opal_common_ucx_mca_pmix_fence(worker))) {
236-
return ret;
237-
}
238-
239238
return OPAL_SUCCESS;
240239
}
241240

241+
OPAL_DECLSPEC int opal_common_ucx_del_procs(opal_common_ucx_del_proc_t *procs, size_t count,
242+
size_t my_rank, size_t max_disconnect, ucp_worker_h worker)
243+
{
244+
opal_common_ucx_del_procs_nofence(procs, count, my_rank, max_disconnect, worker);
245+
246+
return opal_common_ucx_mca_pmix_fence(worker);
247+
}
248+

opal/mca/common/ucx/common_ucx.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,8 +105,11 @@ OPAL_DECLSPEC void opal_common_ucx_mca_deregister(void);
105105
OPAL_DECLSPEC void opal_common_ucx_mca_proc_added(void);
106106
OPAL_DECLSPEC void opal_common_ucx_empty_complete_cb(void *request, ucs_status_t status);
107107
OPAL_DECLSPEC int opal_common_ucx_mca_pmix_fence(ucp_worker_h worker);
108+
OPAL_DECLSPEC int opal_common_ucx_mca_pmix_fence_nb(int *fenced);
108109
OPAL_DECLSPEC int opal_common_ucx_del_procs(opal_common_ucx_del_proc_t *procs, size_t count,
109110
size_t my_rank, size_t max_disconnect, ucp_worker_h worker);
111+
OPAL_DECLSPEC int opal_common_ucx_del_procs_nofence(opal_common_ucx_del_proc_t *procs, size_t count,
112+
size_t my_rank, size_t max_disconnect, ucp_worker_h worker);
110113
OPAL_DECLSPEC void opal_common_ucx_mca_var_register(const mca_base_component_t *component);
111114

112115
static inline

oshmem/mca/spml/ucx/spml_ucx.c

Lines changed: 1 addition & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -654,30 +654,7 @@ void mca_spml_ucx_ctx_destroy(shmem_ctx_t ctx)
654654
mca_spml_ucx_ctx_list_item_t) {
655655
if ((shmem_ctx_t)(&ctx_item->ctx) == ctx) {
656656
opal_list_remove_item(&(mca_spml_ucx.ctx_list), &ctx_item->super);
657-
658-
opal_common_ucx_del_proc_t *del_procs;
659-
del_procs = malloc(sizeof(*del_procs) * nprocs);
660-
661-
for (i = 0; i < nprocs; ++i) {
662-
for (j = 0; j < MCA_MEMHEAP_SEG_COUNT; j++) {
663-
if (ctx_item->ctx.ucp_peers[i].mkeys[j].key.rkey != NULL) {
664-
ucp_rkey_destroy(ctx_item->ctx.ucp_peers[i].mkeys[j].key.rkey);
665-
}
666-
}
667-
668-
del_procs[i].ep = ctx_item->ctx.ucp_peers[i].ucp_conn;
669-
del_procs[i].vpid = i;
670-
ctx_item->ctx.ucp_peers[i].ucp_conn = NULL;
671-
}
672-
673-
opal_common_ucx_del_procs(del_procs, nprocs, oshmem_my_proc_id(),
674-
mca_spml_ucx.num_disconnect,
675-
ctx_item->ctx.ucp_worker);
676-
free(del_procs);
677-
free(ctx_item->ctx.ucp_peers);
678-
679-
ucp_worker_destroy(ctx_item->ctx.ucp_worker);
680-
OBJ_RELEASE(ctx_item);
657+
opal_list_append(&(mca_spml_ucx.idle_ctx_list), &ctx_item->super);
681658
break;
682659
}
683660
}

oshmem/mca/spml/ucx/spml_ucx.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ struct mca_spml_ucx {
9191
mca_spml_ucx_get_mkey_slow_fn_t get_mkey_slow;
9292
char **remote_addrs_tbl;
9393
opal_list_t ctx_list;
94+
opal_list_t idle_ctx_list;
9495
int priority; /* component priority */
9596
shmem_internal_mutex_t internal_mutex;
9697
};

oshmem/mca/spml/ucx/spml_ucx_component.c

Lines changed: 61 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,7 @@ static int spml_ucx_init(void)
176176
}
177177

178178
OBJ_CONSTRUCT(&(mca_spml_ucx.ctx_list), opal_list_t);
179+
OBJ_CONSTRUCT(&(mca_spml_ucx.idle_ctx_list), opal_list_t);
179180
SHMEM_MUTEX_INIT(mca_spml_ucx.internal_mutex);
180181

181182
wkr_params.field_mask = UCP_WORKER_PARAM_FIELD_THREAD_MODE;
@@ -224,42 +225,81 @@ mca_spml_ucx_component_init(int* priority,
224225
return &mca_spml_ucx.super;
225226
}
226227

228+
static void _ctx_cleanup(mca_spml_ucx_ctx_list_item_t *ctx_item)
229+
{
230+
int i, j, nprocs = oshmem_num_procs();
231+
opal_common_ucx_del_proc_t *del_procs;
232+
233+
del_procs = malloc(sizeof(*del_procs) * nprocs);
234+
235+
for (i = 0; i < nprocs; ++i) {
236+
for (j = 0; j < MCA_MEMHEAP_SEG_COUNT; j++) {
237+
if (ctx_item->ctx.ucp_peers[i].mkeys[j].key.rkey != NULL) {
238+
ucp_rkey_destroy(ctx_item->ctx.ucp_peers[i].mkeys[j].key.rkey);
239+
}
240+
}
241+
242+
del_procs[i].ep = ctx_item->ctx.ucp_peers[i].ucp_conn;
243+
del_procs[i].vpid = i;
244+
ctx_item->ctx.ucp_peers[i].ucp_conn = NULL;
245+
}
246+
247+
opal_common_ucx_del_procs_nofence(del_procs, nprocs, oshmem_my_proc_id(),
248+
mca_spml_ucx.num_disconnect,
249+
ctx_item->ctx.ucp_worker);
250+
free(del_procs);
251+
free(ctx_item->ctx.ucp_peers);
252+
}
253+
227254
static int mca_spml_ucx_component_fini(void)
228255
{
229256
mca_spml_ucx_ctx_list_item_t *ctx_item, *next;
230-
size_t i, j, nprocs = oshmem_num_procs();
257+
int fenced = 0;
258+
int ret = OSHMEM_SUCCESS;
231259

232260
opal_progress_unregister(spml_ucx_progress);
233261

234262
if(!mca_spml_ucx.enabled)
235263
return OSHMEM_SUCCESS; /* never selected.. return success.. */
236264

237265
/* delete context objects from list */
238-
OPAL_LIST_FOREACH_SAFE(ctx_item, next, &(mca_spml_ucx.ctx_list),
266+
OPAL_LIST_FOREACH_SAFE(ctx_item, next, &(mca_spml_ucx.idle_ctx_list),
239267
mca_spml_ucx_ctx_list_item_t) {
240-
opal_list_remove_item(&(mca_spml_ucx.ctx_list), &ctx_item->super);
268+
_ctx_cleanup(ctx_item);
269+
}
241270

242-
opal_common_ucx_del_proc_t *del_procs;
243-
del_procs = malloc(sizeof(*del_procs) * nprocs);
271+
OPAL_LIST_FOREACH_SAFE(ctx_item, next, &(mca_spml_ucx.ctx_list),
272+
mca_spml_ucx_ctx_list_item_t) {
273+
_ctx_cleanup(ctx_item);
274+
}
244275

245-
for (i = 0; i < nprocs; ++i) {
246-
for (j = 0; j < MCA_MEMHEAP_SEG_COUNT; j++) {
247-
if (ctx_item->ctx.ucp_peers[i].mkeys[j].key.rkey != NULL) {
248-
ucp_rkey_destroy(ctx_item->ctx.ucp_peers[i].mkeys[j].key.rkey);
249-
}
250-
}
276+
ret = opal_common_ucx_mca_pmix_fence_nb(&fenced);
277+
if (OPAL_SUCCESS != ret) {
278+
return ret;
279+
}
251280

252-
del_procs[i].ep = ctx_item->ctx.ucp_peers[i].ucp_conn;
253-
del_procs[i].vpid = i;
254-
ctx_item->ctx.ucp_peers[i].ucp_conn = NULL;
281+
while (!fenced) {
282+
OPAL_LIST_FOREACH_SAFE(ctx_item, next, &(mca_spml_ucx.ctx_list),
283+
mca_spml_ucx_ctx_list_item_t) {
284+
ucp_worker_progress(ctx_item->ctx.ucp_worker);
255285
}
286+
OPAL_LIST_FOREACH_SAFE(ctx_item, next, &(mca_spml_ucx.idle_ctx_list),
287+
mca_spml_ucx_ctx_list_item_t) {
288+
ucp_worker_progress(ctx_item->ctx.ucp_worker);
289+
}
290+
ucp_worker_progress(mca_spml_ucx_ctx_default.ucp_worker);
291+
}
256292

257-
opal_common_ucx_del_procs(del_procs, nprocs, oshmem_my_proc_id(),
258-
mca_spml_ucx.num_disconnect,
259-
ctx_item->ctx.ucp_worker);
260-
free(del_procs);
261-
free(ctx_item->ctx.ucp_peers);
262-
293+
/* delete all workers */
294+
OPAL_LIST_FOREACH_SAFE(ctx_item, next, &(mca_spml_ucx.idle_ctx_list),
295+
mca_spml_ucx_ctx_list_item_t) {
296+
opal_list_remove_item(&(mca_spml_ucx.idle_ctx_list), &ctx_item->super);
297+
ucp_worker_destroy(ctx_item->ctx.ucp_worker);
298+
OBJ_RELEASE(ctx_item);
299+
}
300+
OPAL_LIST_FOREACH_SAFE(ctx_item, next, &(mca_spml_ucx.ctx_list),
301+
mca_spml_ucx_ctx_list_item_t) {
302+
opal_list_remove_item(&(mca_spml_ucx.ctx_list), &ctx_item->super);
263303
ucp_worker_destroy(ctx_item->ctx.ucp_worker);
264304
OBJ_RELEASE(ctx_item);
265305
}
@@ -271,6 +311,7 @@ static int mca_spml_ucx_component_fini(void)
271311
mca_spml_ucx.enabled = false; /* not anymore */
272312

273313
OBJ_DESTRUCT(&(mca_spml_ucx.ctx_list));
314+
OBJ_DESTRUCT(&(mca_spml_ucx.idle_ctx_list));
274315
SHMEM_MUTEX_DESTROY(mca_spml_ucx.internal_mutex);
275316

276317
if (mca_spml_ucx.ucp_context) {

0 commit comments

Comments
 (0)