Skip to content

Commit 8174286

Browse files
committed
Sync nidmap to PRRTE to fix hetero topo problem
Signed-off-by: Ralph Castain <rhc@pmix.org>
1 parent dfbc144 commit 8174286

File tree

3 files changed

+150
-58
lines changed

3 files changed

+150
-58
lines changed

orte/mca/plm/base/plm_base_launch_support.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1325,7 +1325,7 @@ void orte_plm_base_daemon_callback(int status, orte_process_name_t* sender,
13251325
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
13261326
t = OBJ_NEW(orte_topology_t);
13271327
t->sig = sig;
1328-
opal_pointer_array_add(orte_node_topologies, t);
1328+
t->index = opal_pointer_array_add(orte_node_topologies, t);
13291329
daemon->node->topology = t;
13301330
if (NULL != topo) {
13311331
t->topo = topo;

orte/runtime/orte_globals.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,7 @@ struct orte_job_map_t;
216216
/* define an object for storing node topologies */
217217
typedef struct {
218218
opal_object_t super;
219+
int index;
219220
hwloc_topology_t topo;
220221
char *sig;
221222
} orte_topology_t;

orte/util/nidmap.c

Lines changed: 148 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -349,6 +349,7 @@ int orte_util_decode_nidmap(opal_buffer_t *buf)
349349
}
350350
} else {
351351
vp8 = (uint8_t*)boptr->bytes;
352+
sz = boptr->size;
352353
boptr->bytes = NULL;
353354
boptr->size = 0;
354355
}
@@ -444,27 +445,17 @@ int orte_util_decode_nidmap(opal_buffer_t *buf)
444445
return rc;
445446
}
446447

447-
typedef struct {
448-
opal_list_item_t super;
449-
orte_topology_t *t;
450-
} orte_tptr_trk_t;
451-
static OBJ_CLASS_INSTANCE(orte_tptr_trk_t,
452-
opal_list_item_t,
453-
NULL, NULL);
454-
455448
int orte_util_pass_node_info(opal_buffer_t *buffer)
456449
{
457450
uint16_t *slots=NULL, slot = UINT16_MAX;
458-
uint8_t *flags=NULL, flag = UINT8_MAX, *topologies = NULL;
451+
uint8_t *flags=NULL, flag = UINT8_MAX;
459452
int8_t i8, ntopos;
460453
int rc, n, nbitmap, nstart;
461454
bool compressed, unislots = true, uniflags = true, unitopos = true;
462455
orte_node_t *nptr;
463456
opal_byte_object_t bo, *boptr;
464457
size_t sz, nslots;
465458
opal_buffer_t bucket;
466-
orte_tptr_trk_t *trk;
467-
opal_list_t topos;
468459
orte_topology_t *t;
469460

470461
/* make room for the number of slots on each node */
@@ -493,15 +484,18 @@ int orte_util_pass_node_info(opal_buffer_t *buffer)
493484
} else {
494485
nstart = 0;
495486
}
496-
OBJ_CONSTRUCT(&topos, opal_list_t);
497487
OBJ_CONSTRUCT(&bucket, opal_buffer_t);
488+
ntopos = 0;
498489
for (n=nstart; n < orte_node_topologies->size; n++) {
499490
if (NULL == (t = (orte_topology_t*)opal_pointer_array_get_item(orte_node_topologies, n))) {
500491
continue;
501492
}
502-
trk = OBJ_NEW(orte_tptr_trk_t);
503-
trk->t = t;
504-
opal_list_append(&topos, &trk->super);
493+
/* pack the index */
494+
if (ORTE_SUCCESS != (rc = opal_dss.pack(&bucket, &t->index, 1, OPAL_INT))) {
495+
ORTE_ERROR_LOG(rc);
496+
OBJ_DESTRUCT(&bucket);
497+
goto cleanup;
498+
}
505499
/* pack this topology string */
506500
if (ORTE_SUCCESS != (rc = opal_dss.pack(&bucket, &t->sig, 1, OPAL_STRING))) {
507501
ORTE_ERROR_LOG(rc);
@@ -514,36 +508,65 @@ int orte_util_pass_node_info(opal_buffer_t *buffer)
514508
OBJ_DESTRUCT(&bucket);
515509
goto cleanup;
516510
}
511+
++ntopos;
517512
}
518513
/* pack the number of topologies in allocation */
519-
ntopos = opal_list_get_size(&topos);
520514
if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &ntopos, 1, OPAL_INT8))) {
521515
goto cleanup;
522516
}
523517
if (1 < ntopos) {
524518
/* need to send them along */
525-
opal_dss.copy_payload(buffer, &bucket);
526-
/* allocate space to report them */
527-
ntopos = orte_node_pool->size;
528-
topologies = (uint8_t*)malloc(ntopos);
519+
if (opal_compress.compress_block((uint8_t*)bucket.base_ptr, bucket.bytes_used,
520+
&bo.bytes, &sz)) {
521+
/* the data was compressed - mark that we compressed it */
522+
compressed = true;
523+
if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &compressed, 1, OPAL_BOOL))) {
524+
ORTE_ERROR_LOG(rc);
525+
OBJ_DESTRUCT(&bucket);
526+
goto cleanup;
527+
}
528+
/* pack the uncompressed length */
529+
if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &bucket.bytes_used, 1, OPAL_SIZE))) {
530+
ORTE_ERROR_LOG(rc);
531+
OBJ_DESTRUCT(&bucket);
532+
goto cleanup;
533+
}
534+
bo.size = sz;
535+
} else {
536+
/* mark that it was not compressed */
537+
compressed = false;
538+
if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &compressed, 1, OPAL_BOOL))) {
539+
ORTE_ERROR_LOG(rc);
540+
OBJ_DESTRUCT(&bucket);
541+
goto cleanup;
542+
}
543+
opal_dss.unload(&bucket, (void**)&bo.bytes, &bo.size);
544+
}
529545
unitopos = false;
546+
/* pack the info */
547+
boptr = &bo;
548+
if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &boptr, 1, OPAL_BYTE_OBJECT))) {
549+
ORTE_ERROR_LOG(rc);
550+
OBJ_DESTRUCT(&bucket);
551+
goto cleanup;
552+
}
553+
OBJ_DESTRUCT(&bucket);
554+
free(bo.bytes);
530555
}
531-
OBJ_DESTRUCT(&bucket);
532556

557+
/* construct the per-node info */
558+
OBJ_CONSTRUCT(&bucket, opal_buffer_t);
533559
for (n=0; n < orte_node_pool->size; n++) {
534560
if (NULL == (nptr = (orte_node_t*)opal_pointer_array_get_item(orte_node_pool, n))) {
535561
continue;
536562
}
537-
/* store the topology, if required */
563+
/* track the topology, if required */
538564
if (!unitopos) {
539-
topologies[n] = 0;
540-
if (0 == nstart || 0 < n) {
541-
OPAL_LIST_FOREACH(trk, &topos, orte_tptr_trk_t) {
542-
if (trk->t == nptr->topology) {
543-
break;
544-
}
545-
topologies[n]++;
546-
}
565+
i8 = nptr->topology->index;
566+
if (ORTE_SUCCESS != (rc = opal_dss.pack(&bucket, &i8, 1, OPAL_INT8))) {
567+
ORTE_ERROR_LOG(rc);
568+
OBJ_DESTRUCT(&bucket);
569+
goto cleanup;
547570
}
548571
}
549572
/* store the number of slots */
@@ -572,21 +595,19 @@ int orte_util_pass_node_info(opal_buffer_t *buffer)
572595

573596
/* deal with the topology assignments */
574597
if (!unitopos) {
575-
if (opal_compress.compress_block((uint8_t*)topologies, ntopos,
598+
if (opal_compress.compress_block((uint8_t*)bucket.base_ptr, bucket.bytes_used,
576599
(uint8_t**)&bo.bytes, &sz)) {
577600
/* mark that this was compressed */
578-
i8 = 1;
579601
compressed = true;
580602
bo.size = sz;
581603
} else {
582604
/* mark that this was not compressed */
583-
i8 = 0;
584605
compressed = false;
585-
bo.bytes = topologies;
586-
bo.size = nbitmap;
606+
bo.bytes = bucket.base_ptr;
607+
bo.size = bucket.bytes_used;
587608
}
588609
/* indicate compression */
589-
if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &i8, 1, OPAL_INT8))) {
610+
if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &compressed, 1, OPAL_BOOL))) {
590611
if (compressed) {
591612
free(bo.bytes);
592613
}
@@ -607,6 +628,7 @@ int orte_util_pass_node_info(opal_buffer_t *buffer)
607628
free(bo.bytes);
608629
}
609630
}
631+
OBJ_DESTRUCT(&bucket);
610632

611633
/* if we have uniform #slots, then just flag it - no
612634
* need to pass anything */
@@ -713,16 +735,19 @@ int orte_util_pass_node_info(opal_buffer_t *buffer)
713735
int orte_util_parse_node_info(opal_buffer_t *buf)
714736
{
715737
int8_t i8;
716-
int rc = ORTE_SUCCESS, cnt, n, m;
738+
bool compressed;
739+
int rc = ORTE_SUCCESS, cnt, n, m, index;
717740
orte_node_t *nptr;
718741
size_t sz;
719742
opal_byte_object_t *boptr;
720743
uint16_t *slots = NULL;
721744
uint8_t *flags = NULL;
722745
uint8_t *topologies = NULL;
723-
orte_topology_t *t2, **tps = NULL;
746+
uint8_t *bytes = NULL;
747+
orte_topology_t *t2;
724748
hwloc_topology_t topo;
725749
char *sig;
750+
opal_buffer_t bucket;
726751

727752
/* check to see if we have uniform topologies */
728753
cnt = 1;
@@ -733,43 +758,104 @@ int orte_util_parse_node_info(opal_buffer_t *buf)
733758
/* we already defaulted to uniform topology, so only need to
734759
* process this if it is non-uniform */
735760
if (1 < i8) {
736-
/* create an array to cache these */
737-
tps = (orte_topology_t**)malloc(sizeof(orte_topology_t*));
761+
/* unpack the compression flag */
762+
cnt = 1;
763+
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buf, &compressed, &cnt, OPAL_BOOL))) {
764+
ORTE_ERROR_LOG(rc);
765+
goto cleanup;
766+
}
767+
if (compressed) {
768+
/* get the uncompressed size */
769+
cnt = 1;
770+
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buf, &sz, &cnt, OPAL_SIZE))) {
771+
ORTE_ERROR_LOG(rc);
772+
goto cleanup;
773+
}
774+
}
775+
/* unpack the topology object */
776+
cnt = 1;
777+
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buf, &boptr, &cnt, OPAL_BYTE_OBJECT))) {
778+
ORTE_ERROR_LOG(rc);
779+
goto cleanup;
780+
}
781+
782+
/* if compressed, decompress */
783+
if (compressed) {
784+
if (!opal_compress.decompress_block((uint8_t**)&bytes, sz,
785+
boptr->bytes, boptr->size)) {
786+
ORTE_ERROR_LOG(ORTE_ERROR);
787+
if (NULL != boptr->bytes) {
788+
free(boptr->bytes);
789+
}
790+
free(boptr);
791+
rc = ORTE_ERROR;
792+
goto cleanup;
793+
}
794+
} else {
795+
bytes = (uint8_t*)boptr->bytes;
796+
sz = boptr->size;
797+
boptr->bytes = NULL;
798+
boptr->size = 0;
799+
}
800+
if (NULL != boptr->bytes) {
801+
free(boptr->bytes);
802+
}
803+
/* setup to unpack */
804+
OBJ_CONSTRUCT(&bucket, opal_buffer_t);
805+
opal_dss.load(&bucket, bytes, sz);
806+
738807
for (n=0; n < i8; n++) {
808+
/* unpack the index */
739809
cnt = 1;
740-
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buf, &sig, &cnt, OPAL_STRING))) {
810+
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&bucket, &index, &cnt, OPAL_INT))) {
741811
ORTE_ERROR_LOG(rc);
742812
goto cleanup;
743813
}
814+
/* unpack the signature */
744815
cnt = 1;
745-
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buf, &topo, &cnt, OPAL_HWLOC_TOPO))) {
816+
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&bucket, &sig, &cnt, OPAL_STRING))) {
746817
ORTE_ERROR_LOG(rc);
747818
goto cleanup;
748819
}
749-
/* new topology - record it */
820+
/* unpack the topology */
821+
cnt = 1;
822+
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&bucket, &topo, &cnt, OPAL_HWLOC_TOPO))) {
823+
ORTE_ERROR_LOG(rc);
824+
goto cleanup;
825+
}
826+
/* record it */
750827
t2 = OBJ_NEW(orte_topology_t);
828+
t2->index = index;
751829
t2->sig = sig;
752830
t2->topo = topo;
753-
opal_pointer_array_add(orte_node_topologies, t2);
754-
/* keep a cached copy */
755-
tps[n] = t2;
831+
opal_pointer_array_set_item(orte_node_topologies, index, t2);
756832
}
833+
OBJ_DESTRUCT(&bucket);
834+
757835
/* now get the array of assigned topologies */
758-
/* if compressed, get the uncompressed size */
836+
/* unpack the compression flag */
759837
cnt = 1;
760-
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buf, &sz, &cnt, OPAL_SIZE))) {
838+
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buf, &compressed, &cnt, OPAL_BOOL))) {
761839
ORTE_ERROR_LOG(rc);
762840
goto cleanup;
763841
}
842+
if (compressed) {
843+
/* get the uncompressed size */
844+
cnt = 1;
845+
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buf, &sz, &cnt, OPAL_SIZE))) {
846+
ORTE_ERROR_LOG(rc);
847+
goto cleanup;
848+
}
849+
}
764850
/* unpack the topologies object */
765851
cnt = 1;
766852
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buf, &boptr, &cnt, OPAL_BYTE_OBJECT))) {
767853
ORTE_ERROR_LOG(rc);
768854
goto cleanup;
769855
}
770856
/* if compressed, decompress */
771-
if (1 == i8) {
772-
if (!opal_compress.decompress_block((uint8_t**)&topologies, sz,
857+
if (compressed) {
858+
if (!opal_compress.decompress_block((uint8_t**)&bytes, sz,
773859
boptr->bytes, boptr->size)) {
774860
ORTE_ERROR_LOG(ORTE_ERROR);
775861
if (NULL != boptr->bytes) {
@@ -780,19 +866,27 @@ int orte_util_parse_node_info(opal_buffer_t *buf)
780866
goto cleanup;
781867
}
782868
} else {
783-
topologies = (uint8_t*)boptr->bytes;
869+
bytes = (uint8_t*)boptr->bytes;
870+
sz = boptr->size;
784871
boptr->bytes = NULL;
785872
boptr->size = 0;
786873
}
787874
if (NULL != boptr->bytes) {
788875
free(boptr->bytes);
789876
}
790877
free(boptr);
878+
OBJ_CONSTRUCT(&bucket, opal_buffer_t);
879+
opal_dss.load(&bucket, bytes, sz);
791880
/* cycle across the node pool and assign the values */
792-
for (n=0, m=0; n < orte_node_pool->size; n++) {
881+
for (n=0; n < orte_node_pool->size; n++) {
793882
if (NULL != (nptr = (orte_node_t*)opal_pointer_array_get_item(orte_node_pool, n))) {
794-
nptr->topology = tps[topologies[m]];
795-
++m;
883+
/* unpack the next topology index */
884+
cnt = 1;
885+
if (OPAL_SUCCESS != (rc = opal_dss.unpack(&bucket, &i8, &cnt, OPAL_INT8))) {
886+
ORTE_ERROR_LOG(rc);
887+
goto cleanup;
888+
}
889+
nptr->topology = opal_pointer_array_get_item(orte_node_topologies, index);
796890
}
797891
}
798892
}
@@ -932,9 +1026,6 @@ int orte_util_parse_node_info(opal_buffer_t *buf)
9321026
if (NULL != flags) {
9331027
free(flags);
9341028
}
935-
if (NULL != tps) {
936-
free(tps);
937-
}
9381029
if (NULL != topologies) {
9391030
free(topologies);
9401031
}

0 commit comments

Comments
 (0)