Skip to content

Commit 2981249

Browse files
authored
Merge pull request #5402 from hoopoepg/topic/common-del-procs
MCA/COMMON/UCX: del_procs calls are unified to common module
2 parents b6b9552 + 920cc2e commit 2981249

File tree

4 files changed

+103
-112
lines changed

4 files changed

+103
-112
lines changed

ompi/mca/pml/ucx/pml_ucx.c

Lines changed: 12 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -368,74 +368,32 @@ static inline ucp_ep_h mca_pml_ucx_get_ep(ompi_communicator_t *comm, int rank)
368368
return NULL;
369369
}
370370

371-
static void mca_pml_ucx_waitall(void **reqs, int *count_p)
372-
{
373-
int i;
374-
375-
PML_UCX_VERBOSE(2, "waiting for %d disconnect requests", *count_p);
376-
for (i = 0; i < *count_p; ++i) {
377-
opal_common_ucx_wait_request(reqs[i], ompi_pml_ucx.ucp_worker, "ucp_disconnect_nb");
378-
reqs[i] = NULL;
379-
}
380-
381-
*count_p = 0;
382-
}
383-
384371
int mca_pml_ucx_del_procs(struct ompi_proc_t **procs, size_t nprocs)
385372
{
386373
ompi_proc_t *proc;
387-
int num_reqs;
388-
size_t max_reqs;
389-
void *dreq, **dreqs;
390-
ucp_ep_h ep;
374+
opal_common_ucx_del_proc_t *del_procs;
391375
size_t i;
376+
int ret;
392377

393-
max_reqs = ompi_pml_ucx.num_disconnect;
394-
if (max_reqs > nprocs) {
395-
max_reqs = nprocs;
396-
}
397-
398-
dreqs = malloc(sizeof(*dreqs) * max_reqs);
399-
if (dreqs == NULL) {
378+
del_procs = malloc(sizeof(*del_procs) * nprocs);
379+
if (del_procs == NULL) {
400380
return OMPI_ERR_OUT_OF_RESOURCE;
401381
}
402382

403-
num_reqs = 0;
404-
405383
for (i = 0; i < nprocs; ++i) {
406-
proc = procs[(i + OMPI_PROC_MY_NAME->vpid) % nprocs];
407-
ep = proc->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_PML];
408-
if (ep == NULL) {
409-
continue;
410-
}
384+
proc = procs[i];
385+
del_procs[i].ep = proc->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_PML];
386+
del_procs[i].vpid = proc->super.proc_name.vpid;
411387

388+
/* mark peer as disconnected */
412389
proc->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_PML] = NULL;
413-
414-
PML_UCX_VERBOSE(2, "disconnecting from rank %d", proc->super.proc_name.vpid);
415-
dreq = ucp_disconnect_nb(ep);
416-
if (dreq != NULL) {
417-
if (UCS_PTR_IS_ERR(dreq)) {
418-
PML_UCX_ERROR("ucp_disconnect_nb(%d) failed: %s",
419-
proc->super.proc_name.vpid,
420-
ucs_status_string(UCS_PTR_STATUS(dreq)));
421-
continue;
422-
} else {
423-
dreqs[num_reqs++] = dreq;
424-
if (num_reqs >= ompi_pml_ucx.num_disconnect) {
425-
mca_pml_ucx_waitall(dreqs, &num_reqs);
426-
}
427-
}
428-
}
429390
}
430-
/* num_reqs == 0 is processed by mca_pml_ucx_waitall routine,
431-
* so suppress coverity warning */
432-
/* coverity[uninit_use_in_call] */
433-
mca_pml_ucx_waitall(dreqs, &num_reqs);
434-
free(dreqs);
435391

436-
opal_common_ucx_mca_pmix_fence(ompi_pml_ucx.ucp_worker);
392+
ret = opal_common_ucx_del_procs(del_procs, nprocs, OMPI_PROC_MY_NAME->vpid,
393+
ompi_pml_ucx.num_disconnect, ompi_pml_ucx.ucp_worker);
394+
free(del_procs);
437395

438-
return OMPI_SUCCESS;
396+
return ret;
439397
}
440398

441399
int mca_pml_ucx_enable(bool enable)

opal/mca/common/ucx/common_ucx.c

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,3 +107,69 @@ OPAL_DECLSPEC void opal_common_ucx_mca_pmix_fence(ucp_worker_h worker)
107107
}
108108
}
109109

110+
111+
static void opal_common_ucx_wait_all_requests(void **reqs, int count, ucp_worker_h worker)
112+
{
113+
int i;
114+
115+
MCA_COMMON_UCX_VERBOSE(2, "waiting for %d disconnect requests", count);
116+
for (i = 0; i < count; ++i) {
117+
opal_common_ucx_wait_request(reqs[i], worker, "ucp_disconnect_nb");
118+
reqs[i] = NULL;
119+
}
120+
}
121+
122+
OPAL_DECLSPEC int opal_common_ucx_del_procs(opal_common_ucx_del_proc_t *procs, size_t count,
123+
size_t my_rank, size_t max_disconnect, ucp_worker_h worker)
124+
{
125+
size_t num_reqs;
126+
size_t max_reqs;
127+
void *dreq, **dreqs;
128+
size_t i;
129+
size_t n;
130+
131+
MCA_COMMON_UCX_ASSERT(procs || !count);
132+
MCA_COMMON_UCX_ASSERT(max_disconnect > 0);
133+
134+
max_reqs = (max_disconnect > count) ? count : max_disconnect;
135+
136+
dreqs = malloc(sizeof(*dreqs) * max_reqs);
137+
if (dreqs == NULL) {
138+
return OPAL_ERR_OUT_OF_RESOURCE;
139+
}
140+
141+
num_reqs = 0;
142+
143+
for (i = 0; i < count; ++i) {
144+
n = (i + my_rank) % count;
145+
if (procs[n].ep == NULL) {
146+
continue;
147+
}
148+
149+
MCA_COMMON_UCX_VERBOSE(2, "disconnecting from rank %zu", procs[n].vpid);
150+
dreq = ucp_disconnect_nb(procs[n].ep);
151+
if (dreq != NULL) {
152+
if (UCS_PTR_IS_ERR(dreq)) {
153+
MCA_COMMON_UCX_ERROR("ucp_disconnect_nb(%zu) failed: %s", procs[n].vpid,
154+
ucs_status_string(UCS_PTR_STATUS(dreq)));
155+
continue;
156+
} else {
157+
dreqs[num_reqs++] = dreq;
158+
if (num_reqs >= max_disconnect) {
159+
opal_common_ucx_wait_all_requests(dreqs, num_reqs, worker);
160+
num_reqs = 0;
161+
}
162+
}
163+
}
164+
}
165+
/* num_reqs == 0 is processed by opal_common_ucx_wait_all_requests routine,
166+
* so suppress coverity warning */
167+
/* coverity[uninit_use_in_call] */
168+
opal_common_ucx_wait_all_requests(dreqs, num_reqs, worker);
169+
free(dreqs);
170+
171+
opal_common_ucx_mca_pmix_fence(worker);
172+
173+
return OPAL_SUCCESS;
174+
}
175+

opal/mca/common/ucx/common_ucx.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,12 +60,19 @@ typedef struct opal_common_ucx_module {
6060
bool opal_mem_hooks;
6161
} opal_common_ucx_module_t;
6262

63+
typedef struct opal_common_ucx_del_proc {
64+
ucp_ep_h ep;
65+
size_t vpid;
66+
} opal_common_ucx_del_proc_t;
67+
6368
extern opal_common_ucx_module_t opal_common_ucx;
6469

6570
OPAL_DECLSPEC void opal_common_ucx_mca_register(void);
6671
OPAL_DECLSPEC void opal_common_ucx_mca_deregister(void);
6772
OPAL_DECLSPEC void opal_common_ucx_empty_complete_cb(void *request, ucs_status_t status);
6873
OPAL_DECLSPEC void opal_common_ucx_mca_pmix_fence(ucp_worker_h worker);
74+
OPAL_DECLSPEC int opal_common_ucx_del_procs(opal_common_ucx_del_proc_t *procs, size_t count,
75+
size_t my_rank, size_t max_disconnect, ucp_worker_h worker);
6976

7077
static inline
7178
int opal_common_ucx_wait_request(ucs_status_ptr_t request, ucp_worker_h worker,

oshmem/mca/spml/ucx/spml_ucx.c

Lines changed: 18 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -103,82 +103,42 @@ int mca_spml_ucx_enable(bool enable)
103103
return OSHMEM_SUCCESS;
104104
}
105105

106-
107-
static void mca_spml_ucx_waitall(void **reqs, int *count_p)
108-
{
109-
int i;
110-
111-
SPML_UCX_VERBOSE(10, "waiting for %d disconnect requests", *count_p);
112-
for (i = 0; i < *count_p; ++i) {
113-
opal_common_ucx_wait_request(reqs[i], mca_spml_ucx_ctx_default.ucp_worker, "ucp_disconnect_nb");
114-
reqs[i] = NULL;
115-
}
116-
117-
*count_p = 0;
118-
}
119-
120106
int mca_spml_ucx_del_procs(ompi_proc_t** procs, size_t nprocs)
121107
{
122-
int my_rank = oshmem_my_proc_id();
123-
int num_reqs;
124-
size_t max_reqs;
125-
void *dreq, **dreqs;
126-
ucp_ep_h ep;
127-
size_t i, n;
108+
opal_common_ucx_del_proc_t *del_procs;
109+
size_t i;
110+
int ret;
128111

129112
oshmem_shmem_barrier();
130113

131114
if (!mca_spml_ucx_ctx_default.ucp_peers) {
132115
return OSHMEM_SUCCESS;
133116
}
134117

135-
max_reqs = mca_spml_ucx.num_disconnect;
136-
if (max_reqs > nprocs) {
137-
max_reqs = nprocs;
138-
}
139-
140-
dreqs = malloc(sizeof(*dreqs) * max_reqs);
141-
if (dreqs == NULL) {
118+
del_procs = malloc(sizeof(*del_procs) * nprocs);
119+
if (del_procs == NULL) {
142120
return OMPI_ERR_OUT_OF_RESOURCE;
143121
}
144122

145-
num_reqs = 0;
146-
147123
for (i = 0; i < nprocs; ++i) {
148-
n = (i + my_rank) % nprocs;
149-
ep = mca_spml_ucx_ctx_default.ucp_peers[n].ucp_conn;
150-
if (ep == NULL) {
151-
continue;
152-
}
124+
del_procs[i].ep = mca_spml_ucx_ctx_default.ucp_peers[i].ucp_conn;
125+
del_procs[i].vpid = i;
153126

154-
mca_spml_ucx_ctx_default.ucp_peers[n].ucp_conn = NULL;
155-
156-
SPML_UCX_VERBOSE(10, "disconnecting from peer %zu", n);
157-
dreq = ucp_disconnect_nb(ep);
158-
if (dreq != NULL) {
159-
if (UCS_PTR_IS_ERR(dreq)) {
160-
SPML_UCX_ERROR("ucp_disconnect_nb(%zu) failed: %s", n,
161-
ucs_status_string(UCS_PTR_STATUS(dreq)));
162-
continue;
163-
} else {
164-
dreqs[num_reqs++] = dreq;
165-
if (num_reqs >= mca_spml_ucx.num_disconnect) {
166-
mca_spml_ucx_waitall(dreqs, &num_reqs);
167-
}
168-
}
169-
}
127+
/* mark peer as disconnected */
128+
mca_spml_ucx_ctx_default.ucp_peers[i].ucp_conn = NULL;
170129
}
171-
/* num_reqs == 0 is processed by mca_pml_ucx_waitall routine,
172-
* so suppress coverity warning */
173-
/* coverity[uninit_use_in_call] */
174-
mca_spml_ucx_waitall(dreqs, &num_reqs);
175-
free(dreqs);
176-
free(mca_spml_ucx.remote_addrs_tbl);
177130

178-
opal_common_ucx_mca_pmix_fence(mca_spml_ucx_ctx_default.ucp_worker);
131+
ret = opal_common_ucx_del_procs(del_procs, nprocs, oshmem_my_proc_id(),
132+
mca_spml_ucx.num_disconnect,
133+
mca_spml_ucx_ctx_default.ucp_worker);
134+
135+
free(del_procs);
136+
free(mca_spml_ucx.remote_addrs_tbl);
179137
free(mca_spml_ucx_ctx_default.ucp_peers);
138+
180139
mca_spml_ucx_ctx_default.ucp_peers = NULL;
181-
return OSHMEM_SUCCESS;
140+
141+
return ret;
182142
}
183143

184144
/* TODO: move func into common place, use it with rkey exchng too */

0 commit comments

Comments
 (0)