24
24
memcpy(((char*)(_dst)) + (_off), _src, _len); \
25
25
(_off) += (_len);
26
26
27
+ opal_mutex_t mca_osc_service_mutex = OPAL_MUTEX_STATIC_INIT ;
28
+ static void _osc_ucx_init_lock (void )
29
+ {
30
+ if (mca_osc_ucx_component .enable_mpi_threads ) {
31
+ opal_mutex_lock (& mca_osc_service_mutex );
32
+ }
33
+ }
34
+ static void _osc_ucx_init_unlock (void )
35
+ {
36
+ if (mca_osc_ucx_component .enable_mpi_threads ) {
37
+ opal_mutex_unlock (& mca_osc_service_mutex );
38
+ }
39
+ }
40
+
41
+
27
42
static int component_open (void );
28
43
static int component_register (void );
29
44
static int component_init (bool enable_progress_threads , bool enable_mpi_threads );
@@ -141,6 +156,9 @@ static int component_init(bool enable_progress_threads, bool enable_mpi_threads)
141
156
142
157
static int component_finalize (void ) {
143
158
opal_common_ucx_mca_deregister ();
159
+ if (mca_osc_ucx_component .env_initialized ) {
160
+ opal_common_ucx_wpool_finalize (mca_osc_ucx_component .wpool );
161
+ }
144
162
opal_common_ucx_wpool_free (mca_osc_ucx_component .wpool );
145
163
return OMPI_SUCCESS ;
146
164
}
@@ -189,6 +207,9 @@ static void ompi_osc_ucx_unregister_progress()
189
207
{
190
208
int ret ;
191
209
210
+ /* May be called concurrently - protect */
211
+ _osc_ucx_init_lock ();
212
+
192
213
mca_osc_ucx_component .num_modules -- ;
193
214
OSC_UCX_ASSERT (mca_osc_ucx_component .num_modules >= 0 );
194
215
if (0 == mca_osc_ucx_component .num_modules ) {
@@ -197,6 +218,8 @@ static void ompi_osc_ucx_unregister_progress()
197
218
OSC_UCX_VERBOSE (1 , "opal_progress_unregister failed: %d" , ret );
198
219
}
199
220
}
221
+
222
+ _osc_ucx_init_unlock ();
200
223
}
201
224
202
225
static int component_select (struct ompi_win_t * win , void * * base , size_t size , int disp_unit ,
@@ -223,7 +246,14 @@ static int component_select(struct ompi_win_t *win, void **base, size_t size, in
223
246
return OMPI_ERR_NOT_SUPPORTED ;
224
247
}
225
248
249
+ /* May be called concurrently - protect */
250
+ _osc_ucx_init_lock ();
251
+
226
252
if (mca_osc_ucx_component .env_initialized == false) {
253
+ /* Lazy initialization of the global state.
254
+ * As not all of the MPI applications are using One-Sided functionality
255
+ * we don't want to initialize in the component_init()
256
+ */
227
257
228
258
OBJ_CONSTRUCT (& mca_osc_ucx_component .requests , opal_free_list_t );
229
259
ret = opal_free_list_init (& mca_osc_ucx_component .requests ,
@@ -233,30 +263,52 @@ static int component_select(struct ompi_win_t *win, void **base, size_t size, in
233
263
0 , 0 , 8 , 0 , 8 , NULL , 0 , NULL , NULL , NULL );
234
264
if (OMPI_SUCCESS != ret ) {
235
265
OSC_UCX_VERBOSE (1 , "opal_free_list_init failed: %d" , ret );
236
- goto error ;
266
+ goto select_unlock ;
237
267
}
238
268
239
269
ret = opal_common_ucx_wpool_init (mca_osc_ucx_component .wpool ,
240
270
ompi_proc_world_size (),
241
271
mca_osc_ucx_component .enable_mpi_threads );
242
272
if (OMPI_SUCCESS != ret ) {
243
273
OSC_UCX_VERBOSE (1 , "opal_common_ucx_wpool_init failed: %d" , ret );
244
- goto error ;
274
+ goto select_unlock ;
245
275
}
246
276
277
+ /* Make sure that all memory updates performed above are globally
278
+ * observable before (mca_osc_ucx_component.env_initialized = true)
279
+ */
247
280
mca_osc_ucx_component .env_initialized = true;
248
281
env_initialized = true;
249
282
}
250
283
284
+ /* Account for the number of active "modules" = MPI windows */
285
+ mca_osc_ucx_component .num_modules ++ ;
286
+
287
+ /* If this is the first window to be registered - register the progress
288
+ * callback
289
+ */
290
+ OSC_UCX_ASSERT (mca_osc_ucx_component .num_modules > 0 );
291
+ if (1 == mca_osc_ucx_component .num_modules ) {
292
+ ret = opal_progress_register (progress_callback );
293
+ if (OMPI_SUCCESS != ret ) {
294
+ OSC_UCX_VERBOSE (1 , "opal_progress_register failed: %d" , ret );
295
+ goto error ;
296
+ }
297
+ }
298
+
299
+ select_unlock :
300
+ _osc_ucx_init_unlock ();
301
+ if (ret ) {
302
+ goto error ;
303
+ }
304
+
251
305
/* create module structure */
252
306
module = (ompi_osc_ucx_module_t * )calloc (1 , sizeof (ompi_osc_ucx_module_t ));
253
307
if (module == NULL ) {
254
308
ret = OMPI_ERR_TEMP_OUT_OF_RESOURCE ;
255
309
goto error_nomem ;
256
310
}
257
311
258
- mca_osc_ucx_component .num_modules ++ ;
259
-
260
312
/* fill in the function pointer part */
261
313
memcpy (module , & ompi_osc_ucx_module_template , sizeof (ompi_osc_base_module_t ));
262
314
@@ -410,19 +462,15 @@ static int component_select(struct ompi_win_t *win, void **base, size_t size, in
410
462
goto error ;
411
463
}
412
464
413
- OSC_UCX_ASSERT (mca_osc_ucx_component .num_modules > 0 );
414
- if (1 == mca_osc_ucx_component .num_modules ) {
415
- ret = opal_progress_register (progress_callback );
416
- if (OMPI_SUCCESS != ret ) {
417
- OSC_UCX_VERBOSE (1 , "opal_progress_register failed: %d" , ret );
418
- goto error ;
419
- }
420
- }
421
465
return ret ;
422
466
423
- error :
467
+ error :
424
468
if (module -> disp_units ) free (module -> disp_units );
425
469
if (module -> comm ) ompi_comm_free (& module -> comm );
470
+ /* We update the modules count and (if need) registering a callback right
471
+ * prior to memory allocation for the module.
472
+ * So we use it as an indirect sign here
473
+ */
426
474
if (module ) {
427
475
free (module );
428
476
ompi_osc_ucx_unregister_progress ();
@@ -575,8 +623,6 @@ int ompi_osc_ucx_free(struct ompi_win_t *win) {
575
623
}
576
624
577
625
opal_common_ucx_wpctx_release (module -> ctx );
578
-
579
- opal_common_ucx_wpool_finalize (mca_osc_ucx_component .wpool );
580
626
581
627
if (module -> disp_units ) free (module -> disp_units );
582
628
ompi_comm_free (& module -> comm );
0 commit comments