Skip to content

Commit 8fe26b4

Browse files
committed
ulfm: new functions comm_get_failed, comm_ack_failed C bindings
Add error management to ack_failed_internal. Use comm->num_acked to decide if any_source should be re-enabled. More compatibility with mixed use of failure_ack and ack_failed. Additional shortcut: bypass computing num_acked when there are no new faults. Recompute num_acked when the v1 interface got used in between. Signed-off-by: Aurelien Bouteiller <bouteill@icl.utk.edu> Copyrights. The group needs to be initialized before the first goto. Signed-off-by: George Bosilca <bosilca@icl.utk.edu> Add #defines for the new API. Signed-off-by: George Bosilca <bosilca@icl.utk.edu>
1 parent a73399e commit 8fe26b4

File tree

8 files changed

+230
-4
lines changed

8 files changed

+230
-4
lines changed

ompi/communicator/comm_init.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -451,6 +451,7 @@ static void ompi_comm_construct(ompi_communicator_t* comm)
451451
#if OPAL_ENABLE_FT_MPI
452452
comm->any_source_enabled = true;
453453
comm->any_source_offset = 0;
454+
comm->num_acked = 0;
454455
comm->comm_revoked = false;
455456
comm->coll_revoked = false;
456457
comm->c_epoch = 0;

ompi/communicator/communicator.h

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -329,6 +329,8 @@ struct ompi_communicator_t {
329329
#if OPAL_ENABLE_FT_MPI
330330
/** agreement caching info for topology and previous returned decisions */
331331
opal_object_t *agreement_specific;
332+
/** num_acked - OMPI_Comm_ack_failed */
333+
int num_acked;
332334
/** MPI_ANY_SOURCE Failed Group Offset - OMPI_Comm_failure_get_acked */
333335
int any_source_offset;
334336
/** Are MPI_ANY_SOURCE operations enabled? - OMPI_Comm_failure_ack */
@@ -598,6 +600,17 @@ static inline bool ompi_comm_is_revoked(ompi_communicator_t* comm)
598600
return (comm->comm_revoked);
599601
}
600602

603+
/*
604+
* Obtain the group of locally known failed processes on comm
605+
*/
606+
OMPI_DECLSPEC int ompi_comm_get_failed_internal(ompi_communicator_t* comm, ompi_group_t** failed_group);
607+
608+
/*
609+
* Acknowledge up to num_to_ack failures on comm; MPI_ANY_SOURCE is re-enabled once all of them are acknowledged
610+
* v2 interface; supersedes the deprecated OMPI_Comm_failure_ack() and OMPI_Comm_failure_get_acked()
611+
*/
612+
OMPI_DECLSPEC int ompi_comm_ack_failed_internal(ompi_communicator_t* comm, int num_to_ack, int *num_acked);
613+
601614
/*
602615
* Acknowledge failures and re-enable MPI_ANY_SOURCE
603616
* Related to OMPI_Comm_failure_ack() and OMPI_Comm_failure_get_acked()

ompi/communicator/ft/comm_ft.c

Lines changed: 104 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
/*
22
* Copyright (c) 2010-2012 Oak Ridge National Labs. All rights reserved.
3-
* Copyright (c) 2011-2020 The University of Tennessee and The University
3+
* Copyright (c) 2011-2022 The University of Tennessee and The University
44
*
55
* of Tennessee Research Foundation. All rights
66
* reserved.
@@ -30,7 +30,7 @@ ompi_comm_rank_failure_callback_t *ompi_rank_failure_cbfunc = NULL;
3030
* The handling of known failed processes is based on a two level process. On one
3131
* side the MPI library itself must know the failed processes (in order to be able
3232
* to correctly handle complex operations such as shrink). On the other side, the
33-
* failed processes acknowledged by the users shuould not be altered during any of
33+
* failed processes acknowledged by the users should not be altered during any of
3434
* the internal calls, as they must only be updated upon user request.
3535
* Thus, the global list (ompi_group_all_failed_procs) is the list of all known
3636
* failed processes (by the MPI library internals), and it is allegedly updated
@@ -39,6 +39,7 @@ ompi_comm_rank_failure_callback_t *ompi_rank_failure_cbfunc = NULL;
3939
* in the context of a communicator. Thus, using a single index to know the user-level
4040
* acknowledged failure is the simplest solution.
4141
*/
42+
/* deprecated ulfm v1 API */
4243
int ompi_comm_failure_ack_internal(ompi_communicator_t* comm)
4344
{
4445
opal_mutex_lock(&ompi_group_afp_mutex);
@@ -49,11 +50,13 @@ int ompi_comm_failure_ack_internal(ompi_communicator_t* comm)
4950
/* use the AFP lock implicit memory barrier to propagate the update to
5051
* any_source_enabled at the same time.
5152
*/
53+
comm->num_acked = -1; /* compat with v2 API: force recompute next time ack_failed is called */
5254
opal_mutex_unlock(&ompi_group_afp_mutex);
5355

5456
return OMPI_SUCCESS;
5557
}
5658

59+
/* deprecated ulfm v1 API; used internally in MPI_COMM_AGREE as well */
5760
int ompi_comm_failure_get_acked_internal(ompi_communicator_t* comm, ompi_group_t **group )
5861
{
5962
int ret, exit_status = OMPI_SUCCESS;
@@ -114,6 +117,105 @@ int ompi_comm_failure_get_acked_internal(ompi_communicator_t* comm, ompi_group_t
114117
return exit_status;
115118
}
116119

120+
/* New v2 interface get_failed/ack_failed.
121+
* This interface uses a cached value comm->num_acked to track how many
122+
* processes in the group of this comm have been acknowledged in prior calls.
123+
* For compatibility with v1 interface (failure_get_acked), it still updates
124+
* the comm->any_source_offset, and the v1 interface failure_ack may erase the
125+
* cached value comm->num_acked with -1 to force recomputing this value in mixed
126+
* use cases (that is calling failure_ack will force full recomputation of
127+
* comm->num_acked during the next ack_failed call). */
128+
int ompi_comm_ack_failed_internal(ompi_communicator_t* comm, int num_to_ack, int *num_acked) {
129+
int ret, exit_status = MPI_SUCCESS;
130+
int nf = -1, na = -1;
131+
ompi_group_t *c_group = OMPI_COMM_IS_INTER(comm)? comm->c_local_group: comm->c_remote_group;
132+
ompi_group_t *failed_group = NULL;
133+
134+
opal_mutex_lock(&ompi_group_afp_mutex);
135+
136+
/* shortcut when reading only */
137+
if(num_to_ack <= comm->num_acked)
138+
goto return_num_acked;
139+
140+
/* shortcut when no new faults */
141+
if(comm->any_source_offset == ompi_group_size(ompi_group_all_failed_procs)
142+
&& comm->num_acked >= 0 /* reset by a call to v1 API? */)
143+
goto return_num_acked;
144+
145+
/* compute num_acked */
146+
ret = ompi_group_intersection(ompi_group_all_failed_procs,
147+
c_group, &failed_group);
148+
if(OMPI_SUCCESS != ret) {
149+
exit_status = ret;
150+
goto cleanup;
151+
}
152+
nf = ompi_group_size(failed_group);
153+
na = (num_to_ack < nf)? num_to_ack: nf; /* never ack more than requested */
154+
155+
if(comm->num_acked < 0) { /* reset by a call to the v1 API: recompute it */
156+
if(0 == comm->any_source_offset) comm->num_acked = 0;
157+
else {
158+
int aso = comm->any_source_offset - 1;
159+
ret = ompi_group_translate_ranks(ompi_group_all_failed_procs, 1, &aso,
160+
failed_group, &comm->num_acked);
161+
comm->num_acked++; /* make it a group size again */
162+
if(OMPI_SUCCESS != ret) {
163+
exit_status = ret;
164+
goto cleanup;
165+
}
166+
}
167+
}
168+
169+
if(comm->num_acked < na) { /* comm->num_acked needs to be updated during this call */
170+
comm->num_acked = na;
171+
if(nf == na) {
172+
/* all faults on comm acknowledged, resume any source then */
173+
comm->any_source_enabled = true;
174+
comm->any_source_offset = ompi_group_size(ompi_group_all_failed_procs); // compat with v1 interface
175+
}
176+
else {
177+
/* some faults not acknowledged, do not resume any source then, but
178+
* still update any_source_offset */
179+
assert(comm->num_acked > 0);
180+
int cna = comm->num_acked - 1;
181+
ret = ompi_group_translate_ranks(failed_group, 1, &cna,
182+
ompi_group_all_failed_procs, &comm->any_source_offset); // compat with v1 interface
183+
comm->any_source_offset++; /* make it a group size again */
184+
if(OMPI_SUCCESS != ret) {
185+
exit_status = ret;
186+
goto cleanup;
187+
}
188+
}
189+
}
190+
191+
return_num_acked:
192+
*num_acked = comm->num_acked;
193+
194+
cleanup:
195+
if(NULL != failed_group) OBJ_RELEASE(failed_group);
196+
/* use the AFP lock implicit memory barrier to propagate the update to
197+
* any_source_enabled, num_acked, etc. at the same time.
198+
*/
199+
opal_mutex_unlock(&ompi_group_afp_mutex);
200+
201+
return exit_status;
202+
}
203+
204+
/* Obtain the group of locally known failed processes on comm.
 *
 * *group receives the intersection of ompi_group_all_failed_procs with the
 * group of comm (c_local_group for an inter-communicator, c_remote_group
 * otherwise). The intersection is computed while holding
 * ompi_group_afp_mutex.
 *
 * Returns the status reported by ompi_group_intersection.
 * NOTE(review): presumably the caller owns the returned group and must
 * release it -- confirm against ompi_group_intersection.
 */
int ompi_comm_get_failed_internal(ompi_communicator_t* comm, ompi_group_t **group)
{
    ompi_group_t *members;
    int rc;

    if( OMPI_COMM_IS_INTER(comm) ) {
        members = comm->c_local_group;
    } else {
        members = comm->c_remote_group;
    }

    opal_mutex_lock(&ompi_group_afp_mutex);
    rc = ompi_group_intersection(ompi_group_all_failed_procs, members, group);
    opal_mutex_unlock(&ompi_group_afp_mutex);

    return rc;
}
218+
117219
int ompi_comm_shrink_internal(ompi_communicator_t* comm, ompi_communicator_t** newcomm)
118220
{
119221
int ret, exit_status = OMPI_SUCCESS;

ompi/mpiext/ftmpi/c/Makefile.am

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
#
22
# Copyright (c) 2010-2012 Oak Ridge National Labs. All rights reserved.
3-
# Copyright (c) 2016-2018 The University of Tennessee and The University
3+
# Copyright (c) 2016-2022 The University of Tennessee and The University
44
# of Tennessee Research Foundation. All rights
55
# reserved.
66
# $COPYRIGHT$
@@ -29,6 +29,8 @@ libmpiext_ftmpi_c_la_SOURCES = \
2929
comm_shrink.c \
3030
comm_failure_ack.c \
3131
comm_failure_get_acked.c \
32+
comm_get_failed.c \
33+
comm_ack_failed.c \
3234
comm_agree.c \
3335
comm_iagree.c
3436
libmpiext_ftmpi_c_la_LIBADD = \

ompi/mpiext/ftmpi/c/comm_ack_failed.c

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Copyright (c) 2022 The University of Tennessee and The University
3+
* of Tennessee Research Foundation. All rights
4+
* reserved.
5+
* $COPYRIGHT$
6+
*
7+
* Additional copyrights may follow
8+
*
9+
* $HEADER$
10+
*/
11+
#include "ompi_config.h"
12+
13+
#include "ompi/mpi/c/bindings.h"
14+
#include "ompi/runtime/params.h"
15+
#include "ompi/communicator/communicator.h"
16+
#include "ompi/proc/proc.h"
17+
#include "ompi/errhandler/errhandler.h"
18+
19+
#include "ompi/mpiext/ftmpi/c/mpiext_ftmpi_c.h"
20+
21+
#if OMPI_BUILD_MPI_PROFILING
22+
#if OPAL_HAVE_WEAK_SYMBOLS
23+
#pragma weak MPIX_Comm_ack_failed = PMPIX_Comm_ack_failed
24+
#endif
25+
#define MPIX_Comm_ack_failed PMPIX_Comm_ack_failed
26+
#endif
27+
28+
static const char FUNC_NAME[] = "MPIX_Comm_ack_failed";
29+
30+
31+
/*
 * C binding of the ULFM v2 acknowledgment call.
 *
 * comm        communicator on which failures are acknowledged
 * num_to_ack  maximum number of failures to acknowledge; rejected with
 *             MPI_ERR_ARG when larger than ompi_comm_size(comm)
 * num_acked   (OUT) number of acknowledged failures; must not be NULL
 *
 * When MPI_PARAM_CHECK is set, an invalid communicator is reported with
 * MPI_ERR_COMM and an invalid num_to_ack or a NULL num_acked with
 * MPI_ERR_ARG. Otherwise the work is delegated to
 * ompi_comm_ack_failed_internal and its status is returned through
 * OMPI_ERRHANDLER_RETURN.
 */
int MPIX_Comm_ack_failed(MPI_Comm comm, int num_to_ack, int* num_acked)
{
    int rc = MPI_SUCCESS;

    /* Argument checking */
    if (MPI_PARAM_CHECK) {
        OMPI_ERR_INIT_FINALIZE(FUNC_NAME);
        if (ompi_comm_invalid(comm)) {
            return OMPI_ERRHANDLER_INVOKE(MPI_COMM_WORLD, MPI_ERR_COMM, FUNC_NAME);
        }
        /* the internal routine writes through num_acked unconditionally */
        if (NULL == num_acked || num_to_ack > ompi_comm_size(comm)) {
            rc = MPI_ERR_ARG;
        }
        OMPI_ERRHANDLER_CHECK(rc, comm, rc, FUNC_NAME);
    }

    rc = ompi_comm_ack_failed_internal( (ompi_communicator_t*)comm, num_to_ack, num_acked );
    OMPI_ERRHANDLER_RETURN(rc, comm, rc, FUNC_NAME);
}
50+

ompi/mpiext/ftmpi/c/comm_get_failed.c

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Copyright (c) 2022 The University of Tennessee and The University
3+
* of Tennessee Research Foundation. All rights
4+
* reserved.
5+
* $COPYRIGHT$
6+
*
7+
* Additional copyrights may follow
8+
*
9+
* $HEADER$
10+
*/
11+
#include "ompi_config.h"
12+
13+
#include "ompi/mpi/c/bindings.h"
14+
#include "ompi/runtime/params.h"
15+
#include "ompi/communicator/communicator.h"
16+
#include "ompi/proc/proc.h"
17+
18+
#include "ompi/mpiext/ftmpi/c/mpiext_ftmpi_c.h"
19+
20+
#if OMPI_BUILD_MPI_PROFILING
21+
#if OPAL_HAVE_WEAK_SYMBOLS
22+
#pragma weak MPIX_Comm_get_failed = PMPIX_Comm_get_failed
23+
#endif
24+
#define MPIX_Comm_get_failed PMPIX_Comm_get_failed
25+
#endif
26+
27+
static const char FUNC_NAME[] = "MPIX_Comm_get_failed";
28+
29+
/*
 * C binding of the ULFM v2 query for the locally known failed processes.
 *
 * comm       communicator to query
 * failedgrp  (OUT) group of the failed processes of comm; must not be NULL
 *
 * When MPI_PARAM_CHECK is set, an invalid communicator is reported with
 * MPI_ERR_COMM and a NULL failedgrp with MPI_ERR_ARG. Otherwise the work is
 * delegated to ompi_comm_get_failed_internal; a failure there is reported
 * through OMPI_ERRHANDLER_RETURN, and MPI_SUCCESS is returned otherwise.
 */
int MPIX_Comm_get_failed(MPI_Comm comm, MPI_Group *failedgrp)
{
    int rc = MPI_SUCCESS;

    /* Argument checking */
    if (MPI_PARAM_CHECK) {
        OMPI_ERR_INIT_FINALIZE(FUNC_NAME);
        if (ompi_comm_invalid(comm)) {
            return OMPI_ERRHANDLER_INVOKE(MPI_COMM_WORLD, MPI_ERR_COMM, FUNC_NAME);
        }
        /* the internal routine stores the resulting group through failedgrp */
        if (NULL == failedgrp) {
            rc = MPI_ERR_ARG;
        }
        OMPI_ERRHANDLER_CHECK(rc, comm, rc, FUNC_NAME);
    }

    rc = ompi_comm_get_failed_internal( (ompi_communicator_t*)comm, (ompi_group_t**)failedgrp );
    if( OMPI_SUCCESS != rc ) {
        OMPI_ERRHANDLER_RETURN(rc, comm, rc, FUNC_NAME);
    }

    return MPI_SUCCESS;
}
49+

ompi/mpiext/ftmpi/c/mpiext_ftmpi_c.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ OMPI_DECLSPEC int MPIX_Comm_is_revoked(MPI_Comm comm, int *flag);
2727
OMPI_DECLSPEC int MPIX_Comm_shrink(MPI_Comm comm, MPI_Comm *newcomm);
2828
OMPI_DECLSPEC int MPIX_Comm_failure_ack(MPI_Comm comm);
2929
OMPI_DECLSPEC int MPIX_Comm_failure_get_acked(MPI_Comm comm, MPI_Group *failedgrp);
30+
OMPI_DECLSPEC int MPIX_Comm_get_failed(MPI_Comm comm, MPI_Group *failedgroup);
31+
OMPI_DECLSPEC int MPIX_Comm_ack_failed(MPI_Comm comm, int num_to_ack, int *num_acked);
3032
OMPI_DECLSPEC int MPIX_Comm_agree(MPI_Comm comm, int *flag);
3133
OMPI_DECLSPEC int MPIX_Comm_iagree(MPI_Comm comm, int *flag, MPI_Request *request);
3234

@@ -35,9 +37,14 @@ OMPI_DECLSPEC int PMPIX_Comm_is_revoked(MPI_Comm comm, int *flag);
3537
OMPI_DECLSPEC int PMPIX_Comm_shrink(MPI_Comm comm, MPI_Comm *newcomm);
3638
OMPI_DECLSPEC int PMPIX_Comm_failure_ack(MPI_Comm comm);
3739
OMPI_DECLSPEC int PMPIX_Comm_failure_get_acked(MPI_Comm comm, MPI_Group *failedgrp);
40+
OMPI_DECLSPEC int PMPIX_Comm_get_failed(MPI_Comm comm, MPI_Group *failedgroup);
41+
OMPI_DECLSPEC int PMPIX_Comm_ack_failed(MPI_Comm comm, int num_to_ack, int *num_acked);
3842
OMPI_DECLSPEC int PMPIX_Comm_agree(MPI_Comm comm, int *flag);
3943
OMPI_DECLSPEC int PMPIX_Comm_iagree(MPI_Comm comm, int *flag, MPI_Request *request);
4044

4145
#include <stdbool.h>
4246
OMPI_DECLSPEC int OMPI_Comm_failure_inject(MPI_Comm comm, bool notify);
47+
/* Provide defines to facilitate the detection of the new API */
48+
#define OMPI_HAVE_MPIX_COMM_GET_FAILED 1
49+
#define OMPI_HAVE_MPIX_COMM_ACK_FAILED 1
4350

ompi/mpiext/ftmpi/c/profile/Makefile.am

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
#
2-
# Copyright (c) 2016-2018 The University of Tennessee and The University
2+
# Copyright (c) 2016-2022 The University of Tennessee and The University
33
# of Tennessee Research Foundation. All rights
44
# reserved.
55
# Copyright (c) 2021 Cisco Systems, Inc. All rights reserved.
@@ -21,6 +21,8 @@ nodist_libpmpiext_ftmpi_c_la_SOURCES = \
2121
pcomm_revoke.c \
2222
pcomm_is_revoked.c \
2323
pcomm_shrink.c \
24+
pcomm_get_failed.c \
25+
pcomm_ack_failed.c \
2426
pcomm_failure_ack.c \
2527
pcomm_failure_get_acked.c \
2628
pcomm_agree.c \

0 commit comments

Comments
 (0)