Skip to content

Commit 114a563

Browse files
authored
Merge pull request #10443 from janjust/osc-ucx-priority-fix
osc/ucx: fix osc ucx component priority selection
2 parents 2befd74 + c2e6cd9 commit 114a563

File tree

3 files changed

+69
-40
lines changed

3 files changed

+69
-40
lines changed

ompi/mca/osc/ucx/osc_ucx_component.c

Lines changed: 62 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -207,10 +207,71 @@ static int progress_callback(void) {
207207
return 0;
208208
}
209209

210+
static int ucp_context_init(bool enable_mt, int proc_world_size) {
211+
int ret = OMPI_SUCCESS;
212+
ucs_status_t status;
213+
ucp_config_t *config = NULL;
214+
ucp_params_t context_params;
215+
216+
status = ucp_config_read("MPI", NULL, &config);
217+
if (UCS_OK != status) {
218+
OSC_UCX_VERBOSE(1, "ucp_config_read failed: %d", status);
219+
return OMPI_ERROR;
220+
}
221+
222+
/* initialize UCP context */
223+
memset(&context_params, 0, sizeof(context_params));
224+
context_params.field_mask = UCP_PARAM_FIELD_FEATURES | UCP_PARAM_FIELD_MT_WORKERS_SHARED
225+
| UCP_PARAM_FIELD_ESTIMATED_NUM_EPS | UCP_PARAM_FIELD_REQUEST_INIT
226+
| UCP_PARAM_FIELD_REQUEST_SIZE;
227+
context_params.features = UCP_FEATURE_RMA | UCP_FEATURE_AMO32 | UCP_FEATURE_AMO64;
228+
context_params.mt_workers_shared = (enable_mt ? 1 : 0);
229+
context_params.estimated_num_eps = proc_world_size;
230+
context_params.request_init = opal_common_ucx_req_init;
231+
context_params.request_size = sizeof(opal_common_ucx_request_t);
232+
233+
#if HAVE_DECL_UCP_PARAM_FIELD_ESTIMATED_NUM_PPN
234+
context_params.estimated_num_ppn = opal_process_info.num_local_peers + 1;
235+
context_params.field_mask |= UCP_PARAM_FIELD_ESTIMATED_NUM_PPN;
236+
#endif
237+
238+
status = ucp_init(&context_params, config, &mca_osc_ucx_component.wpool->ucp_ctx);
239+
if (UCS_OK != status) {
240+
OSC_UCX_VERBOSE(1, "ucp_init failed: %d", status);
241+
ret = OMPI_ERROR;
242+
}
243+
ucp_config_release(config);
244+
245+
return ret;
246+
}
247+
210248
static int component_init(bool enable_progress_threads, bool enable_mpi_threads) {
249+
opal_common_ucx_support_level_t support_level;
250+
int ret = OMPI_SUCCESS;
251+
211252
mca_osc_ucx_component.enable_mpi_threads = enable_mpi_threads;
212253
mca_osc_ucx_component.wpool = opal_common_ucx_wpool_allocate();
213254
opal_common_ucx_mca_register();
255+
256+
ret = ucp_context_init(enable_mpi_threads, ompi_proc_world_size());
257+
if (OMPI_ERROR == ret) {
258+
return OMPI_ERR_NOT_AVAILABLE;
259+
}
260+
261+
support_level = opal_common_ucx_support_level(mca_osc_ucx_component.wpool->ucp_ctx);
262+
if (OPAL_COMMON_UCX_SUPPORT_NONE == support_level) {
263+
ucp_cleanup(mca_osc_ucx_component.wpool->ucp_ctx);
264+
mca_osc_ucx_component.wpool->ucp_ctx = NULL;
265+
return OMPI_ERR_NOT_AVAILABLE;
266+
}
267+
268+
/*
269+
* Retain priority if we have supported devices and transports.
270+
* Lower priority if we have supported transports, but not supported devices.
271+
*/
272+
mca_osc_ucx_component.priority = (support_level == OPAL_COMMON_UCX_SUPPORT_DEVICE) ?
273+
mca_osc_ucx_component.priority : 19;
274+
OSC_UCX_VERBOSE(2, "returning priority %d", mca_osc_ucx_component.priority);
214275
return OMPI_SUCCESS;
215276
}
216277

@@ -395,9 +456,7 @@ static int component_select(struct ompi_win_t *win, void **base, size_t size, in
395456
goto select_unlock;
396457
}
397458

398-
ret = opal_common_ucx_wpool_init(mca_osc_ucx_component.wpool,
399-
ompi_proc_world_size(),
400-
mca_osc_ucx_component.enable_mpi_threads);
459+
ret = opal_common_ucx_wpool_init(mca_osc_ucx_component.wpool);
401460
if (OMPI_SUCCESS != ret) {
402461
OSC_UCX_VERBOSE(1, "opal_common_ucx_wpool_init failed: %d", ret);
403462
goto select_unlock;

opal/mca/common/ucx/common_ucx_wpool.c

Lines changed: 6 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -127,11 +127,8 @@ OPAL_DECLSPEC void opal_common_ucx_wpool_free(opal_common_ucx_wpool_t *wpool)
127127
static int _wpool_list_put(opal_common_ucx_wpool_t *wpool, opal_list_t *list,
128128
opal_common_ucx_winfo_t *winfo);
129129

130-
OPAL_DECLSPEC int opal_common_ucx_wpool_init(opal_common_ucx_wpool_t *wpool, int proc_world_size,
131-
bool enable_mt)
130+
OPAL_DECLSPEC int opal_common_ucx_wpool_init(opal_common_ucx_wpool_t *wpool)
132131
{
133-
ucp_config_t *config = NULL;
134-
ucp_params_t context_params;
135132
opal_common_ucx_winfo_t *winfo;
136133
ucs_status_t status;
137134
int rc = OPAL_SUCCESS;
@@ -144,36 +141,6 @@ OPAL_DECLSPEC int opal_common_ucx_wpool_init(opal_common_ucx_wpool_t *wpool, int
144141

145142
OBJ_CONSTRUCT(&wpool->mutex, opal_mutex_t);
146143

147-
status = ucp_config_read("MPI", NULL, &config);
148-
if (UCS_OK != status) {
149-
MCA_COMMON_UCX_VERBOSE(1, "ucp_config_read failed: %d", status);
150-
return OPAL_ERROR;
151-
}
152-
153-
/* initialize UCP context */
154-
memset(&context_params, 0, sizeof(context_params));
155-
context_params.field_mask = UCP_PARAM_FIELD_FEATURES | UCP_PARAM_FIELD_MT_WORKERS_SHARED
156-
| UCP_PARAM_FIELD_ESTIMATED_NUM_EPS | UCP_PARAM_FIELD_REQUEST_INIT
157-
| UCP_PARAM_FIELD_REQUEST_SIZE;
158-
context_params.features = UCP_FEATURE_RMA | UCP_FEATURE_AMO32 | UCP_FEATURE_AMO64;
159-
context_params.mt_workers_shared = (enable_mt ? 1 : 0);
160-
context_params.estimated_num_eps = proc_world_size;
161-
context_params.request_init = opal_common_ucx_req_init;
162-
context_params.request_size = sizeof(opal_common_ucx_request_t);
163-
164-
#if HAVE_DECL_UCP_PARAM_FIELD_ESTIMATED_NUM_PPN
165-
context_params.estimated_num_ppn = opal_process_info.num_local_peers + 1;
166-
context_params.field_mask |= UCP_PARAM_FIELD_ESTIMATED_NUM_PPN;
167-
#endif
168-
169-
status = ucp_init(&context_params, config, &wpool->ucp_ctx);
170-
ucp_config_release(config);
171-
if (UCS_OK != status) {
172-
MCA_COMMON_UCX_VERBOSE(1, "ucp_init failed: %d", status);
173-
rc = OPAL_ERROR;
174-
goto err_ucp_init;
175-
}
176-
177144
/* create recv worker and add to idle pool */
178145
OBJ_CONSTRUCT(&wpool->idle_workers, opal_list_t);
179146
OBJ_CONSTRUCT(&wpool->active_workers, opal_list_t);
@@ -253,7 +220,11 @@ void opal_common_ucx_wpool_finalize(opal_common_ucx_wpool_t *wpool)
253220
wpool->dflt_winfo = NULL;
254221

255222
OBJ_DESTRUCT(&wpool->mutex);
256-
ucp_cleanup(wpool->ucp_ctx);
223+
if (NULL != wpool->ucp_ctx) {
224+
ucp_cleanup(wpool->ucp_ctx);
225+
wpool->ucp_ctx = NULL;
226+
}
227+
257228
return;
258229
}
259230

opal/mca/common/ucx/common_ucx_wpool.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -182,8 +182,7 @@ typedef int (*opal_common_ucx_exchange_func_t)(void *my_info, size_t my_info_len
182182
/* Manage Worker Pool (wpool) */
183183
OPAL_DECLSPEC opal_common_ucx_wpool_t *opal_common_ucx_wpool_allocate(void);
184184
OPAL_DECLSPEC void opal_common_ucx_wpool_free(opal_common_ucx_wpool_t *wpool);
185-
OPAL_DECLSPEC int opal_common_ucx_wpool_init(opal_common_ucx_wpool_t *wpool, int proc_world_size,
186-
bool enable_mt);
185+
OPAL_DECLSPEC int opal_common_ucx_wpool_init(opal_common_ucx_wpool_t *wpool);
187186
OPAL_DECLSPEC void opal_common_ucx_wpool_finalize(opal_common_ucx_wpool_t *wpool);
188187
OPAL_DECLSPEC int opal_common_ucx_wpool_progress(opal_common_ucx_wpool_t *wpool);
189188

0 commit comments

Comments
 (0)