
Commit 920cc2e

Author: Sergey Oblomov
MCA/COMMON/UCX: del_procs calls are unified to common module
Signed-off-by: Sergey Oblomov <sergeyo@mellanox.com>
Parent: 36cc69d

File tree (4 files changed, +103 −112 lines):

  ompi/mca/pml/ucx/pml_ucx.c
  opal/mca/common/ucx/common_ucx.c
  opal/mca/common/ucx/common_ucx.h
  oshmem/mca/spml/ucx/spml_ucx.c

ompi/mca/pml/ucx/pml_ucx.c

Lines changed: 12 additions & 54 deletions
@@ -368,74 +368,32 @@ static inline ucp_ep_h mca_pml_ucx_get_ep(ompi_communicator_t *comm, int rank)
     return NULL;
 }
 
-static void mca_pml_ucx_waitall(void **reqs, int *count_p)
-{
-    int i;
-
-    PML_UCX_VERBOSE(2, "waiting for %d disconnect requests", *count_p);
-    for (i = 0; i < *count_p; ++i) {
-        opal_common_ucx_wait_request(reqs[i], ompi_pml_ucx.ucp_worker, "ucp_disconnect_nb");
-        reqs[i] = NULL;
-    }
-
-    *count_p = 0;
-}
-
 int mca_pml_ucx_del_procs(struct ompi_proc_t **procs, size_t nprocs)
 {
     ompi_proc_t *proc;
-    int num_reqs;
-    size_t max_reqs;
-    void *dreq, **dreqs;
-    ucp_ep_h ep;
+    opal_common_ucx_del_proc_t *del_procs;
     size_t i;
+    int ret;
 
-    max_reqs = ompi_pml_ucx.num_disconnect;
-    if (max_reqs > nprocs) {
-        max_reqs = nprocs;
-    }
-
-    dreqs = malloc(sizeof(*dreqs) * max_reqs);
-    if (dreqs == NULL) {
+    del_procs = malloc(sizeof(*del_procs) * nprocs);
+    if (del_procs == NULL) {
         return OMPI_ERR_OUT_OF_RESOURCE;
     }
 
-    num_reqs = 0;
-
     for (i = 0; i < nprocs; ++i) {
-        proc = procs[(i + OMPI_PROC_MY_NAME->vpid) % nprocs];
-        ep = proc->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_PML];
-        if (ep == NULL) {
-            continue;
-        }
+        proc = procs[i];
+        del_procs[i].ep = proc->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_PML];
+        del_procs[i].vpid = proc->super.proc_name.vpid;
 
+        /* mark peer as disconnected */
         proc->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_PML] = NULL;
-
-        PML_UCX_VERBOSE(2, "disconnecting from rank %d", proc->super.proc_name.vpid);
-        dreq = ucp_disconnect_nb(ep);
-        if (dreq != NULL) {
-            if (UCS_PTR_IS_ERR(dreq)) {
-                PML_UCX_ERROR("ucp_disconnect_nb(%d) failed: %s",
-                              proc->super.proc_name.vpid,
-                              ucs_status_string(UCS_PTR_STATUS(dreq)));
-                continue;
-            } else {
-                dreqs[num_reqs++] = dreq;
-                if (num_reqs >= ompi_pml_ucx.num_disconnect) {
-                    mca_pml_ucx_waitall(dreqs, &num_reqs);
-                }
-            }
-        }
     }
-    /* num_reqs == 0 is processed by mca_pml_ucx_waitall routine,
-     * so suppress coverity warning */
-    /* coverity[uninit_use_in_call] */
-    mca_pml_ucx_waitall(dreqs, &num_reqs);
-    free(dreqs);
 
-    opal_common_ucx_mca_pmix_fence(ompi_pml_ucx.ucp_worker);
+    ret = opal_common_ucx_del_procs(del_procs, nprocs, OMPI_PROC_MY_NAME->vpid,
+                                    ompi_pml_ucx.num_disconnect, ompi_pml_ucx.ucp_worker);
+    free(del_procs);
 
-    return OMPI_SUCCESS;
+    return ret;
 }
 
 int mca_pml_ucx_enable(bool enable)

opal/mca/common/ucx/common_ucx.c

Lines changed: 66 additions & 0 deletions
@@ -74,3 +74,69 @@ OPAL_DECLSPEC void opal_common_ucx_mca_pmix_fence(ucp_worker_h worker)
 }
 }
 
+
+static void opal_common_ucx_wait_all_requests(void **reqs, int count, ucp_worker_h worker)
+{
+    int i;
+
+    MCA_COMMON_UCX_VERBOSE(2, "waiting for %d disconnect requests", count);
+    for (i = 0; i < count; ++i) {
+        opal_common_ucx_wait_request(reqs[i], worker, "ucp_disconnect_nb");
+        reqs[i] = NULL;
+    }
+}
+
+OPAL_DECLSPEC int opal_common_ucx_del_procs(opal_common_ucx_del_proc_t *procs, size_t count,
+                                            size_t my_rank, size_t max_disconnect, ucp_worker_h worker)
+{
+    size_t num_reqs;
+    size_t max_reqs;
+    void *dreq, **dreqs;
+    size_t i;
+    size_t n;
+
+    MCA_COMMON_UCX_ASSERT(procs || !count);
+    MCA_COMMON_UCX_ASSERT(max_disconnect > 0);
+
+    max_reqs = (max_disconnect > count) ? count : max_disconnect;
+
+    dreqs = malloc(sizeof(*dreqs) * max_reqs);
+    if (dreqs == NULL) {
+        return OPAL_ERR_OUT_OF_RESOURCE;
+    }
+
+    num_reqs = 0;
+
+    for (i = 0; i < count; ++i) {
+        n = (i + my_rank) % count;
+        if (procs[n].ep == NULL) {
+            continue;
+        }
+
+        MCA_COMMON_UCX_VERBOSE(2, "disconnecting from rank %zu", procs[n].vpid);
+        dreq = ucp_disconnect_nb(procs[n].ep);
+        if (dreq != NULL) {
+            if (UCS_PTR_IS_ERR(dreq)) {
+                MCA_COMMON_UCX_ERROR("ucp_disconnect_nb(%zu) failed: %s", procs[n].vpid,
+                                     ucs_status_string(UCS_PTR_STATUS(dreq)));
+                continue;
+            } else {
+                dreqs[num_reqs++] = dreq;
+                if (num_reqs >= max_disconnect) {
+                    opal_common_ucx_wait_all_requests(dreqs, num_reqs, worker);
+                    num_reqs = 0;
+                }
+            }
+        }
+    }
+    /* num_reqs == 0 is processed by opal_common_ucx_wait_all_requests routine,
+     * so suppress coverity warning */
+    /* coverity[uninit_use_in_call] */
+    opal_common_ucx_wait_all_requests(dreqs, num_reqs, worker);
+    free(dreqs);
+
+    opal_common_ucx_mca_pmix_fence(worker);
+
+    return OPAL_SUCCESS;
+}
+

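For orientation, the new helper keeps the behaviour the two components previously duplicated: peers are visited starting at the caller's own rank (n = (i + my_rank) % count) so that ranks do not all tear down their connection to peer 0 first, and at most max_disconnect ucp_disconnect_nb requests are kept in flight before they are drained. A small stand-alone sketch of that visiting order; the print_order helper below is purely illustrative and not part of this commit:

#include <stdio.h>

/* Illustrative only: reproduces the index math used by
 * opal_common_ucx_del_procs() to pick the next peer to disconnect from. */
static void print_order(size_t my_rank, size_t count)
{
    size_t i;

    for (i = 0; i < count; ++i) {
        printf("%zu ", (i + my_rank) % count);
    }
    printf("\n");
}

int main(void)
{
    print_order(2, 4);  /* prints: 2 3 0 1 */
    return 0;
}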
opal/mca/common/ucx/common_ucx.h

Lines changed: 7 additions & 0 deletions
@@ -59,12 +59,19 @@ typedef struct opal_common_ucx_module {
     int registered;
 } opal_common_ucx_module_t;
 
+typedef struct opal_common_ucx_del_proc {
+    ucp_ep_h ep;
+    size_t vpid;
+} opal_common_ucx_del_proc_t;
+
 extern opal_common_ucx_module_t opal_common_ucx;
 
 OPAL_DECLSPEC void opal_common_ucx_mca_register(void);
 OPAL_DECLSPEC void opal_common_ucx_mca_deregister(void);
 OPAL_DECLSPEC void opal_common_ucx_empty_complete_cb(void *request, ucs_status_t status);
 OPAL_DECLSPEC void opal_common_ucx_mca_pmix_fence(ucp_worker_h worker);
+OPAL_DECLSPEC int opal_common_ucx_del_procs(opal_common_ucx_del_proc_t *procs, size_t count,
+                                            size_t my_rank, size_t max_disconnect, ucp_worker_h worker);
 
 static inline
 int opal_common_ucx_wait_request(ucs_status_ptr_t request, ucp_worker_h worker,

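Both converted call sites in this commit (pml_ucx above and spml_ucx below) follow the same pattern against this declaration. A minimal caller-side sketch, assuming a hypothetical component that keeps its endpoints in a plain array; example_del_procs, my_endpoints and the max_disconnect value of 16 are illustrative placeholders, not part of the commit:

#include <stdlib.h>
#include "opal/mca/common/ucx/common_ucx.h"

/* Hypothetical caller: build one opal_common_ucx_del_proc_t per peer, clear the
 * component's own endpoint table, then let the common code disconnect and fence. */
static int example_del_procs(ucp_ep_h *my_endpoints, size_t nprocs,
                             size_t my_rank, ucp_worker_h worker)
{
    opal_common_ucx_del_proc_t *del_procs;
    size_t i;
    int ret;

    del_procs = malloc(sizeof(*del_procs) * nprocs);
    if (del_procs == NULL) {
        return OPAL_ERR_OUT_OF_RESOURCE;
    }

    for (i = 0; i < nprocs; ++i) {
        del_procs[i].ep   = my_endpoints[i];
        del_procs[i].vpid = i;        /* only used for verbose/error logging */
        my_endpoints[i]   = NULL;     /* mark peer as disconnected */
    }

    /* max_disconnect (16 here) caps the number of ucp_disconnect_nb requests
     * kept in flight at once; the real components pass their MCA parameter. */
    ret = opal_common_ucx_del_procs(del_procs, nprocs, my_rank, 16, worker);
    free(del_procs);

    return ret;
}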
oshmem/mca/spml/ucx/spml_ucx.c

Lines changed: 18 additions & 58 deletions
@@ -103,82 +103,42 @@ int mca_spml_ucx_enable(bool enable)
     return OSHMEM_SUCCESS;
 }
 
-
-static void mca_spml_ucx_waitall(void **reqs, int *count_p)
-{
-    int i;
-
-    SPML_UCX_VERBOSE(10, "waiting for %d disconnect requests", *count_p);
-    for (i = 0; i < *count_p; ++i) {
-        opal_common_ucx_wait_request(reqs[i], mca_spml_ucx_ctx_default.ucp_worker, "ucp_disconnect_nb");
-        reqs[i] = NULL;
-    }
-
-    *count_p = 0;
-}
-
 int mca_spml_ucx_del_procs(ompi_proc_t** procs, size_t nprocs)
 {
-    int my_rank = oshmem_my_proc_id();
-    int num_reqs;
-    size_t max_reqs;
-    void *dreq, **dreqs;
-    ucp_ep_h ep;
-    size_t i, n;
+    opal_common_ucx_del_proc_t *del_procs;
+    size_t i;
+    int ret;
 
     oshmem_shmem_barrier();
 
     if (!mca_spml_ucx_ctx_default.ucp_peers) {
         return OSHMEM_SUCCESS;
     }
 
-    max_reqs = mca_spml_ucx.num_disconnect;
-    if (max_reqs > nprocs) {
-        max_reqs = nprocs;
-    }
-
-    dreqs = malloc(sizeof(*dreqs) * max_reqs);
-    if (dreqs == NULL) {
+    del_procs = malloc(sizeof(*del_procs) * nprocs);
+    if (del_procs == NULL) {
         return OMPI_ERR_OUT_OF_RESOURCE;
     }
 
-    num_reqs = 0;
-
     for (i = 0; i < nprocs; ++i) {
-        n = (i + my_rank) % nprocs;
-        ep = mca_spml_ucx_ctx_default.ucp_peers[n].ucp_conn;
-        if (ep == NULL) {
-            continue;
-        }
+        del_procs[i].ep = mca_spml_ucx_ctx_default.ucp_peers[i].ucp_conn;
+        del_procs[i].vpid = i;
 
-        mca_spml_ucx_ctx_default.ucp_peers[n].ucp_conn = NULL;
-
-        SPML_UCX_VERBOSE(10, "disconnecting from peer %zu", n);
-        dreq = ucp_disconnect_nb(ep);
-        if (dreq != NULL) {
-            if (UCS_PTR_IS_ERR(dreq)) {
-                SPML_UCX_ERROR("ucp_disconnect_nb(%zu) failed: %s", n,
-                               ucs_status_string(UCS_PTR_STATUS(dreq)));
-                continue;
-            } else {
-                dreqs[num_reqs++] = dreq;
-                if (num_reqs >= mca_spml_ucx.num_disconnect) {
-                    mca_spml_ucx_waitall(dreqs, &num_reqs);
-                }
-            }
-        }
+        /* mark peer as disconnected */
+        mca_spml_ucx_ctx_default.ucp_peers[i].ucp_conn = NULL;
     }
-    /* num_reqs == 0 is processed by mca_pml_ucx_waitall routine,
-     * so suppress coverity warning */
-    /* coverity[uninit_use_in_call] */
-    mca_spml_ucx_waitall(dreqs, &num_reqs);
-    free(dreqs);
-    free(mca_spml_ucx.remote_addrs_tbl);
 
-    opal_common_ucx_mca_pmix_fence(mca_spml_ucx_ctx_default.ucp_worker);
+    ret = opal_common_ucx_del_procs(del_procs, nprocs, oshmem_my_proc_id(),
+                                    mca_spml_ucx.num_disconnect,
+                                    mca_spml_ucx_ctx_default.ucp_worker);
+
+    free(del_procs);
+    free(mca_spml_ucx.remote_addrs_tbl);
     free(mca_spml_ucx_ctx_default.ucp_peers);
+
     mca_spml_ucx_ctx_default.ucp_peers = NULL;
-    return OSHMEM_SUCCESS;
+
+    return ret;
 }
 
 /* TODO: move func into common place, use it with rkey exchng too */
