24
24
memcpy(((char*)(_dst)) + (_off), _src, _len); \
25
25
(_off) += (_len);
26
26
27
+ opal_mutex_t mca_osc_service_mutex = OPAL_MUTEX_STATIC_INIT ;
28
+ static void _osc_ucx_init_lock (void )
29
+ {
30
+ if (mca_osc_ucx_component .enable_mpi_threads ) {
31
+ opal_mutex_lock (& mca_osc_service_mutex );
32
+ }
33
+ }
34
+ static void _osc_ucx_init_unlock (void )
35
+ {
36
+ if (mca_osc_ucx_component .enable_mpi_threads ) {
37
+ opal_mutex_unlock (& mca_osc_service_mutex );
38
+ }
39
+ }
40
+
41
+
27
42
static int component_open (void );
28
43
static int component_register (void );
29
44
static int component_init (bool enable_progress_threads , bool enable_mpi_threads );
@@ -192,6 +207,9 @@ static void ompi_osc_ucx_unregister_progress()
192
207
{
193
208
int ret ;
194
209
210
+ /* May be called concurrently - protect */
211
+ _osc_ucx_init_lock ();
212
+
195
213
mca_osc_ucx_component .num_modules -- ;
196
214
OSC_UCX_ASSERT (mca_osc_ucx_component .num_modules >= 0 );
197
215
if (0 == mca_osc_ucx_component .num_modules ) {
@@ -200,6 +218,8 @@ static void ompi_osc_ucx_unregister_progress()
200
218
OSC_UCX_VERBOSE (1 , "opal_progress_unregister failed: %d" , ret );
201
219
}
202
220
}
221
+
222
+ _osc_ucx_init_unlock ();
203
223
}
204
224
205
225
static int component_select (struct ompi_win_t * win , void * * base , size_t size , int disp_unit ,
@@ -226,7 +246,14 @@ static int component_select(struct ompi_win_t *win, void **base, size_t size, in
226
246
return OMPI_ERR_NOT_SUPPORTED ;
227
247
}
228
248
249
+ /* May be called concurrently - protect */
250
+ _osc_ucx_init_lock ();
251
+
229
252
if (mca_osc_ucx_component .env_initialized == false) {
253
+ /* Lazy initialization of the global state.
254
+ * As not all of the MPI applications are using One-Sided functionality
255
+ * we don't want to initialize in the component_init()
256
+ */
230
257
231
258
OBJ_CONSTRUCT (& mca_osc_ucx_component .requests , opal_free_list_t );
232
259
ret = opal_free_list_init (& mca_osc_ucx_component .requests ,
@@ -236,30 +263,52 @@ static int component_select(struct ompi_win_t *win, void **base, size_t size, in
236
263
0 , 0 , 8 , 0 , 8 , NULL , 0 , NULL , NULL , NULL );
237
264
if (OMPI_SUCCESS != ret ) {
238
265
OSC_UCX_VERBOSE (1 , "opal_free_list_init failed: %d" , ret );
239
- goto error ;
266
+ goto select_unlock ;
240
267
}
241
268
242
269
ret = opal_common_ucx_wpool_init (mca_osc_ucx_component .wpool ,
243
270
ompi_proc_world_size (),
244
271
mca_osc_ucx_component .enable_mpi_threads );
245
272
if (OMPI_SUCCESS != ret ) {
246
273
OSC_UCX_VERBOSE (1 , "opal_common_ucx_wpool_init failed: %d" , ret );
247
- goto error ;
274
+ goto select_unlock ;
248
275
}
249
276
277
+ /* Make sure that all memory updates performed above are globally
278
+ * observable before (mca_osc_ucx_component.env_initialized = true)
279
+ */
250
280
mca_osc_ucx_component .env_initialized = true;
251
281
env_initialized = true;
252
282
}
253
283
284
+ /* Account for the number of active "modules" = MPI windows */
285
+ mca_osc_ucx_component .num_modules ++ ;
286
+
287
+ /* If this is the first window to be registered - register the progress
288
+ * callback
289
+ */
290
+ OSC_UCX_ASSERT (mca_osc_ucx_component .num_modules > 0 );
291
+ if (1 == mca_osc_ucx_component .num_modules ) {
292
+ ret = opal_progress_register (progress_callback );
293
+ if (OMPI_SUCCESS != ret ) {
294
+ OSC_UCX_VERBOSE (1 , "opal_progress_register failed: %d" , ret );
295
+ goto error ;
296
+ }
297
+ }
298
+
299
+ select_unlock :
300
+ _osc_ucx_init_unlock ();
301
+ if (ret ) {
302
+ goto error ;
303
+ }
304
+
254
305
/* create module structure */
255
306
module = (ompi_osc_ucx_module_t * )calloc (1 , sizeof (ompi_osc_ucx_module_t ));
256
307
if (module == NULL ) {
257
308
ret = OMPI_ERR_TEMP_OUT_OF_RESOURCE ;
258
309
goto error_nomem ;
259
310
}
260
311
261
- mca_osc_ucx_component .num_modules ++ ;
262
-
263
312
/* fill in the function pointer part */
264
313
memcpy (module , & ompi_osc_ucx_module_template , sizeof (ompi_osc_base_module_t ));
265
314
@@ -413,19 +462,15 @@ static int component_select(struct ompi_win_t *win, void **base, size_t size, in
413
462
goto error ;
414
463
}
415
464
416
- OSC_UCX_ASSERT (mca_osc_ucx_component .num_modules > 0 );
417
- if (1 == mca_osc_ucx_component .num_modules ) {
418
- ret = opal_progress_register (progress_callback );
419
- if (OMPI_SUCCESS != ret ) {
420
- OSC_UCX_VERBOSE (1 , "opal_progress_register failed: %d" , ret );
421
- goto error ;
422
- }
423
- }
424
465
return ret ;
425
466
426
- error :
467
+ error :
427
468
if (module -> disp_units ) free (module -> disp_units );
428
469
if (module -> comm ) ompi_comm_free (& module -> comm );
470
+ /* We update the modules count and (if need) registering a callback right
471
+ * prior to memory allocation for the module.
472
+ * So we use it as an indirect sign here
473
+ */
429
474
if (module ) {
430
475
free (module );
431
476
ompi_osc_ucx_unregister_progress ();
0 commit comments