Skip to content

Commit 34153d3

Browse files
committed
init/finalize functions
1 parent 4ad6e0b commit 34153d3

File tree

1 file changed

+148
-0
lines changed

1 file changed

+148
-0
lines changed

opal/mca/common/ucx/common_ucx.c

Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -286,3 +286,151 @@ OPAL_DECLSPEC int opal_common_ucx_del_procs(opal_common_ucx_del_proc_t *procs, s
286286
return OPAL_SUCCESS;
287287
}
288288

289+
/***********************************************************************/
290+
291+
static inline void _cleanup_tlocal(void *arg)
292+
{
293+
// 1. Cleanup all rkeys in the window table
294+
// 2. Return all workers into the idle pool
295+
}
296+
297+
static ucp_worker_h _create_ctx_worker(opal_common_ucx_wpool_t *wpool)
298+
{
299+
ucp_worker_params_t worker_params;
300+
ucp_worker_h worker;
301+
ucs_status_t status;
302+
303+
memset(&worker_params, 0, sizeof(worker_params));
304+
worker_params.field_mask = UCP_WORKER_PARAM_FIELD_THREAD_MODE;
305+
worker_params.thread_mode = UCS_THREAD_MODE_SINGLE;
306+
status = ucp_worker_create(wpool->ucp_ctx, &worker_params, &worker);
307+
if (UCS_OK != status) {
308+
return NULL;
309+
}
310+
311+
return worker;
312+
}
313+
314+
static void _wpool_add_to_idle(opal_common_ucx_wpool_t *wpool,
315+
_worker_engine_t *wkr)
316+
{
317+
_idle_list_item_t *item;
318+
319+
if(wkr->comm_size != 0) {
320+
int i;
321+
for (i = 0; i < wkr->comm_size; i++) {
322+
ucp_ep_destroy(wkr->endpoints[i]);
323+
}
324+
free(wkr->endpoints);
325+
wkr->endpoints = NULL;
326+
wkr->comm_size = 0;
327+
}
328+
329+
item = OBJ_NEW(_idle_list_item_t);
330+
item->ptr = wkr;
331+
332+
opal_mutex_lock(&wpool->mutex);
333+
opal_list_append(&wpool->idle_workers, &item->super);
334+
opal_mutex_unlock(&wpool->mutex);
335+
}
336+
337+
static _worker_engine_t* _wpool_remove_from_idle(opal_common_ucx_wpool_t *wpool)
338+
{
339+
_worker_engine_t *wkr = NULL;
340+
_idle_list_item_t *item = NULL;
341+
342+
opal_mutex_lock(&wpool->mutex);
343+
if (!opal_list_is_empty(&wpool->idle_workers)) {
344+
item = (_idle_list_item_t *)opal_list_get_first(&wpool->idle_workers);
345+
opal_list_remove_item(&wpool->idle_workers, &item->super);
346+
}
347+
opal_mutex_unlock(&wpool->mutex);
348+
349+
if (item != NULL) {
350+
wkr = item->ptr;
351+
OBJ_RELEASE(item);
352+
}
353+
354+
return wkr;
355+
}
356+
357+
358+
OPAL_DECLSPEC int opal_common_ucx_wpool_init(opal_common_ucx_wpool_t *wpool,
359+
int proc_world_size,
360+
ucp_request_init_callback_t req_init_ptr,
361+
size_t req_size)
362+
{
363+
ucp_config_t *config = NULL;
364+
ucp_params_t context_params;
365+
_worker_engine_t *wkr;
366+
ucs_status_t status;
367+
int ret = OPAL_SUCCESS;
368+
369+
wpool->cur_ctxid = wpool->cur_memid = 0;
370+
OBJ_CONSTRUCT(&wpool->mutex, opal_mutex_t);
371+
372+
status = ucp_config_read("MPI", NULL, &config);
373+
if (UCS_OK != status) {
374+
MCA_COMMON_UCX_VERBOSE(1, "ucp_config_read failed: %d", status);
375+
return OPAL_ERROR;
376+
}
377+
378+
/* initialize UCP context */
379+
memset(&context_params, 0, sizeof(context_params));
380+
context_params.field_mask = UCP_PARAM_FIELD_FEATURES |
381+
UCP_PARAM_FIELD_MT_WORKERS_SHARED |
382+
UCP_PARAM_FIELD_ESTIMATED_NUM_EPS |
383+
UCP_PARAM_FIELD_REQUEST_INIT |
384+
UCP_PARAM_FIELD_REQUEST_SIZE;
385+
context_params.features = UCP_FEATURE_RMA | UCP_FEATURE_AMO32 | UCP_FEATURE_AMO64;
386+
context_params.mt_workers_shared = 1;
387+
context_params.estimated_num_eps = proc_world_size;
388+
context_params.request_init = req_init_ptr;
389+
context_params.request_size = req_size;
390+
391+
status = ucp_init(&context_params, config, &wpool->ucp_ctx);
392+
ucp_config_release(config);
393+
if (UCS_OK != status) {
394+
MCA_COMMON_UCX_VERBOSE(1, "ucp_init failed: %d", status);
395+
ret = OPAL_ERROR;
396+
goto err_ucp_init;
397+
}
398+
399+
/* create recv worker and add to idle pool */
400+
OBJ_CONSTRUCT(&wpool->idle_workers, opal_list_t);
401+
wpool->recv_worker = _create_ctx_worker(wpool);
402+
if (wpool->recv_worker == NULL) {
403+
MCA_COMMON_UCX_VERBOSE(1, "_create_ctx_worker failed");
404+
ret = OPAL_ERROR;
405+
goto err_worker_create;
406+
}
407+
408+
wkr = OBJ_NEW(_worker_engine_t);
409+
OBJ_CONSTRUCT(&wkr->mutex, opal_mutex_t);
410+
wkr->worker = wpool->recv_worker;
411+
wkr->endpoints = NULL;
412+
wkr->comm_size = 0;
413+
414+
_wpool_add_to_idle(wpool, wkr);
415+
416+
status = ucp_worker_get_address(wpool->recv_worker,
417+
&wpool->recv_waddr, &wpool->recv_waddr_len);
418+
if (status != UCS_OK) {
419+
MCA_COMMON_UCX_VERBOSE(1, "ucp_worker_get_address failed: %d", status);
420+
ret = OPAL_ERROR;
421+
goto err_get_addr;
422+
}
423+
424+
pthread_key_create(&_tlocal_key, _cleanup_tlocal);
425+
426+
return ret;
427+
428+
err_get_addr:
429+
if (NULL != wpool->recv_worker) {
430+
ucp_worker_destroy(wpool->recv_worker);
431+
}
432+
err_worker_create:
433+
ucp_cleanup(wpool->ucp_ctx);
434+
err_ucp_init:
435+
return ret;
436+
}

0 commit comments

Comments
 (0)