Skip to content

Commit b128682

Browse files
authored
Merge pull request #4765 from xinzhao3/topic/osc-ucx-mem-hook
OMPI/OSC/UCX: move memory hooks init in osc to win creation.
2 parents 9d3a799 + 74ef51a commit b128682

File tree

2 files changed

+60
-57
lines changed

2 files changed

+60
-57
lines changed

ompi/mca/osc/ucx/osc_ucx.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ typedef struct ompi_osc_ucx_component {
3636
ucp_worker_h ucp_worker;
3737
bool enable_mpi_threads;
3838
opal_free_list_t requests; /* request free list for the r* communication variants */
39+
bool env_initialized; /* UCX environment is initialized or not */
3940
int num_incomplete_req_ops;
4041
unsigned int priority;
4142
} ompi_osc_ucx_component_t;

ompi/mca/osc/ucx/osc_ucx_component.c

Lines changed: 59 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -113,64 +113,16 @@ static int progress_callback(void) {
113113
}
114114

115115
static int component_init(bool enable_progress_threads, bool enable_mpi_threads) {
116-
ucp_config_t *config = NULL;
117-
ucp_params_t context_params;
118-
bool requests_created = false;
119116
int ret = OMPI_SUCCESS;
120-
ucs_status_t status;
121117

122118
mca_osc_ucx_component.ucp_context = NULL;
123119
mca_osc_ucx_component.ucp_worker = NULL;
124120
mca_osc_ucx_component.enable_mpi_threads = enable_mpi_threads;
125-
126-
status = ucp_config_read("MPI", NULL, &config);
127-
if (UCS_OK != status) {
128-
OSC_UCX_VERBOSE(1, "ucp_config_read failed: %d", status);
129-
return OMPI_ERROR;
130-
}
131-
132-
OBJ_CONSTRUCT(&mca_osc_ucx_component.requests, opal_free_list_t);
133-
requests_created = true;
134-
ret = opal_free_list_init (&mca_osc_ucx_component.requests,
135-
sizeof(ompi_osc_ucx_request_t),
136-
opal_cache_line_size,
137-
OBJ_CLASS(ompi_osc_ucx_request_t),
138-
0, 0, 8, 0, 8, NULL, 0, NULL, NULL, NULL);
139-
if (OMPI_SUCCESS != ret) {
140-
OSC_UCX_VERBOSE(1, "opal_free_list_init failed: %d", ret);
141-
goto error;
142-
}
143-
121+
mca_osc_ucx_component.env_initialized = false;
144122
mca_osc_ucx_component.num_incomplete_req_ops = 0;
145123

146-
/* initialize UCP context */
147-
148-
memset(&context_params, 0, sizeof(context_params));
149-
context_params.field_mask = UCP_PARAM_FIELD_FEATURES |
150-
UCP_PARAM_FIELD_MT_WORKERS_SHARED |
151-
UCP_PARAM_FIELD_ESTIMATED_NUM_EPS |
152-
UCP_PARAM_FIELD_REQUEST_INIT |
153-
UCP_PARAM_FIELD_REQUEST_SIZE;
154-
context_params.features = UCP_FEATURE_RMA | UCP_FEATURE_AMO32 | UCP_FEATURE_AMO64;
155-
context_params.mt_workers_shared = 0;
156-
context_params.estimated_num_eps = ompi_proc_world_size();
157-
context_params.request_init = internal_req_init;
158-
context_params.request_size = sizeof(ompi_osc_ucx_internal_request_t);
159-
160-
status = ucp_init(&context_params, config, &mca_osc_ucx_component.ucp_context);
161-
ucp_config_release(config);
162-
if (UCS_OK != status) {
163-
OSC_UCX_VERBOSE(1, "ucp_init failed: %d", status);
164-
ret = OMPI_ERROR;
165-
goto error;
166-
}
167-
168124
opal_common_ucx_mca_register();
169125
return ret;
170-
error:
171-
if (requests_created) OBJ_DESTRUCT(&mca_osc_ucx_component.requests);
172-
if (mca_osc_ucx_component.ucp_context) ucp_cleanup(mca_osc_ucx_component.ucp_context);
173-
return ret;
174126
}
175127

176128
static int component_finalize(void) {
@@ -187,9 +139,12 @@ static int component_finalize(void) {
187139
}
188140

189141
assert(mca_osc_ucx_component.num_incomplete_req_ops == 0);
190-
OBJ_DESTRUCT(&mca_osc_ucx_component.requests);
191-
opal_progress_unregister(progress_callback);
192-
ucp_cleanup(mca_osc_ucx_component.ucp_context);
142+
if (mca_osc_ucx_component.env_initialized == true) {
143+
OBJ_DESTRUCT(&mca_osc_ucx_component.requests);
144+
opal_progress_unregister(progress_callback);
145+
ucp_cleanup(mca_osc_ucx_component.ucp_context);
146+
mca_osc_ucx_component.env_initialized = false;
147+
}
193148
opal_common_ucx_mca_deregister();
194149
return OMPI_SUCCESS;
195150
}
@@ -296,7 +251,7 @@ static int component_select(struct ompi_win_t *win, void **base, size_t size, in
296251
ucs_status_t status;
297252
int i, comm_size = ompi_comm_size(comm);
298253
int is_eps_ready;
299-
bool progress_registered = false, eps_created = false, worker_created = false;
254+
bool progress_registered = false, eps_created = false, env_initialized = false;
300255
ucp_address_t *my_addr = NULL;
301256
size_t my_addr_len;
302257
char *recv_buf = NULL;
@@ -315,11 +270,52 @@ static int component_select(struct ompi_win_t *win, void **base, size_t size, in
315270
return OMPI_ERR_NOT_SUPPORTED;
316271
}
317272

318-
/* if UCP worker has never been initialized before, init it first */
319-
if (mca_osc_ucx_component.ucp_worker == NULL) {
273+
if (mca_osc_ucx_component.env_initialized == false) {
274+
ucp_config_t *config = NULL;
275+
ucp_params_t context_params;
320276
ucp_worker_params_t worker_params;
321277
ucp_worker_attr_t worker_attr;
322278

279+
status = ucp_config_read("MPI", NULL, &config);
280+
if (UCS_OK != status) {
281+
OSC_UCX_VERBOSE(1, "ucp_config_read failed: %d", status);
282+
return OMPI_ERROR;
283+
}
284+
285+
OBJ_CONSTRUCT(&mca_osc_ucx_component.requests, opal_free_list_t);
286+
ret = opal_free_list_init (&mca_osc_ucx_component.requests,
287+
sizeof(ompi_osc_ucx_request_t),
288+
opal_cache_line_size,
289+
OBJ_CLASS(ompi_osc_ucx_request_t),
290+
0, 0, 8, 0, 8, NULL, 0, NULL, NULL, NULL);
291+
if (OMPI_SUCCESS != ret) {
292+
OSC_UCX_VERBOSE(1, "opal_free_list_init failed: %d", ret);
293+
goto error;
294+
}
295+
296+
/* initialize UCP context */
297+
298+
memset(&context_params, 0, sizeof(context_params));
299+
context_params.field_mask = UCP_PARAM_FIELD_FEATURES |
300+
UCP_PARAM_FIELD_MT_WORKERS_SHARED |
301+
UCP_PARAM_FIELD_ESTIMATED_NUM_EPS |
302+
UCP_PARAM_FIELD_REQUEST_INIT |
303+
UCP_PARAM_FIELD_REQUEST_SIZE;
304+
context_params.features = UCP_FEATURE_RMA | UCP_FEATURE_AMO32 | UCP_FEATURE_AMO64;
305+
context_params.mt_workers_shared = 0;
306+
context_params.estimated_num_eps = ompi_proc_world_size();
307+
context_params.request_init = internal_req_init;
308+
context_params.request_size = sizeof(ompi_osc_ucx_internal_request_t);
309+
310+
status = ucp_init(&context_params, config, &mca_osc_ucx_component.ucp_context);
311+
ucp_config_release(config);
312+
if (UCS_OK != status) {
313+
OSC_UCX_VERBOSE(1, "ucp_init failed: %d", status);
314+
ret = OMPI_ERROR;
315+
goto error;
316+
}
317+
318+
assert(mca_osc_ucx_component.ucp_worker == NULL);
323319
memset(&worker_params, 0, sizeof(worker_params));
324320
worker_params.field_mask = UCP_WORKER_PARAM_FIELD_THREAD_MODE;
325321
worker_params.thread_mode = (mca_osc_ucx_component.enable_mpi_threads == true)
@@ -355,7 +351,8 @@ static int component_select(struct ompi_win_t *win, void **base, size_t size, in
355351
goto error_nomem;
356352
}
357353

358-
worker_created = true;
354+
mca_osc_ucx_component.env_initialized = true;
355+
env_initialized = true;
359356
}
360357

361358
/* create module structure */
@@ -650,7 +647,12 @@ static int component_select(struct ompi_win_t *win, void **base, size_t size, in
650647
if (module) free(module);
651648

652649
error_nomem:
653-
if (worker_created) ucp_worker_destroy(mca_osc_ucx_component.ucp_worker);
650+
if (env_initialized == true) {
651+
OBJ_DESTRUCT(&mca_osc_ucx_component.requests);
652+
ucp_worker_destroy(mca_osc_ucx_component.ucp_worker);
653+
ucp_cleanup(mca_osc_ucx_component.ucp_context);
654+
mca_osc_ucx_component.env_initialized = false;
655+
}
654656
return ret;
655657
}
656658

0 commit comments

Comments
 (0)