@@ -159,63 +159,56 @@ __opal_attribute_always_inline__ static inline int
 mca_part_persist_progress(void)
 {
     mca_part_persist_list_t *current;
-    int err;
+    int err = OMPI_SUCCESS;
+    int completed = 0;
     size_t i;
 
     /* prevent re-entry, */
     int block_entry = opal_atomic_add_fetch_32(&(ompi_part_persist.block_entry), 1);
-    if (1 < block_entry)
-    {
+    if (1 < block_entry) {
         block_entry = opal_atomic_add_fetch_32(&(ompi_part_persist.block_entry), -1);
-        return OMPI_SUCCESS;
+        return completed;
     }
 
     OPAL_THREAD_LOCK(&ompi_part_persist.lock);
 
     mca_part_persist_request_t *to_delete = NULL;
 
     /* Don't do anything till a function in the module is called. */
-    if (-1 == ompi_part_persist.init_world)
-    {
-        OPAL_THREAD_UNLOCK(&ompi_part_persist.lock);
-        block_entry = opal_atomic_add_fetch_32(&(ompi_part_persist.block_entry), -1);
-        return OMPI_SUCCESS;
+    if (-1 == ompi_part_persist.init_world) {
+        goto end_part_progress;
     }
 
     /* Can't do anything if we don't have world */
     if (0 == ompi_part_persist.init_world) {
         ompi_part_persist.my_world_rank = ompi_comm_rank(&ompi_mpi_comm_world.comm);
         err = ompi_comm_idup(&ompi_mpi_comm_world.comm, &ompi_part_persist.part_comm, &ompi_part_persist.part_comm_req);
-        if (err != OMPI_SUCCESS) {
-            exit(-1);
-        }
+        if (OMPI_SUCCESS != err) goto end_part_progress;
         ompi_part_persist.part_comm_ready = 0;
         err = ompi_comm_idup(&ompi_mpi_comm_world.comm, &ompi_part_persist.part_comm_setup, &ompi_part_persist.part_comm_sreq);
-        if (err != OMPI_SUCCESS) {
-            exit(-1);
-        }
+        if (OMPI_SUCCESS != err) goto end_part_progress;
         ompi_part_persist.part_comm_sready = 0;
         ompi_part_persist.init_world = 1;
 
-        OPAL_THREAD_UNLOCK(&ompi_part_persist.lock);
-        block_entry = opal_atomic_add_fetch_32(&(ompi_part_persist.block_entry), -1);
-        return OMPI_SUCCESS;
+        completed++;
+        goto end_part_progress;
     }
 
     /* Check to see if Comms are setup */
     if (0 == ompi_part_persist.init_comms) {
         if (0 == ompi_part_persist.part_comm_ready) {
-            ompi_request_test(&ompi_part_persist.part_comm_req, &ompi_part_persist.part_comm_ready, MPI_STATUS_IGNORE);
+            err = ompi_request_test(&ompi_part_persist.part_comm_req, &ompi_part_persist.part_comm_ready, MPI_STATUS_IGNORE);
+            if (OMPI_SUCCESS != err) goto end_part_progress;
         }
         if (0 == ompi_part_persist.part_comm_sready) {
-            ompi_request_test(&ompi_part_persist.part_comm_sreq, &ompi_part_persist.part_comm_sready, MPI_STATUS_IGNORE);
+            err = ompi_request_test(&ompi_part_persist.part_comm_sreq, &ompi_part_persist.part_comm_sready, MPI_STATUS_IGNORE);
+            if (OMPI_SUCCESS != err) goto end_part_progress;
         }
         if (0 != ompi_part_persist.part_comm_ready && 0 != ompi_part_persist.part_comm_sready) {
             ompi_part_persist.init_comms = 1;
         }
-        OPAL_THREAD_UNLOCK(&ompi_part_persist.lock);
-        block_entry = opal_atomic_add_fetch_32(&(ompi_part_persist.block_entry), -1);
-        return OMPI_SUCCESS;
+        completed++;
+        goto end_part_progress;
     }
 
     OPAL_LIST_FOREACH(current, ompi_part_persist.progress_list, mca_part_persist_list_t) {
@@ -227,10 +220,12 @@ mca_part_persist_progress(void)
 
             if (true == req->flag_post_setup_recv) {
                 err = MCA_PML_CALL(irecv(&(req->setup_info[1]), sizeof(struct ompi_mca_persist_setup_t), MPI_BYTE, OMPI_ANY_SOURCE, req->my_recv_tag, ompi_part_persist.part_comm_setup, &req->setup_req[1]));
+                if (OMPI_SUCCESS != err) goto end_part_progress;
                 req->flag_post_setup_recv = false;
             }
 
-            ompi_request_test(&(req->setup_req[1]), &done, MPI_STATUS_IGNORE);
+            err = ompi_request_test(&(req->setup_req[1]), &done, MPI_STATUS_IGNORE);
+            if (OMPI_SUCCESS != err) goto end_part_progress;
 
             if (done) {
                 size_t dt_size_;
@@ -241,15 +236,16 @@ mca_part_persist_progress(void)
                     req->world_peer = req->setup_info[1].world_rank;
 
                     err = opal_datatype_type_size(&(req->req_datatype->super), &dt_size_);
-                    if (OMPI_SUCCESS != err) return OMPI_ERROR;
+                    if (OMPI_SUCCESS != err) goto end_part_progress;
                     dt_size = (dt_size_ > (size_t) UINT_MAX) ? MPI_UNDEFINED : (uint32_t) dt_size_;
                     uint32_t bytes = req->real_count * dt_size;
 
                     /* Set up persistent sends */
                     req->persist_reqs = (ompi_request_t **) malloc(sizeof(ompi_request_t *) * (req->real_parts));
                     for (i = 0; i < req->real_parts; i++) {
-                        void *buf = ((void *) (((char *) req->req_addr) + (bytes * i)));
-                        err = MCA_PML_CALL(isend_init(buf, req->real_count, req->req_datatype, req->world_peer, req->my_send_tag + i, MCA_PML_BASE_SEND_STANDARD, ompi_part_persist.part_comm, &(req->persist_reqs[i])));
+                        void *buf = ((void *) (((char *) req->req_addr) + (bytes * i)));
+                        err = MCA_PML_CALL(isend_init(buf, req->real_count, req->req_datatype, req->world_peer, req->my_send_tag + i, MCA_PML_BASE_SEND_STANDARD, ompi_part_persist.part_comm, &(req->persist_reqs[i])));
+                        if (OMPI_SUCCESS != err) goto end_part_progress;
                     }
                 } else {
                     /* parse message */
@@ -262,37 +258,38 @@ mca_part_persist_progress(void)
 
 
                     err = opal_datatype_type_size(&(req->req_datatype->super), &dt_size_);
-                    if (OMPI_SUCCESS != err) return OMPI_ERROR;
+                    if (OMPI_SUCCESS != err) goto end_part_progress;
                     dt_size = (dt_size_ > (size_t) UINT_MAX) ? MPI_UNDEFINED : (uint32_t) dt_size_;
                     uint32_t bytes = req->real_count * dt_size;
 
 
-
-                    /* Set up persistent sends */
+                    /* Set up persistent sends */
                     req->persist_reqs = (ompi_request_t **) malloc(sizeof(ompi_request_t *) * (req->real_parts));
                     req->flags = (int *) calloc(req->real_parts, sizeof(int));
-
                     if (req->real_dt_size == dt_size) {
-
-                        for (i = 0; i < req->real_parts; i++) {
+                        for (i = 0; i < req->real_parts; i++) {
                             void *buf = ((void *) (((char *) req->req_addr) + (bytes * i)));
                             err = MCA_PML_CALL(irecv_init(buf, req->real_count, req->req_datatype, req->world_peer, req->my_send_tag + i, ompi_part_persist.part_comm, &(req->persist_reqs[i])));
+                            if (OMPI_SUCCESS != err) goto end_part_progress;
                         }
                     } else {
                         for (i = 0; i < req->real_parts; i++) {
                             void *buf = ((void *) (((char *) req->req_addr) + (req->real_count * req->real_dt_size * i)));
                             err = MCA_PML_CALL(irecv_init(buf, req->real_count * req->real_dt_size, MPI_BYTE, req->world_peer, req->my_send_tag + i, ompi_part_persist.part_comm, &(req->persist_reqs[i])));
+                            if (OMPI_SUCCESS != err) goto end_part_progress;
                         }
-                    }
-                    err = req->persist_reqs[0]->req_start(req->real_parts, (&(req->persist_reqs[0])));
+                    }
+                    err = req->persist_reqs[0]->req_start(req->real_parts, (&(req->persist_reqs[0])));
+                    if (OMPI_SUCCESS != err) goto end_part_progress;
 
                     /* Send back a message */
                     req->setup_info[0].world_rank = ompi_part_persist.my_world_rank;
                     err = MCA_PML_CALL(isend(&(req->setup_info[0]), sizeof(struct ompi_mca_persist_setup_t), MPI_BYTE, req->world_peer, req->my_recv_tag, MCA_PML_BASE_SEND_STANDARD, ompi_part_persist.part_comm_setup, &req->setup_req[0]));
-                    if (OMPI_SUCCESS != err) return OMPI_ERROR;
+                    if (OMPI_SUCCESS != err) goto end_part_progress;
                 }
 
-                req->initialized = true;
+                completed++;
+                req->initialized = true;
             }
         } else {
             if (false == req->req_part_complete && REQUEST_COMPLETED != req->req_ompi.req_complete && OMPI_REQUEST_ACTIVE == req->req_ompi.req_state) {
@@ -301,21 +298,27 @@ mca_part_persist_progress(void)
                     /* Check to see if partition is queued for being started. Only applicable to sends. */
                     if (-2 == req->flags[i]) {
                         err = req->persist_reqs[i]->req_start(1, (&(req->persist_reqs[i])));
+                        if (OMPI_SUCCESS != err) goto end_part_progress;
                         req->flags[i] = 0;
                     }
 
                     if (0 == req->flags[i] && OMPI_REQUEST_ACTIVE == req->persist_reqs[i]->req_state) {
-                        ompi_request_test(&(req->persist_reqs[i]), &(req->flags[i]), MPI_STATUS_IGNORE);
-                        if (0 != req->flags[i]) req->done_count++;
+                        err = ompi_request_test(&(req->persist_reqs[i]), &(req->flags[i]), MPI_STATUS_IGNORE);
+                        if (OMPI_SUCCESS != err) goto end_part_progress;
+                        if (0 != req->flags[i]) {
+                            req->done_count++;
+                        }
                     }
                 }
 
                 /* Check for completion and complete the requests */
-                if (req->done_count == req->real_parts)
-                {
+                if (req->done_count == req->real_parts) {
                     req->first_send = false;
                     mca_part_persist_complete(req);
-                }
+                    completed++;
+                } else if (req->done_count > req->real_parts) {
+                    ompi_rte_abort(OMPI_ERR_FATAL, "internal part request done count is %d > %d", req->done_count, req->real_parts);
+                }
             }
 
             if (true == req->req_free_called && true == req->req_part_complete && REQUEST_COMPLETED == req->req_ompi.req_complete && OMPI_REQUEST_INACTIVE == req->req_ompi.req_state) {
@@ -328,10 +331,14 @@ mca_part_persist_progress(void)
         err = mca_part_persist_free_req(to_delete);
     }
 
+ end_part_progress:
+    if (OMPI_SUCCESS != err) {
+        ompi_rte_abort(err, "part progress internal failure");
+    }
     OPAL_THREAD_UNLOCK(&ompi_part_persist.lock);
     block_entry = opal_atomic_add_fetch_32(&(ompi_part_persist.block_entry), -1);
 
-    return OMPI_SUCCESS;
+    return completed;
 }
 
 __opal_attribute_always_inline__ static inline int
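
The patch changes two things at once: the progress callback now reports how many events it completed instead of always returning OMPI_SUCCESS, and every internal failure jumps to a single end_part_progress label where the error is reported before the lock and re-entry counter are released. Below is a minimal, self-contained sketch of that shape; it is not part of the commit, and toy_progress, toy_test_event, and TOY_SUCCESS are illustrative names, not Open MPI APIs.

#include <stdio.h>
#include <stdlib.h>

#define TOY_SUCCESS 0

/* Stand-in for a request-test call: reports whether event i finished. */
static int toy_test_event(int i, int *flag)
{
    *flag = (i % 2 == 0);   /* pretend every other event completes */
    return TOY_SUCCESS;
}

/* Progress-style callback in the shape the patch moves toward: it returns
 * the number of events completed on this pass, never an error code, and
 * every failure exits through one cleanup label. */
static int toy_progress(void)
{
    int err = TOY_SUCCESS;  /* initialized so the label can test it unconditionally */
    int completed = 0;

    for (int i = 0; i < 8; ++i) {
        int flag = 0;
        err = toy_test_event(i, &flag);
        if (TOY_SUCCESS != err) goto end_progress;  /* single exit on error */
        if (flag) {
            completed++;
        }
    }

end_progress:
    if (TOY_SUCCESS != err) {
        /* the commit calls ompi_rte_abort(err, ...) at the equivalent point */
        fprintf(stderr, "progress internal failure: %d\n", err);
        abort();
    }
    return completed;   /* the caller accumulates completion counts */
}

int main(void)
{
    printf("completed this pass: %d\n", toy_progress());
    return 0;
}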