Skip to content

Commit 9c3d00b

Browse files
xinzhao3Tomislav Janjusic
authored andcommitted
ompi/oshmem/spml/ucx: use lockfree array to optimize spml_ucx_progress/delete oshmem_barrier in shmem_ctx_destroy
ompi/oshmem/spml/ucx: optimize spml ucx progress Signed-off-by: Tomislav Janjusic <tomislavj@mellanox.com>
1 parent e041400 commit 9c3d00b

File tree

4 files changed

+129
-94
lines changed

4 files changed

+129
-94
lines changed

opal/mca/common/ucx/common_ucx.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,7 @@ void opal_common_ucx_mca_proc_added(void)
151151
}
152152
}
153153
#endif
154+
}
154155

155156
OPAL_DECLSPEC int opal_common_ucx_mca_pmix_fence_nb(int *fenced)
156157
{

oshmem/mca/spml/ucx/spml_ucx.c

Lines changed: 66 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -77,8 +77,6 @@ mca_spml_ucx_t mca_spml_ucx = {
7777
.get_mkey_slow = NULL
7878
};
7979

80-
OBJ_CLASS_INSTANCE(mca_spml_ucx_ctx_list_item_t, opal_list_item_t, NULL, NULL);
81-
8280
mca_spml_ucx_ctx_t mca_spml_ucx_ctx_default = {
8381
.ucp_worker = NULL,
8482
.ucp_peers = NULL,
@@ -243,7 +241,7 @@ int mca_spml_ucx_add_procs(ompi_proc_t** procs, size_t nprocs)
243241
goto error;
244242
}
245243

246-
opal_progress_register(spml_ucx_progress);
244+
opal_progress_register(spml_ucx_default_progress);
247245

248246
mca_spml_ucx.remote_addrs_tbl = (char **)calloc(nprocs, sizeof(char *));
249247
memset(mca_spml_ucx.remote_addrs_tbl, 0, nprocs * sizeof(char *));
@@ -511,9 +509,45 @@ int mca_spml_ucx_deregister(sshmem_mkey_t *mkeys)
511509
return OSHMEM_SUCCESS;
512510
}
513511

512+
static inline void _ctx_add(mca_spml_ucx_ctx_array_t *array, mca_spml_ucx_ctx_t *ctx)
513+
{
514+
int i;
515+
516+
if (array->ctxs_count < array->ctxs_num) {
517+
array->ctxs[array->ctxs_count] = ctx;
518+
} else {
519+
array->ctxs = realloc(array->ctxs, (array->ctxs_num + MCA_SPML_UCX_CTXS_ARRAY_INC) * sizeof(mca_spml_ucx_ctx_t *));
520+
opal_atomic_wmb ();
521+
for (i = array->ctxs_num; i < array->ctxs_num + MCA_SPML_UCX_CTXS_ARRAY_INC; i++) {
522+
array->ctxs[i] = NULL;
523+
}
524+
array->ctxs[array->ctxs_num] = ctx;
525+
array->ctxs_num += MCA_SPML_UCX_CTXS_ARRAY_INC;
526+
}
527+
528+
opal_atomic_wmb ();
529+
array->ctxs_count++;
530+
}
531+
532+
static inline void _ctx_remove(mca_spml_ucx_ctx_array_t *array, mca_spml_ucx_ctx_t *ctx)
533+
{
534+
int i;
535+
536+
for (i = 0; i < array->ctxs_count; i++) {
537+
if (array->ctxs[i] == ctx) {
538+
array->ctxs[i] = array->ctxs[array->ctxs_count-1];
539+
array->ctxs[array->ctxs_count-1] = NULL;
540+
break;
541+
}
542+
}
543+
544+
array->ctxs_count--;
545+
opal_atomic_wmb ();
546+
}
547+
514548
int mca_spml_ucx_ctx_create(long options, shmem_ctx_t *ctx)
515549
{
516-
mca_spml_ucx_ctx_list_item_t *ctx_item;
550+
mca_spml_ucx_ctx_t *ucx_ctx;
517551
ucp_worker_params_t params;
518552
ucp_ep_params_t ep_params;
519553
size_t i, j, nprocs = oshmem_num_procs();
@@ -524,8 +558,8 @@ int mca_spml_ucx_ctx_create(long options, shmem_ctx_t *ctx)
524558
sshmem_mkey_t *mkey;
525559
int rc = OSHMEM_ERROR;
526560

527-
ctx_item = OBJ_NEW(mca_spml_ucx_ctx_list_item_t);
528-
ctx_item->ctx.options = options;
561+
ucx_ctx = malloc(sizeof(mca_spml_ucx_ctx_t));
562+
ucx_ctx->options = options;
529563

530564
params.field_mask = UCP_WORKER_PARAM_FIELD_THREAD_MODE;
531565
if (oshmem_mpi_thread_provided == SHMEM_THREAD_SINGLE || options & SHMEM_CTX_PRIVATE || options & SHMEM_CTX_SERIALIZED) {
@@ -535,22 +569,26 @@ int mca_spml_ucx_ctx_create(long options, shmem_ctx_t *ctx)
535569
}
536570

537571
err = ucp_worker_create(mca_spml_ucx.ucp_context, &params,
538-
&ctx_item->ctx.ucp_worker);
572+
&ucx_ctx->ucp_worker);
539573
if (UCS_OK != err) {
540-
OBJ_RELEASE(ctx_item);
574+
free(ucx_ctx);
541575
return OSHMEM_ERROR;
542576
}
543577

544-
ctx_item->ctx.ucp_peers = (ucp_peer_t *) calloc(nprocs, sizeof(*(ctx_item->ctx.ucp_peers)));
545-
if (NULL == ctx_item->ctx.ucp_peers) {
578+
ucx_ctx->ucp_peers = (ucp_peer_t *) calloc(nprocs, sizeof(*(ucx_ctx->ucp_peers)));
579+
if (NULL == ucx_ctx->ucp_peers) {
546580
goto error;
547581
}
548582

583+
if (mca_spml_ucx.active_array.ctxs_count == 0) {
584+
opal_progress_register(spml_ucx_ctx_progress);
585+
}
586+
549587
for (i = 0; i < nprocs; i++) {
550588
ep_params.field_mask = UCP_EP_PARAM_FIELD_REMOTE_ADDRESS;
551589
ep_params.address = (ucp_address_t *)(mca_spml_ucx.remote_addrs_tbl[i]);
552-
err = ucp_ep_create(ctx_item->ctx.ucp_worker, &ep_params,
553-
&ctx_item->ctx.ucp_peers[i].ucp_conn);
590+
err = ucp_ep_create(ucx_ctx->ucp_worker, &ep_params,
591+
&ucx_ctx->ucp_peers[i].ucp_conn);
554592
if (UCS_OK != err) {
555593
SPML_ERROR("ucp_ep_create(proc=%d/%d) failed: %s", i, nprocs,
556594
ucs_status_string(err));
@@ -559,68 +597,55 @@ int mca_spml_ucx_ctx_create(long options, shmem_ctx_t *ctx)
559597

560598
for (j = 0; j < MCA_MEMHEAP_SEG_COUNT; j++) {
561599
mkey = &memheap_map->mem_segs[j].mkeys_cache[i][0];
562-
ucx_mkey = &ctx_item->ctx.ucp_peers[i].mkeys[j].key;
563-
err = ucp_ep_rkey_unpack(ctx_item->ctx.ucp_peers[i].ucp_conn,
600+
ucx_mkey = &ucx_ctx->ucp_peers[i].mkeys[j].key;
601+
err = ucp_ep_rkey_unpack(ucx_ctx->ucp_peers[i].ucp_conn,
564602
mkey->u.data,
565603
&ucx_mkey->rkey);
566604
if (UCS_OK != err) {
567605
SPML_UCX_ERROR("failed to unpack rkey");
568606
goto error2;
569607
}
570-
mca_spml_ucx_cache_mkey(&ctx_item->ctx, mkey, j, i);
608+
mca_spml_ucx_cache_mkey(ucx_ctx, mkey, j, i);
571609
}
572610
}
573611

574612
SHMEM_MUTEX_LOCK(mca_spml_ucx.internal_mutex);
575-
576-
opal_list_append(&(mca_spml_ucx.ctx_list), &ctx_item->super);
577-
613+
_ctx_add(&mca_spml_ucx.active_array, ucx_ctx);
578614
SHMEM_MUTEX_UNLOCK(mca_spml_ucx.internal_mutex);
579615

580-
(*ctx) = (shmem_ctx_t)(&ctx_item->ctx);
581-
616+
(*ctx) = (shmem_ctx_t)ucx_ctx;
582617
return OSHMEM_SUCCESS;
583618

584619
error2:
585620
for (i = 0; i < nprocs; i++) {
586-
if (ctx_item->ctx.ucp_peers[i].ucp_conn) {
587-
ucp_ep_destroy(ctx_item->ctx.ucp_peers[i].ucp_conn);
621+
if (ucx_ctx->ucp_peers[i].ucp_conn) {
622+
ucp_ep_destroy(ucx_ctx->ucp_peers[i].ucp_conn);
588623
}
589624
}
590625

591-
if (ctx_item->ctx.ucp_peers)
592-
free(ctx_item->ctx.ucp_peers);
626+
if (ucx_ctx->ucp_peers)
627+
free(ucx_ctx->ucp_peers);
593628

594629
error:
595-
ucp_worker_destroy(ctx_item->ctx.ucp_worker);
596-
OBJ_RELEASE(ctx_item);
630+
ucp_worker_destroy(ucx_ctx->ucp_worker);
631+
free(ucx_ctx);
597632
rc = OSHMEM_ERR_OUT_OF_RESOURCE;
598633
SPML_ERROR("ctx create FAILED rc=%d", rc);
599634
return rc;
600635
}
601636

602637
void mca_spml_ucx_ctx_destroy(shmem_ctx_t ctx)
603638
{
604-
mca_spml_ucx_ctx_list_item_t *ctx_item, *next;
605-
size_t i, j, nprocs = oshmem_num_procs();
606-
607639
MCA_SPML_CALL(quiet(ctx));
608640

609-
oshmem_shmem_barrier();
610-
611641
SHMEM_MUTEX_LOCK(mca_spml_ucx.internal_mutex);
642+
_ctx_remove(&mca_spml_ucx.active_array, (mca_spml_ucx_ctx_t *)ctx);
643+
_ctx_add(&mca_spml_ucx.idle_array, (mca_spml_ucx_ctx_t *)ctx);
644+
SHMEM_MUTEX_UNLOCK(mca_spml_ucx.internal_mutex);
612645

613-
/* delete context object from list */
614-
OPAL_LIST_FOREACH_SAFE(ctx_item, next, &(mca_spml_ucx.ctx_list),
615-
mca_spml_ucx_ctx_list_item_t) {
616-
if ((shmem_ctx_t)(&ctx_item->ctx) == ctx) {
617-
opal_list_remove_item(&(mca_spml_ucx.ctx_list), &ctx_item->super);
618-
opal_list_append(&(mca_spml_ucx.idle_ctx_list), &ctx_item->super);
619-
break;
620-
}
646+
if (!mca_spml_ucx.active_array.ctxs_count) {
647+
opal_progress_unregister(spml_ucx_ctx_progress);
621648
}
622-
623-
SHMEM_MUTEX_UNLOCK(mca_spml_ucx.internal_mutex);
624649
}
625650

626651
int mca_spml_ucx_get(shmem_ctx_t ctx, void *src_addr, size_t size, void *dst_addr, int src)

oshmem/mca/spml/ucx/spml_ucx.h

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -74,14 +74,14 @@ typedef struct mca_spml_ucx_ctx mca_spml_ucx_ctx_t;
7474

7575
extern mca_spml_ucx_ctx_t mca_spml_ucx_ctx_default;
7676

77-
struct mca_spml_ucx_ctx_list_item {
78-
opal_list_item_t super;
79-
mca_spml_ucx_ctx_t ctx;
80-
};
81-
typedef struct mca_spml_ucx_ctx_list_item mca_spml_ucx_ctx_list_item_t;
82-
8377
typedef spml_ucx_mkey_t * (*mca_spml_ucx_get_mkey_slow_fn_t)(shmem_ctx_t ctx, int pe, void *va, void **rva);
8478

79+
typedef struct mca_spml_ucx_ctx_array {
80+
int ctxs_count;
81+
int ctxs_num;
82+
mca_spml_ucx_ctx_t **ctxs;
83+
} mca_spml_ucx_ctx_array_t;
84+
8585
struct mca_spml_ucx {
8686
mca_spml_base_module_t super;
8787
ucp_context_h ucp_context;
@@ -90,8 +90,8 @@ struct mca_spml_ucx {
9090
bool enabled;
9191
mca_spml_ucx_get_mkey_slow_fn_t get_mkey_slow;
9292
char **remote_addrs_tbl;
93-
opal_list_t ctx_list;
94-
opal_list_t idle_ctx_list;
93+
mca_spml_ucx_ctx_array_t active_array;
94+
mca_spml_ucx_ctx_array_t idle_array;
9595
int priority; /* component priority */
9696
shmem_internal_mutex_t internal_mutex;
9797
};
@@ -151,7 +151,8 @@ extern int mca_spml_ucx_add_procs(ompi_proc_t** procs, size_t nprocs);
151151
extern int mca_spml_ucx_del_procs(ompi_proc_t** procs, size_t nprocs);
152152
extern int mca_spml_ucx_fence(shmem_ctx_t ctx);
153153
extern int mca_spml_ucx_quiet(shmem_ctx_t ctx);
154-
extern int spml_ucx_progress(void);
154+
extern int spml_ucx_default_progress(void);
155+
extern int spml_ucx_ctx_progress(void);
155156

156157
static void mca_spml_ucx_cache_mkey(mca_spml_ucx_ctx_t *ucx_ctx, sshmem_mkey_t *mkey, uint32_t segno, int dst_pe)
157158
{
@@ -192,6 +193,9 @@ static inline int ucx_status_to_oshmem_nb(ucs_status_t status)
192193
#endif
193194
}
194195

196+
#define MCA_SPML_UCX_CTXS_ARRAY_SIZE 64
197+
#define MCA_SPML_UCX_CTXS_ARRAY_INC 64
198+
195199
END_C_DECLS
196200

197201
#endif

0 commit comments

Comments
 (0)