Skip to content

Commit 57bb6dc

Browse files
authored
Merge pull request open-mpi#11864 from wenduwan/topo_aware_coll_comm
communicator: introduce OMPI_COMM_DISJOINT flag
2 parents f0c69b7 + b02555e commit 57bb6dc

File tree

5 files changed

+77
-7
lines changed

5 files changed

+77
-7
lines changed

contrib/check_unnecessary_headers.sh

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
# Copyright (c) 2004-2005 The Regents of the University of California.
1212
# All rights reserved.
1313
# Copyright (c) 2009 Oak Ridge National Labs. All rights reserved.
14-
# Copyright (c) 2022 Amazon.com, Inc. or its affiliates.
14+
# Copyright (c) Amazon.com, Inc. or its affiliates.
1515
# All Rights reserved.
1616
#
1717
#
@@ -181,8 +181,8 @@ SEARCH_HEADER[0]="ompi/attribute/attribute.h ATTR_HASH_SIZE OMPI_KEYVAL_PREDEFIN
181181
SEARCH_HEADER[1]="ompi/class/ompi_free_list.h ompi_free_list_item_init_fn_t ompi_free_list_t ompi_free_list_item_t ompi_free_list_init_ex ompi_free_list_init ompi_free_list_init_ex_new ompi_free_list_init_new ompi_free_list_grow ompi_free_list_resize ompi_free_list_pos_t OMPI_FREE_LIST_POS_BEGINNING ompi_free_list_parse OMPI_FREE_LIST_GET OMPI_FREE_LIST_WAIT __ompi_free_list_wait OMPI_FREE_LIST_RETURN"
182182
SEARCH_HEADER[2]="ompi/class/ompi_rb_tree.h ompi_rb_tree_nodecolor_t ompi_rb_tree_node_t ompi_rb_tree_comp_fn_t ompi_rb_tree_t ompi_rb_tree_condition_fn_t ompi_rb_tree_action_fn_t ompi_rb_tree_construct ompi_rb_tree_destruct ompi_rb_tree_init ompi_rb_tree_insert ompi_rb_tree_find_with ompi_rb_tree_find ompi_rb_tree_delete ompi_rb_tree_destroy ompi_rb_tree_traverse ompi_rb_tree_size"
183183
SEARCH_HEADER[3]="ompi/class/ompi_seq_tracker.h ompi_seq_tracker_range_t ompi_seq_tracker_t ompi_seq_tracker_check_duplicate ompi_seq_tracker_insert ompi_seq_tracker_copy"
184-
SEARCH_HEADER[4]="ompi/communicator/communicator.h MPI_Comm MPI_COMM_WORLD ompi_communicator_t OMPI_COMM_INTER OMPI_COMM_CART OMPI_COMM_GRAPH OMPI_COMM_NAMEISSET OMPI_COMM_ISFREED OMPI_COMM_INTRINSIC OMPI_COMM_DYNAMIC OMPI_COMM_INVALID OMPI_COMM_PML_ADDED OMPI_COMM_IS_ OMPI_COMM_SET_ OMPI_COMM_ALLGATHER_TAG OMPI_COMM_BARRIER_TAG OMPI_COMM_ALLREDUCE_TAG OMPI_COMM_CID_ OMPI_COMM_BLOCK_ ompi_predefined_communicator_t ompi_mpi_comm_parent ompi_mpi_comm_world ompi_mpi_comm_self ompi_mpi_comm_null ompi_comm_invalid ompi_comm_rank ompi_comm_size ompi_comm_remote_size ompi_comm_get_cid ompi_comm_lookup ompi_comm_peer_lookup ompi_comm_peer_invalid ompi_comm_init ompi_comm_link_function ompi_comm_group ompi_comm_create ompi_topo_create ompi_comm_split ompi_comm_dup ompi_comm_compare ompi_comm_free ompi_comm_allocate ompi_comm_nextcid ompi_comm_finalize ompi_comm_set ompi_comm_get_rprocs ompi_comm_overlapping_groups ompi_comm_determine_first ompi_comm_activate ompi_comm_dump ompi_comm_set_name ompi_comm_reg_init ompi_comm_reg_finalize ompi_comm_num_dyncomm ompi_mpi_cxx_comm_errhandler_invoke"
185-
SEARCH_HEADER[5]="ompi/datatype/convertor.h OMPI_COMM_INTER OMPI_COMM_CART OMPI_COMM_GRAPH OMPI_COMM_NAMEISSET OMPI_COMM_ISFREED OMPI_COMM_INTRINSIC OMPI_COMM_DYNAMIC OMPI_COMM_INVALID OMPI_COMM_PML_ADDED OMPI_COMM_IS_ OMPI_COMM_SET_ OMPI_COMM_ALLGATHER_TAG OMPI_COMM_BARRIER_TAG OMPI_COMM_ALLREDUCE_TAG OMPI_COMM_CID_ OMPI_COMM_BLOCK_ ompi_predefined_communicator_t ompi_mpi_comm_parent ompi_mpi_comm_null ompi_comm_invalid ompi_comm_rank ompi_comm_size ompi_comm_remote_size ompi_comm_get_cid ompi_comm_lookup ompi_comm_peer_lookup ompi_comm_peer_invalid ompi_comm_init ompi_comm_link_function ompi_comm_group ompi_comm_create ompi_topo_create ompi_comm_split ompi_comm_dup ompi_comm_compare ompi_comm_free ompi_comm_allocate ompi_comm_nextcid ompi_comm_finalize ompi_comm_set ompi_comm_get_rprocs ompi_comm_overlapping_groups ompi_comm_determine_first ompi_comm_activate ompi_comm_dump ompi_comm_set_name ompi_comm_reg_init ompi_comm_reg_finalize ompi_comm_num_dync CONVERTOR_DATATYPE_MASK CONVERTOR_SEND_CONVERSION CONVERTOR_RECV CONVERTOR_SEND CONVERTOR_HOMOGENEOUS CONVERTOR_NO_OP CONVERTOR_WITH_CHECKSUM CONVERTOR_TYPE_MASK CONVERTOR_STATE_START CONVERTOR_STATE_COMPLETE CONVERTOR_STATE_ALLOC CONVERTOR_COMPLETED ompi_convertor_t ompi_convertor_master_t dt_stack_t DT_STATIC_STACK_SIZE ompi_convertor_get_checksum ompi_convertor_pack ompi_convertor_unpack ompi_convertor_create ompi_convertor_cleanup ompi_convertor_need_buffers ompi_convertor_get_packed_size ompi_convertor_get_unpacked_size ompi_convertor_get_current_pointer ompi_convertor_prepare_for_send ompi_convertor_copy_and_prepare_for_send ompi_convertor_prepare_for_recv ompi_convertor_copy_and_prepare_for_recv ompi_convertor_raw ompi_convertor_set_position_nocheck ompi_convertor_set_position ompi_convertor_personalize ompi_convertor_clone ompi_convertor_clone_with_position ompi_convertor_dump ompi_ddt_dump_stack ompi_convertor_generic_simple_position MPI_Datatype"
184+
SEARCH_HEADER[4]="ompi/communicator/communicator.h MPI_Comm MPI_COMM_WORLD ompi_communicator_t OMPI_COMM_INTER OMPI_COMM_CART OMPI_COMM_GRAPH OMPI_COMM_NAMEISSET OMPI_COMM_ISFREED OMPI_COMM_INTRINSIC OMPI_COMM_DYNAMIC OMPI_COMM_INVALID OMPI_COMM_DISJOINT_SET OMPI_COMM_DISJOINT OMPI_COMM_PML_ADDED OMPI_COMM_IS_ OMPI_COMM_SET_ OMPI_COMM_ALLGATHER_TAG OMPI_COMM_BARRIER_TAG OMPI_COMM_ALLREDUCE_TAG OMPI_COMM_CID_ OMPI_COMM_BLOCK_ ompi_predefined_communicator_t ompi_mpi_comm_parent ompi_mpi_comm_world ompi_mpi_comm_self ompi_mpi_comm_null ompi_comm_invalid ompi_comm_rank ompi_comm_size ompi_comm_remote_size ompi_comm_get_cid ompi_comm_lookup ompi_comm_peer_lookup ompi_comm_peer_invalid ompi_comm_init ompi_comm_link_function ompi_comm_group ompi_comm_create ompi_topo_create ompi_comm_split ompi_comm_dup ompi_comm_compare ompi_comm_free ompi_comm_allocate ompi_comm_nextcid ompi_comm_finalize ompi_comm_set ompi_comm_get_rprocs ompi_comm_overlapping_groups ompi_comm_determine_first ompi_comm_activate ompi_comm_dump ompi_comm_set_name ompi_comm_reg_init ompi_comm_reg_finalize ompi_comm_num_dyncomm ompi_mpi_cxx_comm_errhandler_invoke"
185+
SEARCH_HEADER[5]="ompi/datatype/convertor.h OMPI_COMM_INTER OMPI_COMM_CART OMPI_COMM_GRAPH OMPI_COMM_NAMEISSET OMPI_COMM_ISFREED OMPI_COMM_INTRINSIC OMPI_COMM_DYNAMIC OMPI_COMM_INVALID OMPI_COMM_DISJOINT_SET OMPI_COMM_DISJOINT OMPI_COMM_PML_ADDED OMPI_COMM_IS_ OMPI_COMM_SET_ OMPI_COMM_ALLGATHER_TAG OMPI_COMM_BARRIER_TAG OMPI_COMM_ALLREDUCE_TAG OMPI_COMM_CID_ OMPI_COMM_BLOCK_ ompi_predefined_communicator_t ompi_mpi_comm_parent ompi_mpi_comm_null ompi_comm_invalid ompi_comm_rank ompi_comm_size ompi_comm_remote_size ompi_comm_get_cid ompi_comm_lookup ompi_comm_peer_lookup ompi_comm_peer_invalid ompi_comm_init ompi_comm_link_function ompi_comm_group ompi_comm_create ompi_topo_create ompi_comm_split ompi_comm_dup ompi_comm_compare ompi_comm_free ompi_comm_allocate ompi_comm_nextcid ompi_comm_finalize ompi_comm_set ompi_comm_get_rprocs ompi_comm_overlapping_groups ompi_comm_determine_first ompi_comm_activate ompi_comm_dump ompi_comm_set_name ompi_comm_reg_init ompi_comm_reg_finalize ompi_comm_num_dync CONVERTOR_DATATYPE_MASK CONVERTOR_SEND_CONVERSION CONVERTOR_RECV CONVERTOR_SEND CONVERTOR_HOMOGENEOUS CONVERTOR_NO_OP CONVERTOR_WITH_CHECKSUM CONVERTOR_TYPE_MASK CONVERTOR_STATE_START CONVERTOR_STATE_COMPLETE CONVERTOR_STATE_ALLOC CONVERTOR_COMPLETED ompi_convertor_t ompi_convertor_master_t dt_stack_t DT_STATIC_STACK_SIZE ompi_convertor_get_checksum ompi_convertor_pack ompi_convertor_unpack ompi_convertor_create ompi_convertor_cleanup ompi_convertor_need_buffers ompi_convertor_get_packed_size ompi_convertor_get_unpacked_size ompi_convertor_get_current_pointer ompi_convertor_prepare_for_send ompi_convertor_copy_and_prepare_for_send ompi_convertor_prepare_for_recv ompi_convertor_copy_and_prepare_for_recv ompi_convertor_raw ompi_convertor_set_position_nocheck ompi_convertor_set_position ompi_convertor_personalize ompi_convertor_clone ompi_convertor_clone_with_position ompi_convertor_dump ompi_ddt_dump_stack ompi_convertor_generic_simple_position MPI_Datatype"
186186
SEARCH_HEADER[6]="ompi/datatype/datatype.h MPI_Datatype DT_MAX_PREDEFINED DT_FLAG_ MAX_DT_COMPONENT_COUNT opal_ddt_count_t dt_type_desc_t ompi_datatype_t ompi_predefined_datatype_t ompi_ddt_init ompi_ddt_finalize ompi_ddt_create_ ompi_ddt_duplicate ompi_ddt_is_predefined ompi_ddt_create_from_packed_description"
187187
SEARCH_HEADER[7]="ompi/datatype/datatype_internal.h DDT_DUMP_STACK DT_ ddt_elem_id_description ddt_elem_desc ddt_elem_desc_t ddt_loop_desc ddt_loop_desc_t ddt_endloop_desc ddt_endloop_desc_t dt_elem_desc CREATE_LOOP_START CREATE_LOOP_END CREATE_ELEM ompi_complex_float_t ompi_complex_double_t ompi_complex_long_double_t ompi_ddt_basicDatatypes BASIC_DDT_FROM_ELEM ompi_ddt_default_convertors_init ompi_ddt_default_convertors_fini SAVE_STACK PUSH_STACK ompi_ddt_safeguard_pointer_debug_breakpoint OMPI_DDT_SAFEGUARD_POINTER GET_FIRST_NON_LOOP UPDATE_INTERNAL_COUNTERS ompi_ddt_print_args"
188188
SEARCH_HEADER[8]="ompi/errhandler/errhandler.h OMPI_ERRHANDLER_LANG_ ompi_errhandler_lang_t OMPI_ERRHANDLER_TYPE_ ompi_errhandler_type_t ompi_errhandler_t ompi_predefined_errhandler_t ompi_mpi_errhandler_null OMPI_ERRHANDLER_CHECK OMPI_ERRHANDLER_RETURN ompi_errhandler_init ompi_errhandler_finalize OMPI_ERRHANDLER_INVOKE ompi_errhandler_invoke ompi_errhandler_request_invoke ompi_errhandler_create ompi_errhandler_is_intrinsic ompi_errhandler_fortran_handler_fn_t OMPI_ERR_INIT_FINALIZE MPI_Errhandler"

ompi/communicator/comm.c

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -458,7 +458,6 @@ int ompi_comm_create_w_info (ompi_communicator_t *comm, ompi_group_t *group, opa
458458
goto exit;
459459
}
460460

461-
462461
/* Check whether we are part of the new comm.
463462
If not, we have to free the structure again.
464463
However, we could not avoid the comm_nextcid step, since
@@ -2690,6 +2689,49 @@ static int ompi_comm_copy_topo(ompi_communicator_t *oldcomm,
26902689
return OMPI_SUCCESS;
26912690
}
26922691

2692+
int ompi_comm_set_disjointness(ompi_communicator_t *newcomm, ompi_communicator_t *oldcomm)
2693+
{
2694+
int local_peers = 0, rc = OMPI_ERROR;
2695+
2696+
if (OMPI_COMM_IS_DISJOINT_SET(newcomm)) {
2697+
rc = OMPI_SUCCESS;
2698+
goto out;
2699+
}
2700+
2701+
if (NULL != oldcomm && OMPI_COMM_IS_DISJOINT(oldcomm)) {
2702+
/**
2703+
* A communicator splitted from a disjoint
2704+
* communicator(1 process per node) is also disjoint
2705+
*/
2706+
newcomm->c_flags |= (OMPI_COMM_DISJOINT_SET | OMPI_COMM_DISJOINT);
2707+
rc = OMPI_SUCCESS;
2708+
goto out;
2709+
}
2710+
2711+
if (!newcomm->c_coll) {
2712+
rc = OMPI_ERR_NOT_AVAILABLE;
2713+
goto out;
2714+
}
2715+
2716+
local_peers = ompi_group_count_local_peers(newcomm->c_local_group);
2717+
rc = newcomm->c_coll->coll_allreduce(MPI_IN_PLACE, &local_peers, 1, MPI_INT, MPI_MAX, newcomm,
2718+
newcomm->c_coll->coll_allreduce_module);
2719+
if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) {
2720+
goto out;
2721+
}
2722+
2723+
if (1 == local_peers) {
2724+
newcomm->c_flags |= OMPI_COMM_DISJOINT;
2725+
} else {
2726+
newcomm->c_flags &= ~OMPI_COMM_DISJOINT;
2727+
}
2728+
2729+
newcomm->c_flags |= OMPI_COMM_DISJOINT_SET;
2730+
2731+
out:
2732+
return rc;
2733+
}
2734+
26932735
char *ompi_comm_print_cid (const ompi_communicator_t *comm)
26942736
{
26952737
#if OPAL_HAVE_THREAD_LOCAL

ompi/communicator/comm_cid.c

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -811,6 +811,16 @@ static int ompi_comm_activate_complete (ompi_communicator_t **newcomm, ompi_comm
811811
return ret;
812812
}
813813

814+
/**
815+
* Use the initialized collective component to determine whether the processes are located on
816+
* individual nodes
817+
*/
818+
if (OMPI_SUCCESS != ompi_comm_set_disjointness(*newcomm, comm)) {
819+
OBJ_RELEASE(*newcomm);
820+
*newcomm = MPI_COMM_NULL;
821+
return ret;
822+
}
823+
814824
/* For an inter communicator, we have to deal with the potential
815825
* problem of what is happening if the local_comm that we created
816826
* has a lower CID than the parent comm. This is not a problem

ompi/communicator/communicator.h

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,8 @@ OMPI_DECLSPEC OBJ_CLASS_DECLARATION(ompi_communicator_t);
6262
#define OMPI_COMM_DYNAMIC 0x00000008
6363
#define OMPI_COMM_ISFREED 0x00000010
6464
#define OMPI_COMM_INVALID 0x00000020
65+
#define OMPI_COMM_DISJOINT_SET 0x00000040
66+
#define OMPI_COMM_DISJOINT 0x00000080
6567
#define OMPI_COMM_CART 0x00000100
6668
#define OMPI_COMM_GRAPH 0x00000200
6769
#define OMPI_COMM_DIST_GRAPH 0x00000400
@@ -80,6 +82,8 @@ OMPI_DECLSPEC OBJ_CLASS_DECLARATION(ompi_communicator_t);
8082
#define OMPI_COMM_IS_FREED(comm) ((comm)->c_flags & OMPI_COMM_ISFREED)
8183
#define OMPI_COMM_IS_DYNAMIC(comm) ((comm)->c_flags & OMPI_COMM_DYNAMIC)
8284
#define OMPI_COMM_IS_INVALID(comm) ((comm)->c_flags & OMPI_COMM_INVALID)
85+
#define OMPI_COMM_IS_DISJOINT_SET(comm) ((comm)->c_flags & OMPI_COMM_DISJOINT_SET)
86+
#define OMPI_COMM_IS_DISJOINT(comm) ((comm)->c_flags & OMPI_COMM_DISJOINT)
8387
#define OMPI_COMM_IS_PML_ADDED(comm) ((comm)->c_flags & OMPI_COMM_PML_ADDED)
8488
#define OMPI_COMM_IS_EXTRA_RETAIN(comm) ((comm)->c_flags & OMPI_COMM_EXTRA_RETAIN)
8589
#define OMPI_COMM_IS_TOPO(comm) (OMPI_COMM_IS_CART((comm)) || \
@@ -897,6 +901,19 @@ OMPI_DECLSPEC int ompi_comm_split_type(ompi_communicator_t *comm,
897901
struct opal_info_t *info,
898902
ompi_communicator_t** newcomm);
899903

904+
/**
905+
* Set newcomm's disjoint flags based on oldcomm if provided. In the case where oldcomm
906+
* is disjoint, the function will short circuit and set newcomm to be disjoint.
907+
* Otherwise, the function will carry out a collective communication on all processes
908+
* in newcomm. Therefore this function should only be called **after** the collectives
909+
* modules are initialized on newcomm.
910+
*
911+
* @param newcomm: new communicator
912+
* @param oldcomm: parent communictator or NULL
913+
*
914+
*/
915+
OMPI_DECLSPEC int ompi_comm_set_disjointness(ompi_communicator_t *newcomm, ompi_communicator_t *oldcomm);
916+
900917
/**
901918
* dup a communicator. Parameter are identical to the MPI-counterpart
902919
* of the function. It has been extracted, since we need to be able

ompi/mca/coll/han/coll_han_subcomms.c

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,7 @@ int mca_coll_han_comm_create(struct ompi_communicator_t *comm,
282282
opal_info_set(&comm_info, "ompi_comm_coll_preference", "tuned,^han");
283283
ompi_comm_split_type(comm, MPI_COMM_TYPE_SHARED, 0,
284284
&comm_info, &(low_comms[0]));
285+
assert(OMPI_COMM_IS_DISJOINT_SET(low_comms[0]) && !OMPI_COMM_IS_DISJOINT(low_comms[0]));
285286

286287
/*
287288
* Get my local rank and the local size
@@ -296,6 +297,7 @@ int mca_coll_han_comm_create(struct ompi_communicator_t *comm,
296297
opal_info_set(&comm_info, "ompi_comm_coll_preference", "sm,^han");
297298
ompi_comm_split_type(comm, MPI_COMM_TYPE_SHARED, 0,
298299
&comm_info, &(low_comms[1]));
300+
assert(OMPI_COMM_IS_DISJOINT_SET(low_comms[1]) && !OMPI_COMM_IS_DISJOINT(low_comms[1]));
299301

300302
/*
301303
* Upgrade libnbc module priority to set up up_comms[0] with libnbc module
@@ -304,15 +306,16 @@ int mca_coll_han_comm_create(struct ompi_communicator_t *comm,
304306
*/
305307
opal_info_set(&comm_info, "ompi_comm_coll_preference", "libnbc,^han");
306308
ompi_comm_split_with_info(comm, low_rank, w_rank, &comm_info, &(up_comms[0]), false);
307-
308309
up_rank = ompi_comm_rank(up_comms[0]);
310+
assert(OMPI_COMM_IS_DISJOINT_SET(up_comms[0]) && OMPI_COMM_IS_DISJOINT(up_comms[0]));
309311

310312
/*
311313
* Upgrade adapt module priority to set up up_comms[0] with adapt module
312314
* This sub-communicator contains one process per node.
313315
*/
314316
opal_info_set(&comm_info, "ompi_comm_coll_preference", "adapt,^han");
315317
ompi_comm_split_with_info(comm, low_rank, w_rank, &comm_info, &(up_comms[1]), false);
318+
assert(OMPI_COMM_IS_DISJOINT_SET(up_comms[1]) && OMPI_COMM_IS_DISJOINT(up_comms[1]));
316319

317320
/*
318321
* Set my virtual rank number.
@@ -350,5 +353,3 @@ int mca_coll_han_comm_create(struct ompi_communicator_t *comm,
350353
OBJ_DESTRUCT(&comm_info);
351354
return OMPI_SUCCESS;
352355
}
353-
354-

0 commit comments

Comments
 (0)