Commit 53d8fd0

Keluaa committed
Handle all errors in mca_part_persist_progress and return the number of completed tasks.
Signed-off-by: Keluaa <34173752+Keluaa@users.noreply.github.com>
1 parent 473bcc0 commit 53d8fd0
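
The shape of the fix: instead of returning early (or calling exit(-1)) at each failure point, every error path now jumps to a single end_part_progress label that reports the error, releases the lock, and decrements the re-entry counter, and the function returns a count of completed tasks rather than OMPI_SUCCESS. A minimal standalone sketch of that single-exit pattern, with a plain pthread mutex and a hypothetical do_task() standing in for the OMPI internals:

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

#define SUCCESS 0

static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;

/* Hypothetical unit of work; a failing return would model a failed
 * ompi_request_test() or MCA_PML_CALL(). */
static int do_task(int i) { (void)i; return SUCCESS; }

static int progress(void)
{
    int err = SUCCESS;  /* must be initialized: the check at the label runs on every path */
    int completed = 0;

    pthread_mutex_lock(&lock);

    for (int i = 0; i < 3; i++) {
        err = do_task(i);
        if (SUCCESS != err) goto end_progress;  /* no early return: cleanup below always runs */
        completed++;
    }

end_progress:
    if (SUCCESS != err) {
        /* the commit aborts via ompi_rte_abort() here instead of leaking the error */
        fprintf(stderr, "progress internal failure: %d\n", err);
        abort();
    }
    pthread_mutex_unlock(&lock);
    return completed;  /* number of completed tasks, not an error code */
}

int main(void)
{
    printf("completed %d task(s)\n", progress());
    return 0;
}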

File tree: 1 file changed, +50 -43 lines


ompi/mca/part/persist/part_persist.h (50 additions, 43 deletions)

@@ -159,63 +159,56 @@ __opal_attribute_always_inline__ static inline int
 mca_part_persist_progress(void)
 {
     mca_part_persist_list_t *current;
-    int err;
+    int err = OMPI_SUCCESS;
+    int completed = 0;
     size_t i;
 
     /* prevent re-entry, */
     int block_entry = opal_atomic_add_fetch_32(&(ompi_part_persist.block_entry), 1);
-    if(1 < block_entry)
-    {
+    if(1 < block_entry) {
         block_entry = opal_atomic_add_fetch_32(&(ompi_part_persist.block_entry), -1);
-        return OMPI_SUCCESS;
+        return completed;
     }
 
     OPAL_THREAD_LOCK(&ompi_part_persist.lock);
 
     mca_part_persist_request_t* to_delete = NULL;
 
     /* Don't do anything till a function in the module is called. */
-    if(-1 == ompi_part_persist.init_world)
-    {
-        OPAL_THREAD_UNLOCK(&ompi_part_persist.lock);
-        block_entry = opal_atomic_add_fetch_32(&(ompi_part_persist.block_entry), -1);
-        return OMPI_SUCCESS;
+    if(-1 == ompi_part_persist.init_world) {
+        goto end_part_progress;
     }
 
     /* Can't do anything if we don't have world */
     if(0 == ompi_part_persist.init_world) {
         ompi_part_persist.my_world_rank = ompi_comm_rank(&ompi_mpi_comm_world.comm);
         err = ompi_comm_idup(&ompi_mpi_comm_world.comm, &ompi_part_persist.part_comm, &ompi_part_persist.part_comm_req);
-        if(err != OMPI_SUCCESS) {
-            exit(-1);
-        }
+        if(OMPI_SUCCESS != err) goto end_part_progress;
         ompi_part_persist.part_comm_ready = 0;
         err = ompi_comm_idup(&ompi_mpi_comm_world.comm, &ompi_part_persist.part_comm_setup, &ompi_part_persist.part_comm_sreq);
-        if(err != OMPI_SUCCESS) {
-            exit(-1);
-        }
+        if(OMPI_SUCCESS != err) goto end_part_progress;
         ompi_part_persist.part_comm_sready = 0;
         ompi_part_persist.init_world = 1;
 
-        OPAL_THREAD_UNLOCK(&ompi_part_persist.lock);
-        block_entry = opal_atomic_add_fetch_32(&(ompi_part_persist.block_entry), -1);
-        return OMPI_SUCCESS;
+        completed++;
+        goto end_part_progress;
    }
 
     /* Check to see if Comms are setup */
     if(0 == ompi_part_persist.init_comms) {
         if(0 == ompi_part_persist.part_comm_ready) {
-            ompi_request_test(&ompi_part_persist.part_comm_req, &ompi_part_persist.part_comm_ready, MPI_STATUS_IGNORE);
+            err = ompi_request_test(&ompi_part_persist.part_comm_req, &ompi_part_persist.part_comm_ready, MPI_STATUS_IGNORE);
+            if(OMPI_SUCCESS != err) goto end_part_progress;
         }
         if(0 == ompi_part_persist.part_comm_sready) {
-            ompi_request_test(&ompi_part_persist.part_comm_sreq, &ompi_part_persist.part_comm_sready, MPI_STATUS_IGNORE);
+            err = ompi_request_test(&ompi_part_persist.part_comm_sreq, &ompi_part_persist.part_comm_sready, MPI_STATUS_IGNORE);
+            if(OMPI_SUCCESS != err) goto end_part_progress;
         }
         if(0 != ompi_part_persist.part_comm_ready && 0 != ompi_part_persist.part_comm_sready) {
             ompi_part_persist.init_comms = 1;
         }
-        OPAL_THREAD_UNLOCK(&ompi_part_persist.lock);
-        block_entry = opal_atomic_add_fetch_32(&(ompi_part_persist.block_entry), -1);
-        return OMPI_SUCCESS;
+        completed++;
+        goto end_part_progress;
     }
 
     OPAL_LIST_FOREACH(current, ompi_part_persist.progress_list, mca_part_persist_list_t) {
@@ -227,10 +220,12 @@ mca_part_persist_progress(void)
 
             if(true == req->flag_post_setup_recv) {
                 err = MCA_PML_CALL(irecv(&(req->setup_info[1]), sizeof(struct ompi_mca_persist_setup_t), MPI_BYTE, OMPI_ANY_SOURCE, req->my_recv_tag, ompi_part_persist.part_comm_setup, &req->setup_req[1]));
+                if(OMPI_SUCCESS != err) goto end_part_progress;
                 req->flag_post_setup_recv = false;
             }
 
-            ompi_request_test(&(req->setup_req[1]), &done, MPI_STATUS_IGNORE);
+            err = ompi_request_test(&(req->setup_req[1]), &done, MPI_STATUS_IGNORE);
+            if(OMPI_SUCCESS != err) goto end_part_progress;
 
             if(done) {
                 size_t dt_size_;
@@ -241,15 +236,16 @@ mca_part_persist_progress(void)
                     req->world_peer = req->setup_info[1].world_rank;
 
                     err = opal_datatype_type_size(&(req->req_datatype->super), &dt_size_);
-                    if(OMPI_SUCCESS != err) return OMPI_ERROR;
+                    if(OMPI_SUCCESS != err) goto end_part_progress;
                     dt_size = (dt_size_ > (size_t) UINT_MAX) ? MPI_UNDEFINED : (uint32_t) dt_size_;
                     uint32_t bytes = req->real_count * dt_size;
 
                     /* Set up persistent sends */
                     req->persist_reqs = (ompi_request_t**) malloc(sizeof(ompi_request_t*)*(req->real_parts));
                     for(i = 0; i < req->real_parts; i++) {
-                         void *buf = ((void*) (((char*)req->req_addr) + (bytes * i)));
-                         err = MCA_PML_CALL(isend_init(buf, req->real_count, req->req_datatype, req->world_peer, req->my_send_tag+i, MCA_PML_BASE_SEND_STANDARD, ompi_part_persist.part_comm, &(req->persist_reqs[i])));
+                        void *buf = ((void*) (((char*)req->req_addr) + (bytes * i)));
+                        err = MCA_PML_CALL(isend_init(buf, req->real_count, req->req_datatype, req->world_peer, req->my_send_tag+i, MCA_PML_BASE_SEND_STANDARD, ompi_part_persist.part_comm, &(req->persist_reqs[i])));
+                        if(OMPI_SUCCESS != err) goto end_part_progress;
                     }
                 } else {
                     /* parse message */
@@ -262,37 +258,38 @@ mca_part_persist_progress(void)
 
 
                     err = opal_datatype_type_size(&(req->req_datatype->super), &dt_size_);
-                    if(OMPI_SUCCESS != err) return OMPI_ERROR;
+                    if(OMPI_SUCCESS != err) goto end_part_progress;
                     dt_size = (dt_size_ > (size_t) UINT_MAX) ? MPI_UNDEFINED : (uint32_t) dt_size_;
                     uint32_t bytes = req->real_count * dt_size;
 
 
-
-                     /* Set up persistent sends */
+                    /* Set up persistent sends */
                     req->persist_reqs = (ompi_request_t**) malloc(sizeof(ompi_request_t*)*(req->real_parts));
                     req->flags = (int*) calloc(req->real_parts,sizeof(int));
-
                     if(req->real_dt_size == dt_size) {
-
-                         for(i = 0; i < req->real_parts; i++) {
+                        for(i = 0; i < req->real_parts; i++) {
                             void *buf = ((void*) (((char*)req->req_addr) + (bytes * i)));
                             err = MCA_PML_CALL(irecv_init(buf, req->real_count, req->req_datatype, req->world_peer, req->my_send_tag+i, ompi_part_persist.part_comm, &(req->persist_reqs[i])));
+                            if(OMPI_SUCCESS != err) goto end_part_progress;
                         }
                     } else {
                         for(i = 0; i < req->real_parts; i++) {
                             void *buf = ((void*) (((char*)req->req_addr) + (req->real_count * req->real_dt_size * i)));
                             err = MCA_PML_CALL(irecv_init(buf, req->real_count * req->real_dt_size, MPI_BYTE, req->world_peer, req->my_send_tag+i, ompi_part_persist.part_comm, &(req->persist_reqs[i])));
+                            if(OMPI_SUCCESS != err) goto end_part_progress;
                         }
-                     }
-                     err = req->persist_reqs[0]->req_start(req->real_parts, (&(req->persist_reqs[0])));
+                    }
+                    err = req->persist_reqs[0]->req_start(req->real_parts, (&(req->persist_reqs[0])));
+                    if(OMPI_SUCCESS != err) goto end_part_progress;
 
                     /* Send back a message */
                     req->setup_info[0].world_rank = ompi_part_persist.my_world_rank;
                     err = MCA_PML_CALL(isend(&(req->setup_info[0]), sizeof(struct ompi_mca_persist_setup_t), MPI_BYTE, req->world_peer, req->my_recv_tag, MCA_PML_BASE_SEND_STANDARD, ompi_part_persist.part_comm_setup, &req->setup_req[0]));
-                    if(OMPI_SUCCESS != err) return OMPI_ERROR;
+                    if(OMPI_SUCCESS != err) goto end_part_progress;
                 }
 
-                req->initialized = true;
+                completed++;
+                req->initialized = true;
             }
         } else {
             if(false == req->req_part_complete && REQUEST_COMPLETED != req->req_ompi.req_complete && OMPI_REQUEST_ACTIVE == req->req_ompi.req_state) {
@@ -301,21 +298,27 @@ mca_part_persist_progress(void)
                     /* Check to see if partition is queued for being started. Only applicable to sends. */
                     if(-2 == req->flags[i]) {
                         err = req->persist_reqs[i]->req_start(1, (&(req->persist_reqs[i])));
+                        if(OMPI_SUCCESS != err) goto end_part_progress;
                         req->flags[i] = 0;
                     }
 
                     if(0 == req->flags[i] && OMPI_REQUEST_ACTIVE == req->persist_reqs[i]->req_state) {
-                        ompi_request_test(&(req->persist_reqs[i]), &(req->flags[i]), MPI_STATUS_IGNORE);
-                        if(0 != req->flags[i]) req->done_count++;
+                        err = ompi_request_test(&(req->persist_reqs[i]), &(req->flags[i]), MPI_STATUS_IGNORE);
+                        if(OMPI_SUCCESS != err) goto end_part_progress;
+                        if(0 != req->flags[i]) {
+                            req->done_count++;
+                        }
                     }
                 }
 
                 /* Check for completion and complete the requests */
-                if(req->done_count == req->real_parts)
-                {
+                if(req->done_count == req->real_parts) {
                     req->first_send = false;
                     mca_part_persist_complete(req);
-                }
+                    completed++;
+                } else if(req->done_count > req->real_parts) {
+                    ompi_rte_abort(OMPI_ERR_FATAL, "internal part request done count is %d > %d", req->done_count, req->real_parts);
+                }
             }
 
             if(true == req->req_free_called && true == req->req_part_complete && REQUEST_COMPLETED == req->req_ompi.req_complete && OMPI_REQUEST_INACTIVE == req->req_ompi.req_state) {
@@ -328,10 +331,14 @@ mca_part_persist_progress(void)
         err = mca_part_persist_free_req(to_delete);
     }
 
+end_part_progress:
+    if(OMPI_SUCCESS != err) {
+        ompi_rte_abort(err, "part progress internal failure");
+    }
     OPAL_THREAD_UNLOCK(&ompi_part_persist.lock);
     block_entry = opal_atomic_add_fetch_32(&(ompi_part_persist.block_entry), -1);
 
-    return OMPI_SUCCESS;
+    return completed;
 }
 
 __opal_attribute_always_inline__ static inline int
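
Returning a completion count fits the convention of Open MPI's progress engine, where callbacks registered with opal_progress_register() return the number of events they progressed and opal_progress() sums those returns. The re-entry guard at the top of the function follows the same discipline, reporting zero completed tasks rather than a success code when it backs off. A sketch of that guard using C11 atomics in place of the opal_atomic_* wrappers (illustrative names, not the OMPI API):

#include <stdatomic.h>
#include <stdio.h>

/* Mirrors ompi_part_persist.block_entry: the first caller in wins,
 * concurrent callers back off without doing any work. */
static atomic_int block_entry;

static int my_progress(void)
{
    int completed = 0;

    /* atomic_fetch_add returns the old value; nonzero means someone is inside */
    if (atomic_fetch_add(&block_entry, 1) > 0) {
        atomic_fetch_sub(&block_entry, 1);
        return 0;  /* zero events progressed; not an error */
    }

    /* ... test outstanding requests, incrementing completed for each one done ... */

    atomic_fetch_sub(&block_entry, 1);
    return completed;
}

int main(void)
{
    printf("progressed %d event(s)\n", my_progress());
    return 0;
}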
