Skip to content

Commit ccb59eb

Browse files
authored
Merge pull request #6439 from rhc54/topic/xcnt
Fix cross-mpirun connect/accept operations
2 parents 29fa66c + 60961ce commit ccb59eb

File tree

4 files changed

+151
-10
lines changed

4 files changed

+151
-10
lines changed

orte/mca/state/base/state_base_fns.c

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -922,8 +922,9 @@ void orte_state_base_check_all_complete(int fd, short args, void *cbdata)
922922
one_still_alive = false;
923923
j = opal_hash_table_get_first_key_uint32(orte_job_data, &u32, (void **)&job, &nptr);
924924
while (OPAL_SUCCESS == j) {
925-
/* skip the daemon job */
926-
if (job->jobid == ORTE_PROC_MY_NAME->jobid) {
925+
/* skip the daemon job and all jobs from other families */
926+
if (job->jobid == ORTE_PROC_MY_NAME->jobid ||
927+
ORTE_JOB_FAMILY(job->jobid) != ORTE_JOB_FAMILY(ORTE_PROC_MY_NAME->jobid)) {
927928
goto next;
928929
}
929930
/* if this is the job we are checking AND it normally terminated,

orte/orted/pmix/pmix_server_dyn.c

Lines changed: 106 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242

4343
#include "orte/mca/errmgr/errmgr.h"
4444
#include "orte/mca/rmaps/base/base.h"
45+
#include "orte/mca/rml/base/rml_contact.h"
4546
#include "orte/mca/state/state.h"
4647
#include "orte/util/name_fns.h"
4748
#include "orte/util/show_help.h"
@@ -539,7 +540,14 @@ static void _cnlk(int status, opal_list_t *data, void *cbdata)
539540
int rc, cnt;
540541
opal_pmix_pdata_t *pdat;
541542
orte_job_t *jdata;
542-
opal_buffer_t buf;
543+
orte_node_t *node;
544+
orte_proc_t *proc;
545+
opal_buffer_t buf, bucket;
546+
opal_byte_object_t *bo;
547+
orte_process_name_t dmn, pname;
548+
char *uri;
549+
opal_value_t val;
550+
opal_list_t nodes;
543551

544552
ORTE_ACQUIRE_OBJECT(cd);
545553

@@ -556,6 +564,7 @@ static void _cnlk(int status, opal_list_t *data, void *cbdata)
556564
pdat = (opal_pmix_pdata_t*)opal_list_get_first(data);
557565
if (OPAL_BYTE_OBJECT != pdat->value.type) {
558566
rc = ORTE_ERR_BAD_PARAM;
567+
ORTE_ERROR_LOG(rc);
559568
goto release;
560569
}
561570
/* the data will consist of a packed buffer with the job data in it */
@@ -565,15 +574,107 @@ static void _cnlk(int status, opal_list_t *data, void *cbdata)
565574
pdat->value.data.bo.size = 0;
566575
cnt = 1;
567576
if (OPAL_SUCCESS != (rc = opal_dss.unpack(&buf, &jdata, &cnt, ORTE_JOB))) {
577+
ORTE_ERROR_LOG(rc);
578+
OBJ_DESTRUCT(&buf);
579+
goto release;
580+
}
581+
582+
/* unpack the byte object containing the daemon uri's */
583+
cnt=1;
584+
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&buf, &bo, &cnt, OPAL_BYTE_OBJECT))) {
585+
ORTE_ERROR_LOG(rc);
568586
OBJ_DESTRUCT(&buf);
569587
goto release;
570588
}
589+
/* load it into a buffer */
590+
OBJ_CONSTRUCT(&bucket, opal_buffer_t);
591+
opal_dss.load(&bucket, bo->bytes, bo->size);
592+
bo->bytes = NULL;
593+
free(bo);
594+
/* prep a list to save the nodes */
595+
OBJ_CONSTRUCT(&nodes, opal_list_t);
596+
/* unpack and store the URI's */
597+
cnt = 1;
598+
while (OPAL_SUCCESS == (rc = opal_dss.unpack(&bucket, &uri, &cnt, OPAL_STRING))) {
599+
rc = orte_rml_base_parse_uris(uri, &dmn, NULL);
600+
if (ORTE_SUCCESS != rc) {
601+
OBJ_DESTRUCT(&buf);
602+
OBJ_DESTRUCT(&bucket);
603+
goto release;
604+
}
605+
/* save a node object for this daemon */
606+
node = OBJ_NEW(orte_node_t);
607+
node->daemon = OBJ_NEW(orte_proc_t);
608+
memcpy(&node->daemon->name, &dmn, sizeof(orte_process_name_t));
609+
opal_list_append(&nodes, &node->super);
610+
/* register the URI */
611+
OBJ_CONSTRUCT(&val, opal_value_t);
612+
val.key = OPAL_PMIX_PROC_URI;
613+
val.type = OPAL_STRING;
614+
val.data.string = uri;
615+
if (OPAL_SUCCESS != (rc = opal_pmix.store_local(&dmn, &val))) {
616+
ORTE_ERROR_LOG(rc);
617+
val.key = NULL;
618+
val.data.string = NULL;
619+
OBJ_DESTRUCT(&val);
620+
OBJ_DESTRUCT(&buf);
621+
OBJ_DESTRUCT(&bucket);
622+
goto release;
623+
}
624+
val.key = NULL;
625+
val.data.string = NULL;
626+
OBJ_DESTRUCT(&val);
627+
cnt = 1;
628+
}
629+
OBJ_DESTRUCT(&bucket);
630+
631+
/* unpack the proc-to-daemon map */
632+
cnt=1;
633+
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&buf, &bo, &cnt, OPAL_BYTE_OBJECT))) {
634+
ORTE_ERROR_LOG(rc);
635+
OBJ_DESTRUCT(&buf);
636+
goto release;
637+
}
638+
/* load it into a buffer */
639+
OBJ_CONSTRUCT(&bucket, opal_buffer_t);
640+
opal_dss.load(&bucket, bo->bytes, bo->size);
641+
bo->bytes = NULL;
642+
free(bo);
643+
/* unpack and store the map */
644+
cnt = 1;
645+
while (OPAL_SUCCESS == (rc = opal_dss.unpack(&bucket, &pname, &cnt, ORTE_NAME))) {
646+
/* get the name of the daemon hosting it */
647+
if (OPAL_SUCCESS != (rc = opal_dss.unpack(&bucket, &dmn, &cnt, ORTE_NAME))) {
648+
OBJ_DESTRUCT(&buf);
649+
OBJ_DESTRUCT(&bucket);
650+
goto release;
651+
}
652+
/* create the proc object */
653+
proc = OBJ_NEW(orte_proc_t);
654+
memcpy(&proc->name, &pname, sizeof(orte_process_name_t));
655+
opal_pointer_array_set_item(jdata->procs, pname.vpid, proc);
656+
/* find the daemon */
657+
OPAL_LIST_FOREACH(node, &nodes, orte_node_t) {
658+
if (node->daemon->name.vpid == dmn.vpid) {
659+
OBJ_RETAIN(node);
660+
proc->node = node;
661+
break;
662+
}
663+
}
664+
}
665+
OBJ_DESTRUCT(&bucket);
666+
OPAL_LIST_DESTRUCT(&nodes);
571667
OBJ_DESTRUCT(&buf);
668+
669+
/* register the nspace */
572670
if (ORTE_SUCCESS != (rc = orte_pmix_server_register_nspace(jdata, true))) {
671+
ORTE_ERROR_LOG(rc);
573672
OBJ_RELEASE(jdata);
574673
goto release;
575674
}
576-
OBJ_RELEASE(jdata); // no reason to keep this around
675+
676+
/* save the job object so we don't endlessly cycle */
677+
opal_hash_table_set_value_uint32(orte_job_data, jdata->jobid, jdata);
577678

578679
/* restart the cnct processor */
579680
ORTE_PMIX_OPERATION(cd->procs, cd->info, _cnct, cd->cbfunc, cd->cbdata);
@@ -619,6 +720,7 @@ static void _cnct(int sd, short args, void *cbdata)
619720
* out about it, and all we can do is return an error */
620721
if (orte_pmix_server_globals.server.jobid == ORTE_PROC_MY_HNP->jobid &&
621722
orte_pmix_server_globals.server.vpid == ORTE_PROC_MY_HNP->vpid) {
723+
ORTE_ERROR_LOG(ORTE_ERR_NOT_SUPPORTED);
622724
rc = ORTE_ERR_NOT_SUPPORTED;
623725
goto release;
624726
}
@@ -634,6 +736,7 @@ static void _cnct(int sd, short args, void *cbdata)
634736
kv->data.uint32 = geteuid();
635737
opal_list_append(cd->info, &kv->super);
636738
if (ORTE_SUCCESS != (rc = pmix_server_lookup_fn(&nm->name, keys, cd->info, _cnlk, cd))) {
739+
ORTE_ERROR_LOG(rc);
637740
opal_argv_free(keys);
638741
goto release;
639742
}
@@ -647,6 +750,7 @@ static void _cnct(int sd, short args, void *cbdata)
647750
if (!orte_get_attribute(&jdata->attributes, ORTE_JOB_NSPACE_REGISTERED, NULL, OPAL_BOOL)) {
648751
/* it hasn't been registered yet, so register it now */
649752
if (ORTE_SUCCESS != (rc = orte_pmix_server_register_nspace(jdata, true))) {
753+
ORTE_ERROR_LOG(rc);
650754
goto release;
651755
}
652756
}

orte/orted/pmix/pmix_server_fence.c

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,7 @@ static void dmodex_req(int sd, short args, void *cbdata)
227227
rc = ORTE_ERR_NOT_FOUND;
228228
goto callback;
229229
}
230+
230231
/* point the request to the daemon that is hosting the
231232
* target process */
232233
req->proxy.vpid = dmn->name.vpid;
@@ -240,7 +241,8 @@ static void dmodex_req(int sd, short args, void *cbdata)
240241

241242
/* if we are the host daemon, then this is a local request, so
242243
* just wait for the data to come in */
243-
if (ORTE_PROC_MY_NAME->vpid == dmn->name.vpid) {
244+
if (ORTE_PROC_MY_NAME->jobid == dmn->name.jobid &&
245+
ORTE_PROC_MY_NAME->vpid == dmn->name.vpid) {
244246
return;
245247
}
246248

orte/orted/pmix/pmix_server_register_fns.c

Lines changed: 39 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
* All rights reserved.
1414
* Copyright (c) 2009-2018 Cisco Systems, Inc. All rights reserved
1515
* Copyright (c) 2011 Oak Ridge National Labs. All rights reserved.
16-
* Copyright (c) 2013-2018 Intel, Inc. All rights reserved.
16+
* Copyright (c) 2013-2019 Intel, Inc. All rights reserved.
1717
* Copyright (c) 2014 Mellanox Technologies, Inc.
1818
* All rights reserved.
1919
* Copyright (c) 2014-2016 Research Organization for Information Science
@@ -71,6 +71,9 @@ int orte_pmix_server_register_nspace(orte_job_t *jdata, bool force)
7171
gid_t gid;
7272
opal_list_t *cache;
7373
hwloc_obj_t machine;
74+
opal_buffer_t buf, bucket;
75+
opal_byte_object_t bo, *boptr;
76+
orte_proc_t *proc;
7477

7578
opal_output_verbose(2, orte_pmix_server_globals.output,
7679
"%s register nspace for %s",
@@ -494,21 +497,52 @@ int orte_pmix_server_register_nspace(orte_job_t *jdata, bool force)
494497
jdata->num_local_procs,
495498
info, NULL, NULL);
496499
OPAL_LIST_RELEASE(info);
500+
if (OPAL_SUCCESS != rc) {
501+
return rc;
502+
}
497503

498-
/* if the user has connected us to an external server, then we must
499-
* assume there is going to be some cross-mpirun exchange, and so
504+
/* if I am the HNP and this job is a member of my family, then we must
505+
* assume there could be some cross-mpirun exchange, and so
500506
* we protect against that situation by publishing the job info
501507
* for this job - this allows any subsequent "connect" to retrieve
502508
* the job info */
503-
if (NULL != orte_data_server_uri) {
504-
opal_buffer_t buf;
505509

510+
if (ORTE_PROC_IS_HNP && ORTE_JOB_FAMILY(ORTE_PROC_MY_NAME->jobid) == ORTE_JOB_FAMILY(jdata->jobid)) {
511+
/* pack the job - note that this doesn't include the procs
512+
* or their locations */
506513
OBJ_CONSTRUCT(&buf, opal_buffer_t);
507514
if (OPAL_SUCCESS != (rc = opal_dss.pack(&buf, &jdata, 1, ORTE_JOB))) {
508515
ORTE_ERROR_LOG(rc);
509516
OBJ_DESTRUCT(&buf);
510517
return rc;
511518
}
519+
520+
/* pack the hostname, daemon vpid and contact URI for each involved node */
521+
map = jdata->map;
522+
OBJ_CONSTRUCT(&bucket, opal_buffer_t);
523+
for (i=0; i < map->nodes->size; i++) {
524+
if (NULL == (node = (orte_node_t*)opal_pointer_array_get_item(map->nodes, i))) {
525+
continue;
526+
}
527+
opal_dss.pack(&bucket, &node->daemon->rml_uri, 1, OPAL_STRING);
528+
}
529+
opal_dss.unload(&bucket, (void**)&bo.bytes, &bo.size);
530+
boptr = &bo;
531+
opal_dss.pack(&buf, &boptr, 1, OPAL_BYTE_OBJECT);
532+
533+
/* pack the proc name and daemon vpid for each proc */
534+
OBJ_CONSTRUCT(&bucket, opal_buffer_t);
535+
for (i=0; i < jdata->procs->size; i++) {
536+
if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, i))) {
537+
continue;
538+
}
539+
opal_dss.pack(&bucket, &proc->name, 1, ORTE_NAME);
540+
opal_dss.pack(&bucket, &proc->node->daemon->name, 1, ORTE_NAME);
541+
}
542+
opal_dss.unload(&bucket, (void**)&bo.bytes, &bo.size);
543+
boptr = &bo;
544+
opal_dss.pack(&buf, &boptr, 1, OPAL_BYTE_OBJECT);
545+
512546
info = OBJ_NEW(opal_list_t);
513547
/* create a key-value with the key being the string jobid
514548
* and the value being the byte object */

0 commit comments

Comments
 (0)