Skip to content

Commit db646a7

Browse files
authored
Merge pull request #11110 from MamziB/mamzi/osc-finalize-2
2 parents e93fe0e + 19afd18 commit db646a7

File tree

3 files changed

+51
-0
lines changed

3 files changed

+51
-0
lines changed

ompi/mca/osc/ucx/osc_ucx_component.c

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -319,12 +319,14 @@ static int component_init(bool enable_progress_threads, bool enable_mpi_threads)
319319
}
320320

321321
static int component_finalize(void) {
322+
322323
if (!opal_common_ucx_thread_enabled) {
323324
int i;
324325
for (i = 0; i < mca_osc_ucx_component.comm_world_size; i++) {
325326
ucp_ep_h ep = mca_osc_ucx_component.endpoints[i];
326327
if (ep != NULL) {
327328
ucp_ep_destroy(ep);
329+
OPAL_COMMON_UCX_DEBUG_ATOMIC_ADD(opal_common_ucx_ep_counts, -1);
328330
}
329331
}
330332
free(mca_osc_ucx_component.endpoints);
@@ -334,6 +336,9 @@ static int component_finalize(void) {
334336
opal_common_ucx_wpool_finalize(mca_osc_ucx_component.wpool);
335337
}
336338
opal_common_ucx_wpool_free(mca_osc_ucx_component.wpool);
339+
340+
assert(opal_common_ucx_ep_counts == 0);
341+
assert(opal_common_ucx_unpacked_rkey_counts == 0);
337342
return OMPI_SUCCESS;
338343
}
339344

@@ -790,6 +795,11 @@ static int component_select(struct ompi_win_t *win, void **base, size_t size, in
790795
goto error;
791796
}
792797

798+
if (my_mem_addr_size != 0) {
799+
/* rkey object is already distributed among comm processes */
800+
ucp_rkey_buffer_release(my_mem_addr);
801+
}
802+
793803
state_base = (void *)&(module->state);
794804
ret = opal_common_ucx_wpmem_create(module->ctx, &state_base,
795805
sizeof(ompi_osc_ucx_state_t),
@@ -803,6 +813,11 @@ static int component_select(struct ompi_win_t *win, void **base, size_t size, in
803813
goto error;
804814
}
805815

816+
if (my_mem_addr_size != 0) {
817+
/* rkey object is already distributed among comm processes */
818+
ucp_rkey_buffer_release(my_mem_addr);
819+
}
820+
806821
/* exchange window addrs */
807822
if (flavor == MPI_WIN_FLAVOR_ALLOCATE || flavor == MPI_WIN_FLAVOR_CREATE ||
808823
flavor == MPI_WIN_FLAVOR_SHARED) {

opal/mca/common/ucx/common_ucx_wpool.c

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ __thread int initialized = 0;
3232
#endif
3333

3434
bool opal_common_ucx_thread_enabled = false;
35+
opal_atomic_int64_t opal_common_ucx_ep_counts = 0;
36+
opal_atomic_int64_t opal_common_ucx_unpacked_rkey_counts = 0;
3537

3638
static _ctx_record_t *_tlocal_add_ctx_rec(opal_common_ucx_ctx_t *ctx);
3739
static inline _ctx_record_t *_tlocal_get_ctx_rec(opal_tsd_tracked_key_t tls_key);
@@ -102,6 +104,7 @@ static void _winfo_destructor(opal_common_ucx_winfo_t *winfo)
102104
for (i = 0; i < winfo->comm_size; i++) {
103105
if (NULL != winfo->endpoints[i]) {
104106
ucp_ep_destroy(winfo->endpoints[i]);
107+
OPAL_COMMON_UCX_DEBUG_ATOMIC_ADD(opal_common_ucx_ep_counts, -1);
105108
}
106109
assert(winfo->inflight_ops[i] == 0);
107110
}
@@ -326,9 +329,26 @@ static opal_common_ucx_winfo_t *_wpool_get_winfo(opal_common_ucx_wpool_t *wpool,
326329
return winfo;
327330
}
328331

332+
/* Remove the winfo from active workers and add it to idle workers */
329333
static void _wpool_put_winfo(opal_common_ucx_wpool_t *wpool, opal_common_ucx_winfo_t *winfo)
330334
{
331335
opal_mutex_lock(&wpool->mutex);
336+
if (winfo->comm_size != 0) {
337+
size_t i;
338+
if (opal_common_ucx_thread_enabled) {
339+
for (i = 0; i < winfo->comm_size; i++) {
340+
if (NULL != winfo->endpoints[i]) {
341+
ucp_ep_destroy(winfo->endpoints[i]);
342+
OPAL_COMMON_UCX_DEBUG_ATOMIC_ADD(opal_common_ucx_ep_counts, -1);
343+
}
344+
assert(winfo->inflight_ops[i] == 0);
345+
}
346+
}
347+
free(winfo->endpoints);
348+
free(winfo->inflight_ops);
349+
}
350+
winfo->endpoints = NULL;
351+
winfo->comm_size = 0;
332352
opal_list_remove_item(&wpool->active_workers, &winfo->super);
333353
opal_list_prepend(&wpool->idle_workers, &winfo->super);
334354
opal_mutex_unlock(&wpool->mutex);
@@ -632,6 +652,7 @@ static int _tlocal_ctx_connect(_ctx_record_t *ctx_rec, int target)
632652
memset(&ep_params, 0, sizeof(ucp_ep_params_t));
633653
ep_params.field_mask = UCP_EP_PARAM_FIELD_REMOTE_ADDRESS;
634654

655+
assert(winfo->endpoints[target] == NULL);
635656
opal_mutex_lock(&winfo->mutex);
636657
displ = gctx->recv_worker_displs[target];
637658
ep_params.address = (ucp_address_t *) &(gctx->recv_worker_addrs[displ]);
@@ -641,7 +662,9 @@ static int _tlocal_ctx_connect(_ctx_record_t *ctx_rec, int target)
641662
opal_mutex_unlock(&winfo->mutex);
642663
return OPAL_ERROR;
643664
}
665+
OPAL_COMMON_UCX_DEBUG_ATOMIC_ADD(opal_common_ucx_ep_counts, 1);
644666
opal_mutex_unlock(&winfo->mutex);
667+
assert(winfo->endpoints[target] != NULL);
645668
return OPAL_SUCCESS;
646669
}
647670

@@ -662,6 +685,7 @@ static void _tlocal_mem_rec_cleanup(_mem_record_t *mem_rec)
662685
for (i = 0; i < mem_rec->gmem->ctx->comm_size; i++) {
663686
if (mem_rec->rkeys[i]) {
664687
ucp_rkey_destroy(mem_rec->rkeys[i]);
688+
OPAL_COMMON_UCX_DEBUG_ATOMIC_ADD(opal_common_ucx_unpacked_rkey_counts, -1);
665689
}
666690
}
667691
opal_mutex_unlock(&mem_rec->winfo->mutex);
@@ -701,6 +725,7 @@ static int _tlocal_mem_create_rkey(_mem_record_t *mem_rec, ucp_ep_h ep, int targ
701725

702726
opal_mutex_lock(&mem_rec->winfo->mutex);
703727
status = ucp_ep_rkey_unpack(ep, &gmem->mem_addrs[displ], &mem_rec->rkeys[target]);
728+
OPAL_COMMON_UCX_DEBUG_ATOMIC_ADD(opal_common_ucx_unpacked_rkey_counts, 1);
704729
opal_mutex_unlock(&mem_rec->winfo->mutex);
705730
if (status != UCS_OK) {
706731
MCA_COMMON_UCX_VERBOSE(1, "ucp_ep_rkey_unpack failed: %d", status);

opal/mca/common/ucx/common_ucx_wpool.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,17 @@ typedef struct {
5959
} opal_common_ucx_wpool_t;
6060

6161
extern bool opal_common_ucx_thread_enabled;
62+
extern opal_atomic_int64_t opal_common_ucx_ep_counts;
63+
extern opal_atomic_int64_t opal_common_ucx_unpacked_rkey_counts;
64+
65+
#if OPAL_ENABLE_DEBUG
66+
#define OPAL_COMMON_UCX_DEBUG_ATOMIC_ADD(_var, _val) \
67+
do { \
68+
opal_atomic_add_fetch_64(&(_var), (_val)); \
69+
} while(0);
70+
#else
71+
#define OPAL_COMMON_UCX_DEBUG_ATOMIC_ADD(&(_var), (_val));
72+
#endif
6273

6374
/* Worker Pool Context (wpctx) is an object that is comprised of a set of UCP
6475
* workers that are considered as one logical communication entity.

0 commit comments

Comments
 (0)