Skip to content

Commit e4695b2

Browse files
authored
Merge pull request #10507 from abouteiller/ulfm/get_failed
ULFM: new functions MPI_COMM_ACK/GET_FAILED (as per latest standard draft)
2 parents 478b6b2 + 6b48e41 commit e4695b2

20 files changed

+596
-24
lines changed

ompi/communicator/comm_init.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -451,6 +451,7 @@ static void ompi_comm_construct(ompi_communicator_t* comm)
451451
#if OPAL_ENABLE_FT_MPI
452452
comm->any_source_enabled = true;
453453
comm->any_source_offset = 0;
454+
comm->num_acked = 0;
454455
comm->comm_revoked = false;
455456
comm->coll_revoked = false;
456457
comm->c_epoch = 0;

ompi/communicator/communicator.h

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -329,6 +329,8 @@ struct ompi_communicator_t {
329329
#if OPAL_ENABLE_FT_MPI
330330
/** agreement caching info for topology and previous returned decisions */
331331
opal_object_t *agreement_specific;
332+
/** num_acked - OMPI_Comm_ack_failed */
333+
int num_acked;
332334
/** MPI_ANY_SOURCE Failed Group Offset - OMPI_Comm_failure_get_acked */
333335
int any_source_offset;
334336
/** Are MPI_ANY_SOURCE operations enabled? - OMPI_Comm_failure_ack */
@@ -598,6 +600,17 @@ static inline bool ompi_comm_is_revoked(ompi_communicator_t* comm)
598600
return (comm->comm_revoked);
599601
}
600602

603+
/*
604+
* Obtain the group of locally known failed processes on comm
605+
*/
606+
OMPI_DECLSPEC int ompi_comm_get_failed_internal(ompi_communicator_t* comm, ompi_group_t** failed_group);
607+
608+
/*
609+
* Acknowledge failures and re-enable MPI_ANY_SOURCE
610+
* Related to OMPI_Comm_failure_ack() and OMPI_Comm_failure_get_acked()
611+
*/
612+
OMPI_DECLSPEC int ompi_comm_ack_failed_internal(ompi_communicator_t* comm, int num_to_ack, int *num_acked);
613+
601614
/*
602615
* Acknowledge failures and re-enable MPI_ANY_SOURCE
603616
* Related to OMPI_Comm_failure_ack() and OMPI_Comm_failure_get_acked()

ompi/communicator/ft/comm_ft.c

Lines changed: 104 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
/*
22
* Copyright (c) 2010-2012 Oak Ridge National Labs. All rights reserved.
3-
* Copyright (c) 2011-2020 The University of Tennessee and The University
3+
* Copyright (c) 2011-2022 The University of Tennessee and The University
44
*
55
* of Tennessee Research Foundation. All rights
66
* reserved.
@@ -30,7 +30,7 @@ ompi_comm_rank_failure_callback_t *ompi_rank_failure_cbfunc = NULL;
3030
* The handling of known failed processes is based on a two level process. On one
3131
* side the MPI library itself must know the failed processes (in order to be able
3232
* to correctly handle complex operations such as shrink). On the other side, the
33-
* failed processes acknowledged by the users shuould not be altered during any of
33+
* failed processes acknowledged by the users should not be altered during any of
3434
* the internal calls, as they must only be updated upon user request.
3535
* Thus, the global list (ompi_group_all_failed_procs) is the list of all known
3636
* failed processes (by the MPI library internals), and it is allegedly updated
@@ -39,6 +39,7 @@ ompi_comm_rank_failure_callback_t *ompi_rank_failure_cbfunc = NULL;
3939
* in the context of a communicator. Thus, using a single index to know the user-level
4040
* acknowledged failure is the simplest solution.
4141
*/
42+
/* deprecated ulfm v1 API */
4243
int ompi_comm_failure_ack_internal(ompi_communicator_t* comm)
4344
{
4445
opal_mutex_lock(&ompi_group_afp_mutex);
@@ -49,11 +50,13 @@ int ompi_comm_failure_ack_internal(ompi_communicator_t* comm)
4950
/* use the AFP lock implicit memory barrier to propagate the update to
5051
* any_source_enabled at the same time.
5152
*/
53+
comm->num_acked = -1; /* compat with v2 API: force recompute next time ack_failed is called */
5254
opal_mutex_unlock(&ompi_group_afp_mutex);
5355

5456
return OMPI_SUCCESS;
5557
}
5658

59+
/* deprecated ulfm v1 API; used internally in MPI_COMM_AGREE as well */
5760
int ompi_comm_failure_get_acked_internal(ompi_communicator_t* comm, ompi_group_t **group )
5861
{
5962
int ret, exit_status = OMPI_SUCCESS;
@@ -114,6 +117,105 @@ int ompi_comm_failure_get_acked_internal(ompi_communicator_t* comm, ompi_group_t
114117
return exit_status;
115118
}
116119

120+
/* New v2 interface get_failed/ack_failed.
121+
* This interface uses a cached value comm->num_acked to track how many
122+
* processes in the group of this comm have been acknowledged in prior calls.
123+
* For compatibility with v1 interface (failure_get_acked), it still updates
124+
* the comm->any_source_offset, and the v1 interface failure_ack may erase the
125+
* cached value comm->num_acked with -1 to force recomputing this value in mixed
126+
* use cases (that is calling failure_ack will force full recomputation of
127+
* comm->num_acked during the next ack_failed call). */
128+
int ompi_comm_ack_failed_internal(ompi_communicator_t* comm, int num_to_ack, int *num_acked) {
129+
int ret, exit_status = MPI_SUCCESS;
130+
int nf = -1, na = -1;
131+
ompi_group_t *c_group = OMPI_COMM_IS_INTER(comm)? comm->c_local_group: comm->c_remote_group;
132+
ompi_group_t *failed_group = NULL;
133+
134+
opal_mutex_lock(&ompi_group_afp_mutex);
135+
136+
/* shortcut when reading only */
137+
if(num_to_ack <= comm->num_acked)
138+
goto return_num_acked;
139+
140+
/* shortcut when no new faults */
141+
if(comm->any_source_offset == ompi_group_size(ompi_group_all_failed_procs)
142+
&& comm->num_acked >= 0 /* reset by a call to v1 API? */)
143+
goto return_num_acked;
144+
145+
/* compute num_acked */
146+
ret = ompi_group_intersection(ompi_group_all_failed_procs,
147+
c_group, &failed_group);
148+
if(OMPI_SUCCESS != ret) {
149+
exit_status = ret;
150+
goto cleanup;
151+
}
152+
nf = ompi_group_size(failed_group);
153+
na = (num_to_ack < nf)? num_to_ack: nf; /* never ack more than requested */
154+
155+
if(comm->num_acked < 0) { /* reset by a call to the v1 API: recompute it */
156+
if(0 == comm->any_source_offset) comm->num_acked = 0;
157+
else {
158+
int aso = comm->any_source_offset - 1;
159+
ret = ompi_group_translate_ranks(ompi_group_all_failed_procs, 1, &aso,
160+
failed_group, &comm->num_acked);
161+
comm->num_acked++; /* make it a group size again */
162+
if(OMPI_SUCCESS != ret) {
163+
exit_status = ret;
164+
goto cleanup;
165+
}
166+
}
167+
}
168+
169+
if(comm->num_acked < na) { /* comm->num_acked needs to be updated during this call */
170+
comm->num_acked = na;
171+
if(nf == na) {
172+
/* all faults on comm acknowledged, resume any source then */
173+
comm->any_source_enabled = true;
174+
comm->any_source_offset = ompi_group_size(ompi_group_all_failed_procs); // compat with v1 interface
175+
}
176+
else {
177+
/* some faults not acknowledged, do not resume any source then, but
178+
* still update any_source_offset */
179+
assert(comm->num_acked > 0);
180+
int cna = comm->num_acked - 1;
181+
ret = ompi_group_translate_ranks(failed_group, 1, &cna,
182+
ompi_group_all_failed_procs, &comm->any_source_offset); // compat with v1 interface
183+
comm->any_source_offset++; /* make it a group size again */
184+
if(OMPI_SUCCESS != ret) {
185+
exit_status = ret;
186+
goto cleanup;
187+
}
188+
}
189+
}
190+
191+
return_num_acked:
192+
*num_acked = comm->num_acked;
193+
194+
cleanup:
195+
if(NULL != failed_group) OBJ_RELEASE(failed_group);
196+
/* use the AFP lock implicit memory barrier to propagate the update to
197+
* any_source_enabled, num_acked, etc. at the same time.
198+
*/
199+
opal_mutex_unlock(&ompi_group_afp_mutex);
200+
201+
return exit_status;
202+
}
203+
204+
int ompi_comm_get_failed_internal(ompi_communicator_t* comm, ompi_group_t **group)
205+
{
206+
int ret, exit_status = OMPI_SUCCESS;
207+
ompi_group_t *c_group = OMPI_COMM_IS_INTER(comm)? comm->c_local_group: comm->c_remote_group;
208+
opal_mutex_lock(&ompi_group_afp_mutex);
209+
ret = ompi_group_intersection(ompi_group_all_failed_procs,
210+
c_group,
211+
group);
212+
opal_mutex_unlock(&ompi_group_afp_mutex);
213+
if( OMPI_SUCCESS != ret ) {
214+
exit_status = ret;
215+
}
216+
return exit_status;
217+
}
218+
117219
int ompi_comm_shrink_internal(ompi_communicator_t* comm, ompi_communicator_t** newcomm)
118220
{
119221
int ret, exit_status = OMPI_SUCCESS;

ompi/mca/coll/han/coll_han_subcomms.c

Lines changed: 36 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ int mca_coll_han_comm_create_new(struct ompi_communicator_t *comm,
5252
ompi_communicator_t **low_comm = &(han_module->sub_comm[INTRA_NODE]);
5353
ompi_communicator_t **up_comm = &(han_module->sub_comm[INTER_NODE]);
5454
mca_coll_han_collectives_fallback_t fallbacks;
55-
int vrank, *vranks;
55+
int rc = OMPI_SUCCESS, vrank, *vranks;
5656
opal_info_t comm_info;
5757

5858
/* The sub communicators have already been created */
@@ -91,9 +91,12 @@ int mca_coll_han_comm_create_new(struct ompi_communicator_t *comm,
9191
* all participants.
9292
*/
9393
int local_procs = ompi_group_count_local_peers(comm->c_local_group);
94-
comm->c_coll->coll_allreduce(MPI_IN_PLACE, &local_procs, 1, MPI_INT,
95-
MPI_MAX, comm,
96-
comm->c_coll->coll_allreduce_module);
94+
rc = comm->c_coll->coll_allreduce(MPI_IN_PLACE, &local_procs, 1, MPI_INT,
95+
MPI_MAX, comm,
96+
comm->c_coll->coll_allreduce_module);
97+
if( OMPI_SUCCESS != rc ) {
98+
goto return_with_error;
99+
}
97100
if( local_procs == 1 ) {
98101
/* restore saved collectives */
99102
HAN_SUBCOM_LOAD_COLLECTIVE(fallbacks, comm, han_module, allgatherv);
@@ -118,8 +121,12 @@ int mca_coll_han_comm_create_new(struct ompi_communicator_t *comm,
118121
*/
119122
opal_info_set(&comm_info, "ompi_comm_coll_preference", "han");
120123
opal_info_set(&comm_info, "ompi_comm_coll_han_topo_level", "INTRA_NODE");
121-
ompi_comm_split_type(comm, MPI_COMM_TYPE_SHARED, 0,
122-
&comm_info, low_comm);
124+
rc = ompi_comm_split_type(comm, MPI_COMM_TYPE_SHARED, 0,
125+
&comm_info, low_comm);
126+
if( OMPI_SUCCESS != rc ) {
127+
/* cannot create subcommunicators. Return the error upstream */
128+
goto return_with_error;
129+
}
123130

124131
/*
125132
* Get my local rank and the local size
@@ -132,7 +139,11 @@ int mca_coll_han_comm_create_new(struct ompi_communicator_t *comm,
132139
* same intra-node rank id share such a sub-communicator
133140
*/
134141
opal_info_set(&comm_info, "ompi_comm_coll_han_topo_level", "INTER_NODE");
135-
ompi_comm_split_with_info(comm, low_rank, w_rank, &comm_info, up_comm, false);
142+
rc = ompi_comm_split_with_info(comm, low_rank, w_rank, &comm_info, up_comm, false);
143+
if( OMPI_SUCCESS != rc ) {
144+
/* cannot create subcommunicators. Return the error upstream */
145+
goto return_with_error;
146+
}
136147

137148
up_rank = ompi_comm_rank(*up_comm);
138149

@@ -150,14 +161,13 @@ int mca_coll_han_comm_create_new(struct ompi_communicator_t *comm,
150161
* gather vrank from each process so every process will know other processes
151162
* vrank
152163
*/
153-
comm->c_coll->coll_allgather(&vrank,
154-
1,
155-
MPI_INT,
156-
vranks,
157-
1,
158-
MPI_INT,
159-
comm,
160-
comm->c_coll->coll_allgather_module);
164+
rc = comm->c_coll->coll_allgather(&vrank, 1, MPI_INT,
165+
vranks, 1, MPI_INT,
166+
comm, comm->c_coll->coll_allgather_module);
167+
if( OMPI_SUCCESS != rc ) {
168+
/* cannot create subcommunicators. Return the error upstream */
169+
goto return_with_error;
170+
}
161171

162172
/*
163173
* Set the cached info
@@ -175,6 +185,17 @@ int mca_coll_han_comm_create_new(struct ompi_communicator_t *comm,
175185

176186
OBJ_DESTRUCT(&comm_info);
177187
return OMPI_SUCCESS;
188+
189+
return_with_error:
190+
if( NULL != *low_comm ) {
191+
ompi_comm_free(low_comm);
192+
*low_comm = NULL; /* don't leave the MPI_COMM_NULL set by ompi_comm_free */
193+
}
194+
if( NULL != *up_comm ) {
195+
ompi_comm_free(up_comm);
196+
*up_comm = NULL; /* don't leave the MPI_COMM_NULL set by ompi_comm_free */
197+
}
198+
return rc;
178199
}
179200

180201
/*

ompi/mpiext/ftmpi/c/Makefile.am

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
#
22
# Copyright (c) 2010-2012 Oak Ridge National Labs. All rights reserved.
3-
# Copyright (c) 2016-2018 The University of Tennessee and The University
3+
# Copyright (c) 2016-2022 The University of Tennessee and The University
44
# of Tennessee Research Foundation. All rights
55
# reserved.
66
# $COPYRIGHT$
@@ -29,6 +29,8 @@ libmpiext_ftmpi_c_la_SOURCES = \
2929
comm_shrink.c \
3030
comm_failure_ack.c \
3131
comm_failure_get_acked.c \
32+
comm_get_failed.c \
33+
comm_ack_failed.c \
3234
comm_agree.c \
3335
comm_iagree.c
3436
libmpiext_ftmpi_c_la_LIBADD = \

ompi/mpiext/ftmpi/c/comm_ack_failed.c

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Copyright (c) 2022 The University of Tennessee and The University
3+
* of Tennessee Research Foundation. All rights
4+
* reserved.
5+
* $COPYRIGHT$
6+
*
7+
* Additional copyrights may follow
8+
*
9+
* $HEADER$
10+
*/
11+
#include "ompi_config.h"
12+
13+
#include "ompi/mpi/c/bindings.h"
14+
#include "ompi/runtime/params.h"
15+
#include "ompi/communicator/communicator.h"
16+
#include "ompi/proc/proc.h"
17+
#include "ompi/errhandler/errhandler.h"
18+
19+
#include "ompi/mpiext/ftmpi/c/mpiext_ftmpi_c.h"
20+
21+
#if OMPI_BUILD_MPI_PROFILING
22+
#if OPAL_HAVE_WEAK_SYMBOLS
23+
#pragma weak MPIX_Comm_ack_failed = PMPIX_Comm_ack_failed
24+
#endif
25+
#define MPIX_Comm_ack_failed PMPIX_Comm_ack_failed
26+
#endif
27+
28+
static const char FUNC_NAME[] = "MPIX_Comm_ack_failed";
29+
30+
31+
int MPIX_Comm_ack_failed(MPI_Comm comm, int num_to_ack, int* num_acked)
32+
{
33+
int rc = MPI_SUCCESS;
34+
35+
/* Argument checking */
36+
if (MPI_PARAM_CHECK) {
37+
OMPI_ERR_INIT_FINALIZE(FUNC_NAME);
38+
if (ompi_comm_invalid(comm)) {
39+
return OMPI_ERRHANDLER_INVOKE(MPI_COMM_WORLD, MPI_ERR_COMM, FUNC_NAME);
40+
}
41+
if (num_to_ack > ompi_comm_size(comm)) {
42+
rc = MPI_ERR_ARG;
43+
}
44+
OMPI_ERRHANDLER_CHECK(rc, comm, rc, FUNC_NAME);
45+
}
46+
47+
rc = ompi_comm_ack_failed_internal( (ompi_communicator_t*)comm, num_to_ack, num_acked );
48+
OMPI_ERRHANDLER_RETURN(rc, comm, rc, FUNC_NAME);
49+
}
50+

ompi/mpiext/ftmpi/c/comm_get_failed.c

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Copyright (c) 2022 The University of Tennessee and The University
3+
* of Tennessee Research Foundation. All rights
4+
* reserved.
5+
* $COPYRIGHT$
6+
*
7+
* Additional copyrights may follow
8+
*
9+
* $HEADER$
10+
*/
11+
#include "ompi_config.h"
12+
13+
#include "ompi/mpi/c/bindings.h"
14+
#include "ompi/runtime/params.h"
15+
#include "ompi/communicator/communicator.h"
16+
#include "ompi/proc/proc.h"
17+
18+
#include "ompi/mpiext/ftmpi/c/mpiext_ftmpi_c.h"
19+
20+
#if OMPI_BUILD_MPI_PROFILING
21+
#if OPAL_HAVE_WEAK_SYMBOLS
22+
#pragma weak MPIX_Comm_get_failed = PMPIX_Comm_get_failed
23+
#endif
24+
#define MPIX_Comm_get_failed PMPIX_Comm_get_failed
25+
#endif
26+
27+
static const char FUNC_NAME[] = "MPIX_Comm_get_failed";
28+
29+
int MPIX_Comm_get_failed(MPI_Comm comm, MPI_Group *failedgrp)
30+
{
31+
int rc = MPI_SUCCESS;
32+
33+
/* Argument checking */
34+
if (MPI_PARAM_CHECK) {
35+
OMPI_ERR_INIT_FINALIZE(FUNC_NAME);
36+
if (ompi_comm_invalid(comm)) {
37+
return OMPI_ERRHANDLER_INVOKE(MPI_COMM_WORLD, MPI_ERR_COMM, FUNC_NAME);
38+
}
39+
OMPI_ERRHANDLER_CHECK(rc, comm, rc, FUNC_NAME);
40+
}
41+
42+
rc = ompi_comm_get_failed_internal( (ompi_communicator_t*)comm, (ompi_group_t**)failedgrp );
43+
if( OMPI_SUCCESS != rc ) {
44+
OMPI_ERRHANDLER_RETURN(rc, comm, rc, FUNC_NAME);
45+
}
46+
47+
return MPI_SUCCESS;
48+
}
49+

0 commit comments

Comments
 (0)