@@ -113,64 +113,16 @@ static int progress_callback(void) {
113
113
}
114
114
115
115
static int component_init (bool enable_progress_threads , bool enable_mpi_threads ) {
116
- ucp_config_t * config = NULL ;
117
- ucp_params_t context_params ;
118
- bool requests_created = false;
119
116
int ret = OMPI_SUCCESS ;
120
- ucs_status_t status ;
121
117
122
118
mca_osc_ucx_component .ucp_context = NULL ;
123
119
mca_osc_ucx_component .ucp_worker = NULL ;
124
120
mca_osc_ucx_component .enable_mpi_threads = enable_mpi_threads ;
125
-
126
- status = ucp_config_read ("MPI" , NULL , & config );
127
- if (UCS_OK != status ) {
128
- OSC_UCX_VERBOSE (1 , "ucp_config_read failed: %d" , status );
129
- return OMPI_ERROR ;
130
- }
131
-
132
- OBJ_CONSTRUCT (& mca_osc_ucx_component .requests , opal_free_list_t );
133
- requests_created = true;
134
- ret = opal_free_list_init (& mca_osc_ucx_component .requests ,
135
- sizeof (ompi_osc_ucx_request_t ),
136
- opal_cache_line_size ,
137
- OBJ_CLASS (ompi_osc_ucx_request_t ),
138
- 0 , 0 , 8 , 0 , 8 , NULL , 0 , NULL , NULL , NULL );
139
- if (OMPI_SUCCESS != ret ) {
140
- OSC_UCX_VERBOSE (1 , "opal_free_list_init failed: %d" , ret );
141
- goto error ;
142
- }
143
-
121
+ mca_osc_ucx_component .env_initialized = false;
144
122
mca_osc_ucx_component .num_incomplete_req_ops = 0 ;
145
123
146
- /* initialize UCP context */
147
-
148
- memset (& context_params , 0 , sizeof (context_params ));
149
- context_params .field_mask = UCP_PARAM_FIELD_FEATURES |
150
- UCP_PARAM_FIELD_MT_WORKERS_SHARED |
151
- UCP_PARAM_FIELD_ESTIMATED_NUM_EPS |
152
- UCP_PARAM_FIELD_REQUEST_INIT |
153
- UCP_PARAM_FIELD_REQUEST_SIZE ;
154
- context_params .features = UCP_FEATURE_RMA | UCP_FEATURE_AMO32 | UCP_FEATURE_AMO64 ;
155
- context_params .mt_workers_shared = 0 ;
156
- context_params .estimated_num_eps = ompi_proc_world_size ();
157
- context_params .request_init = internal_req_init ;
158
- context_params .request_size = sizeof (ompi_osc_ucx_internal_request_t );
159
-
160
- status = ucp_init (& context_params , config , & mca_osc_ucx_component .ucp_context );
161
- ucp_config_release (config );
162
- if (UCS_OK != status ) {
163
- OSC_UCX_VERBOSE (1 , "ucp_init failed: %d" , status );
164
- ret = OMPI_ERROR ;
165
- goto error ;
166
- }
167
-
168
124
opal_common_ucx_mca_register ();
169
125
return ret ;
170
- error :
171
- if (requests_created ) OBJ_DESTRUCT (& mca_osc_ucx_component .requests );
172
- if (mca_osc_ucx_component .ucp_context ) ucp_cleanup (mca_osc_ucx_component .ucp_context );
173
- return ret ;
174
126
}
175
127
176
128
static int component_finalize (void ) {
@@ -187,9 +139,12 @@ static int component_finalize(void) {
187
139
}
188
140
189
141
assert (mca_osc_ucx_component .num_incomplete_req_ops == 0 );
190
- OBJ_DESTRUCT (& mca_osc_ucx_component .requests );
191
- opal_progress_unregister (progress_callback );
192
- ucp_cleanup (mca_osc_ucx_component .ucp_context );
142
+ if (mca_osc_ucx_component .env_initialized == true) {
143
+ OBJ_DESTRUCT (& mca_osc_ucx_component .requests );
144
+ opal_progress_unregister (progress_callback );
145
+ ucp_cleanup (mca_osc_ucx_component .ucp_context );
146
+ mca_osc_ucx_component .env_initialized = false;
147
+ }
193
148
opal_common_ucx_mca_deregister ();
194
149
return OMPI_SUCCESS ;
195
150
}
@@ -296,7 +251,7 @@ static int component_select(struct ompi_win_t *win, void **base, size_t size, in
296
251
ucs_status_t status ;
297
252
int i , comm_size = ompi_comm_size (comm );
298
253
int is_eps_ready ;
299
- bool progress_registered = false, eps_created = false, worker_created = false;
254
+ bool progress_registered = false, eps_created = false, env_initialized = false;
300
255
ucp_address_t * my_addr = NULL ;
301
256
size_t my_addr_len ;
302
257
char * recv_buf = NULL ;
@@ -315,11 +270,52 @@ static int component_select(struct ompi_win_t *win, void **base, size_t size, in
315
270
return OMPI_ERR_NOT_SUPPORTED ;
316
271
}
317
272
318
- /* if UCP worker has never been initialized before, init it first */
319
- if (mca_osc_ucx_component .ucp_worker == NULL ) {
273
+ if (mca_osc_ucx_component .env_initialized == false) {
274
+ ucp_config_t * config = NULL ;
275
+ ucp_params_t context_params ;
320
276
ucp_worker_params_t worker_params ;
321
277
ucp_worker_attr_t worker_attr ;
322
278
279
+ status = ucp_config_read ("MPI" , NULL , & config );
280
+ if (UCS_OK != status ) {
281
+ OSC_UCX_VERBOSE (1 , "ucp_config_read failed: %d" , status );
282
+ return OMPI_ERROR ;
283
+ }
284
+
285
+ OBJ_CONSTRUCT (& mca_osc_ucx_component .requests , opal_free_list_t );
286
+ ret = opal_free_list_init (& mca_osc_ucx_component .requests ,
287
+ sizeof (ompi_osc_ucx_request_t ),
288
+ opal_cache_line_size ,
289
+ OBJ_CLASS (ompi_osc_ucx_request_t ),
290
+ 0 , 0 , 8 , 0 , 8 , NULL , 0 , NULL , NULL , NULL );
291
+ if (OMPI_SUCCESS != ret ) {
292
+ OSC_UCX_VERBOSE (1 , "opal_free_list_init failed: %d" , ret );
293
+ goto error ;
294
+ }
295
+
296
+ /* initialize UCP context */
297
+
298
+ memset (& context_params , 0 , sizeof (context_params ));
299
+ context_params .field_mask = UCP_PARAM_FIELD_FEATURES |
300
+ UCP_PARAM_FIELD_MT_WORKERS_SHARED |
301
+ UCP_PARAM_FIELD_ESTIMATED_NUM_EPS |
302
+ UCP_PARAM_FIELD_REQUEST_INIT |
303
+ UCP_PARAM_FIELD_REQUEST_SIZE ;
304
+ context_params .features = UCP_FEATURE_RMA | UCP_FEATURE_AMO32 | UCP_FEATURE_AMO64 ;
305
+ context_params .mt_workers_shared = 0 ;
306
+ context_params .estimated_num_eps = ompi_proc_world_size ();
307
+ context_params .request_init = internal_req_init ;
308
+ context_params .request_size = sizeof (ompi_osc_ucx_internal_request_t );
309
+
310
+ status = ucp_init (& context_params , config , & mca_osc_ucx_component .ucp_context );
311
+ ucp_config_release (config );
312
+ if (UCS_OK != status ) {
313
+ OSC_UCX_VERBOSE (1 , "ucp_init failed: %d" , status );
314
+ ret = OMPI_ERROR ;
315
+ goto error ;
316
+ }
317
+
318
+ assert (mca_osc_ucx_component .ucp_worker == NULL );
323
319
memset (& worker_params , 0 , sizeof (worker_params ));
324
320
worker_params .field_mask = UCP_WORKER_PARAM_FIELD_THREAD_MODE ;
325
321
worker_params .thread_mode = (mca_osc_ucx_component .enable_mpi_threads == true)
@@ -355,7 +351,8 @@ static int component_select(struct ompi_win_t *win, void **base, size_t size, in
355
351
goto error_nomem ;
356
352
}
357
353
358
- worker_created = true;
354
+ mca_osc_ucx_component .env_initialized = true;
355
+ env_initialized = true;
359
356
}
360
357
361
358
/* create module structure */
@@ -650,7 +647,12 @@ static int component_select(struct ompi_win_t *win, void **base, size_t size, in
650
647
if (module ) free (module );
651
648
652
649
error_nomem :
653
- if (worker_created ) ucp_worker_destroy (mca_osc_ucx_component .ucp_worker );
650
+ if (env_initialized == true) {
651
+ OBJ_DESTRUCT (& mca_osc_ucx_component .requests );
652
+ ucp_worker_destroy (mca_osc_ucx_component .ucp_worker );
653
+ ucp_cleanup (mca_osc_ucx_component .ucp_context );
654
+ mca_osc_ucx_component .env_initialized = false;
655
+ }
654
656
return ret ;
655
657
}
656
658
0 commit comments