Skip to content

Commit a5f4846

Browse files
committed
Add support for dynamic allow_overtake
This support only covers the case when allow_overtake is enabled. For the opposite there is no simple solution, as described in the comment in pml_ob1.c Signed-off-by: George Bosilca <bosilca@icl.utk.edu>
1 parent 152ca59 commit a5f4846

File tree

3 files changed

+80
-2
lines changed

3 files changed

+80
-2
lines changed

ompi/mca/pml/ob1/pml_ob1.c

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
* Copyright (c) 2004-2010 The Trustees of Indiana University and Indiana
44
* University Research and Technology
55
* Corporation. All rights reserved.
6-
* Copyright (c) 2004-2020 The University of Tennessee and The University
6+
* Copyright (c) 2004-2022 The University of Tennessee and The University
77
* of Tennessee Research Foundation. All rights
88
* reserved.
99
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
@@ -202,6 +202,43 @@ int mca_pml_ob1_enable(bool enable)
202202
return OMPI_SUCCESS;
203203
}
204204

205+
static const char*
206+
mca_pml_ob1_set_allow_overtake(opal_infosubscriber_t* obj,
207+
const char* key,
208+
const char* value)
209+
{
210+
ompi_communicator_t *ompi_comm = (ompi_communicator_t *) obj;
211+
bool allow_overtake_was_set = OMPI_COMM_CHECK_ASSERT_ALLOW_OVERTAKE(ompi_comm);
212+
213+
/* As we keep the out-of-sequence messages ordered by their sequence, as a receiver we
214+
* can just move the previously considered out-of-order messages into the unexpected queue,
215+
* and we maintain some form of logical consistency with the message order.
216+
*/
217+
if (opal_str_to_bool(value)) {
218+
if (!allow_overtake_was_set) {
219+
ompi_comm->c_flags |= OMPI_COMM_ASSERT_ALLOW_OVERTAKE;
220+
mca_pml_ob1_merge_cant_match(ompi_comm);
221+
}
222+
return "true";
223+
}
224+
if (allow_overtake_was_set) {
225+
/* However, in the case we are trying to turn off allow_overtake, it is not clear what
226+
* should be done with the previous messages that are pending on our peers, nor with
227+
* the messages currently in the network. Similarly, if one process turns off allow
228+
* overtake, before any potential sender start sending valid sequence numbers there
229+
* is no way to order the messages in a sensible order.
230+
* The possible solution is cumbersome, it would force a network quiescence followed by
231+
* a synchronization of all processes in the communicator, and then all peers will
232+
* start sending messages starting with sequence number 0.
233+
* A lot of code for minimal benefit, especially taking in account that the MPI standard
234+
* does not define this. Instead, refuse to disable allow overtake, and at least the
235+
* user has the opportunity to check if we accepted to change it.
236+
*/
237+
return "true";
238+
}
239+
return "false";
240+
}
241+
205242
int mca_pml_ob1_add_comm(ompi_communicator_t* comm)
206243
{
207244
/* allocate pml specific comm data */
@@ -221,11 +258,14 @@ int mca_pml_ob1_add_comm(ompi_communicator_t* comm)
221258
}
222259

223260
ompi_comm_assert_subscribe (comm, OMPI_COMM_ASSERT_NO_ANY_SOURCE);
224-
ompi_comm_assert_subscribe (comm, OMPI_COMM_ASSERT_ALLOW_OVERTAKE);
225261

226262
mca_pml_ob1_comm_init_size(pml_comm, comm->c_remote_group->grp_proc_count);
227263
comm->c_pml_comm = pml_comm;
228264

265+
/* Register the subscriber alert for the mpi_assert_allow_overtaking info. */
266+
opal_infosubscribe_subscribe (&comm->super, "mpi_assert_allow_overtaking",
267+
"false", mca_pml_ob1_set_allow_overtake);
268+
229269
/* Grab all related messages from the non_existing_communicator pending queue */
230270
OPAL_LIST_FOREACH_SAFE(frag, next_frag, &mca_pml_ob1.non_existing_communicator_pending, mca_pml_ob1_recv_frag_t) {
231271
hdr = &frag->hdr.hdr_match;

ompi/mca/pml/ob1/pml_ob1_recvfrag.c

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -637,6 +637,38 @@ void mca_pml_ob1_recv_frag_callback_match (mca_btl_base_module_t *btl,
637637
}
638638
}
639639

640+
/**
641+
* Merge all out of sequence fragments into the matching queue, as if they were received now.
642+
*/
643+
int mca_pml_ob1_merge_cant_match( ompi_communicator_t * ompi_comm )
644+
{
645+
mca_pml_ob1_comm_t * pml_comm = (mca_pml_ob1_comm_t *)ompi_comm->c_pml_comm;
646+
mca_pml_ob1_recv_frag_t *frag, *frags_cant_match;
647+
mca_pml_ob1_comm_proc_t* proc;
648+
int cnt = 0;
649+
650+
for (uint32_t i = 0; i < pml_comm->num_procs; i++) {
651+
if ((NULL == (proc = pml_comm->procs[i])) || (NULL != proc->frags_cant_match)) {
652+
continue;
653+
}
654+
655+
OB1_MATCHING_LOCK(&pml_comm->matching_lock);
656+
/* Acquire all cant_match frags from the peer */
657+
frags_cant_match = proc->frags_cant_match;
658+
proc->frags_cant_match = NULL;
659+
while(NULL != (frag = remove_head_from_ordered_list(&frags_cant_match))) {
660+
/* mca_pml_ob1_recv_frag_match_proc() will release the lock. */
661+
mca_pml_ob1_recv_frag_match_proc(frag->btl, ompi_comm, proc,
662+
&frag->hdr.hdr_match,
663+
frag->segments, frag->num_segments,
664+
frag->hdr.hdr_match.hdr_common.hdr_type, frag);
665+
OB1_MATCHING_LOCK(&pml_comm->matching_lock);
666+
cnt++;
667+
}
668+
}
669+
OB1_MATCHING_UNLOCK(&pml_comm->matching_lock);
670+
return cnt;
671+
}
640672

641673
void mca_pml_ob1_recv_frag_callback_rndv (mca_btl_base_module_t *btl,
642674
const mca_btl_base_receive_descriptor_t *descriptor)

ompi/mca/pml/ob1/pml_ob1_recvfrag.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,12 @@ extern void mca_pml_ob1_recv_frag_callback_cid( mca_btl_base_module_t *btl,
172172
extern mca_pml_ob1_recv_frag_t*
173173
check_cantmatch_for_match(mca_pml_ob1_comm_proc_t *proc);
174174

175+
/**
176+
* Move for all peers all pending cant_match fragments into the matching queues. This
177+
* function is necessary when allow_overtake info key is transition to set.
178+
*/
179+
int mca_pml_ob1_merge_cant_match( ompi_communicator_t * ompi_comm );
180+
175181
void append_frag_to_ordered_list(mca_pml_ob1_recv_frag_t** queue,
176182
mca_pml_ob1_recv_frag_t* frag,
177183
uint16_t seq);

0 commit comments

Comments
 (0)