 #include "ompi/communicator/communicator.h"
 #include "ompi/datatype/ompi_datatype.h"
 #include "ompi/op/op.h"
+#include "opal/util/bit_ops.h"

 #include <assert.h>

@@ -33,6 +34,10 @@ static inline int allred_sched_ring(int rank, int p, int count, MPI_Datatype dat
 static inline int allred_sched_linear(int rank, int p, const void *sendbuf, void *recvbuf, int count,
                                       MPI_Datatype datatype, ptrdiff_t gap, MPI_Op op, int ext, int size,
                                       NBC_Schedule *schedule, void *tmpbuf);
+static inline int allred_sched_redscat_allgather(
+    int rank, int comm_size, int count, MPI_Datatype datatype, ptrdiff_t gap,
+    const void *sbuf, void *rbuf, MPI_Op op, char inplace,
+    NBC_Schedule *schedule, void *tmpbuf, struct ompi_communicator_t *comm);

 #ifdef NBC_CACHE_SCHEDULE
 /* tree comparison function for schedule cache */
@@ -64,7 +69,7 @@ static int nbc_allreduce_init(const void* sendbuf, void* recvbuf, int count, MPI
 #ifdef NBC_CACHE_SCHEDULE
   NBC_Allreduce_args *args, *found, search;
 #endif
-  enum { NBC_ARED_BINOMIAL, NBC_ARED_RING } alg;
+  enum { NBC_ARED_BINOMIAL, NBC_ARED_RING, NBC_ARED_REDSCAT_ALLGATHER } alg;
   char inplace;
   void *tmpbuf = NULL;
   ompi_coll_libnbc_module_t *libnbc_module = (ompi_coll_libnbc_module_t *) module;
@@ -105,12 +110,25 @@ static int nbc_allreduce_init(const void* sendbuf, void* recvbuf, int count, MPI
   }

   /* algorithm selection */
-  if (p < 4 || size * count < 65536 || !ompi_op_is_commute(op) || inplace) {
-    alg = NBC_ARED_BINOMIAL;
+  int nprocs_pof2 = opal_next_poweroftwo(p) >> 1;
+  if (libnbc_iallreduce_algorithm == 0) {
+    if (p < 4 || size * count < 65536 || !ompi_op_is_commute(op) || inplace) {
+      alg = NBC_ARED_BINOMIAL;
+    } else if (count >= nprocs_pof2 && ompi_op_is_commute(op)) {
+      alg = NBC_ARED_REDSCAT_ALLGATHER;
+    } else {
+      alg = NBC_ARED_RING;
+    }
   } else {
-    alg = NBC_ARED_RING;
+    if (libnbc_iallreduce_algorithm == 1)
+      alg = NBC_ARED_RING;
+    else if (libnbc_iallreduce_algorithm == 2)
+      alg = NBC_ARED_BINOMIAL;
+    else if (libnbc_iallreduce_algorithm == 3 && count >= nprocs_pof2 && ompi_op_is_commute(op)) {
+      alg = NBC_ARED_REDSCAT_ALLGATHER;
+    } else
+      alg = NBC_ARED_RING;
   }
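+  /*
+   * For example (illustration only): with p = 12 processes,
+   * opal_next_poweroftwo(12) returns 16, so nprocs_pof2 = 8. The default
+   * heuristic (libnbc_iallreduce_algorithm == 0) then picks
+   * NBC_ARED_REDSCAT_ALLGATHER for commutative, non-in-place reductions
+   * with size * count >= 65536 and count >= 8.
+   */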
-
 #ifdef NBC_CACHE_SCHEDULE
   /* search schedule in communicator specific tree */
   search.sendbuf = sendbuf;
@@ -135,6 +153,9 @@ static int nbc_allreduce_init(const void* sendbuf, void* recvbuf, int count, MPI
     case NBC_ARED_BINOMIAL:
       res = allred_sched_diss(rank, p, count, datatype, gap, sendbuf, recvbuf, op, inplace, schedule, tmpbuf);
       break;
+    case NBC_ARED_REDSCAT_ALLGATHER:
+      res = allred_sched_redscat_allgather(rank, p, count, datatype, gap, sendbuf, recvbuf, op, inplace, schedule, tmpbuf, comm);
+      break;
     case NBC_ARED_RING:
       res = allred_sched_ring(rank, p, count, datatype, sendbuf, recvbuf, op, size, ext, schedule, tmpbuf);
       break;
@@ -735,6 +756,271 @@ static inline int allred_sched_linear(int rank, int rsize, const void *sendbuf,
   return OMPI_SUCCESS;
 }

+/*
+ * allred_sched_redscat_allgather:
+ *
+ * Description: an implementation of Rabenseifner's Allreduce algorithm [1, 2].
+ *   [1] Rajeev Thakur, Rolf Rabenseifner and William Gropp.
+ *       Optimization of Collective Communication Operations in MPICH //
+ *       The Int. Journal of High Performance Computing Applications. Vol 19,
+ *       Issue 1, pp. 49--66.
+ *   [2] http://www.hlrs.de/mpi/myreduce.html.
+ *
+ * This algorithm is a combination of a reduce-scatter implemented with
+ * recursive vector halving and recursive distance doubling, followed
+ * by an allgather implemented with recursive doubling.
+ *
+ * Step 1. If the number of processes is not a power of two, reduce it to
+ * the nearest lower power of two (p' = 2^{\floor{\log_2 p}})
+ * by removing r = p - p' extra processes as follows. In the first 2r processes
+ * (ranks 0 to 2r - 1), all the even ranks send the second half of the input
+ * vector to their right neighbor (rank + 1), and all the odd ranks send
+ * the first half of the input vector to their left neighbor (rank - 1).
+ * The even ranks compute the reduction on the first half of the vector and
+ * the odd ranks compute the reduction on the second half. The odd ranks then
+ * send the result to their left neighbors (the even ranks). As a result,
+ * the even ranks among the first 2r processes now contain the reduction with
+ * the input vector on their right neighbors (the odd ranks). These odd ranks
+ * do not participate in the rest of the algorithm, which leaves behind
+ * a power-of-two number of processes. The first r even-ranked processes and
+ * the last p - 2r processes are now renumbered from 0 to p' - 1.
+ *
+ * Step 2. The remaining processes now perform a reduce-scatter by using
+ * recursive vector halving and recursive distance doubling. The even-ranked
+ * processes send the second half of their buffer to rank + 1 and the odd-ranked
+ * processes send the first half of their buffer to rank - 1. All processes
+ * then compute the reduction between the local buffer and the received buffer.
+ * In the next log_2(p') - 1 steps, the buffers are recursively halved, and the
+ * distance is doubled. At the end, each of the p' processes has 1 / p' of the
+ * total reduction result.
+ *
+ * Step 3. An allgather is performed by using recursive vector doubling and
+ * distance halving. All exchanges are executed in reverse order relative
+ * to recursive doubling on previous step. If the number of processes is not
+ * a power of two, the total result vector must be sent to the r processes
+ * that were removed in the first step.
+ *
+ * Limitations:
+ *   count >= 2^{\floor{\log_2 p}}
+ *   commutative operations only
+ *   intra-communicators only
+ *
+ * Memory requirements (per process):
+ *   count * typesize + 4 * \log_2(p) * sizeof(int) = O(count)
+ *
+ * Schedule length (rounds): O(\log(p))
+ */
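+/*
+ * Example: with p = 6 processes, p' = 4 and r = 2. In step 1, ranks 0..3
+ * pair up; ranks 1 and 3 drop out after the pairwise reduction, and the
+ * surviving ranks {0, 2, 4, 5} are renumbered {0, 1, 2, 3}. Steps 2 and 3
+ * then take \log_2 4 = 2 rounds each among the remaining four processes,
+ * and step 4 sends the final vector back to ranks 1 and 3.
+ */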
+static inline int allred_sched_redscat_allgather(
+    int rank, int comm_size, int count, MPI_Datatype datatype, ptrdiff_t gap,
+    const void *sbuf, void *rbuf, MPI_Op op, char inplace,
+    NBC_Schedule *schedule, void *tmpbuf, struct ompi_communicator_t *comm)
+{
+    int res = OMPI_SUCCESS;
+    int *rindex = NULL, *rcount = NULL, *sindex = NULL, *scount = NULL;
+    /* Find nearest power-of-two less than or equal to comm_size */
+    int nsteps = opal_hibit(comm_size, comm->c_cube_dim + 1); /* ilog2(comm_size) */
+    int nprocs_pof2 = 1 << nsteps;                            /* flp2(comm_size) */
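+    /*
+     * For example, comm_size = 6 (binary 110) yields nsteps = 2 and
+     * nprocs_pof2 = 4, so nprocs_rem = 2 processes are folded away in
+     * step 1 below.
+     */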
+    if (!inplace) {
+        res = NBC_Sched_copy((char *)sbuf, false, count, datatype,
+                             rbuf, false, count, datatype, schedule, true);
+        if (OPAL_UNLIKELY(OMPI_SUCCESS != res)) { goto cleanup_and_return; }
+    }
+    char *tmp_buf = (char *)tmpbuf - gap;
+    ptrdiff_t lb, extent;
+    ompi_datatype_get_extent(datatype, &lb, &extent);
+    /*
+     * Step 1. Reduce the number of processes to the nearest lower power of two
+     * p' = 2^{\floor{\log_2 p}} by removing r = p - p' processes.
+     * 1. In the first 2r processes (ranks 0 to 2r - 1), all the even ranks send
+     *    the second half of the input vector to their right neighbor (rank + 1)
+     *    and all the odd ranks send the first half of the input vector to their
+     *    left neighbor (rank - 1).
+     * 2. All 2r processes compute the reduction on their half.
+     * 3. The odd ranks then send the result to their left neighbors
+     *    (the even ranks).
+     *
+     * The even ranks among the first 2r processes now contain the reduction
+     * with the input vector on their right neighbors (the odd ranks). The
+     * first r even processes and the last p - 2r processes are renumbered
+     * from 0 to 2^{\floor{\log_2 p}} - 1.
+     */
+    int vrank, step, wsize;
+    int nprocs_rem = comm_size - nprocs_pof2;
+    if (rank < 2 * nprocs_rem) {
+        int count_lhalf = count / 2;
+        int count_rhalf = count - count_lhalf;
+        if (rank % 2 != 0) {
+            /*
+             * Odd process -- exchange with rank - 1
+             * Send the left half of the input vector to the left neighbor,
+             * Recv the right half of the input vector from the left neighbor
+             */
+            res = NBC_Sched_send(rbuf, false, count_lhalf, datatype, rank - 1,
+                                 schedule, false);
+            if (OPAL_UNLIKELY(OMPI_SUCCESS != res)) { goto cleanup_and_return; }
+            res = NBC_Sched_recv(tmp_buf + (ptrdiff_t)count_lhalf * extent,
+                                 false, count_rhalf, datatype, rank - 1, schedule, true);
+            if (OPAL_UNLIKELY(OMPI_SUCCESS != res)) { goto cleanup_and_return; }
+            res = NBC_Sched_op(tmp_buf + (ptrdiff_t)count_lhalf * extent,
+                               false, (char *)rbuf + (ptrdiff_t)count_lhalf * extent,
+                               false, count_rhalf, datatype, op, schedule, true);
+            if (OPAL_UNLIKELY(OMPI_SUCCESS != res)) { goto cleanup_and_return; }
+            /* Send the right half to the left neighbor */
+            res = NBC_Sched_send((char *)rbuf + (ptrdiff_t)count_lhalf * extent,
+                                 false, count_rhalf, datatype, rank - 1, schedule, true);
+            if (OPAL_UNLIKELY(OMPI_SUCCESS != res)) { goto cleanup_and_return; }
+            /* This process does not participate in the recursive doubling phase */
+            vrank = -1;
+        } else {
+            /*
+             * Even process -- exchange with rank + 1
+             * Send the right half of the input vector to the right neighbor,
+             * Recv the left half of the input vector from the right neighbor
+             */
+            res = NBC_Sched_send((char *)rbuf + (ptrdiff_t)count_lhalf * extent,
+                                 false, count_rhalf, datatype, rank + 1, schedule, false);
+            if (OPAL_UNLIKELY(OMPI_SUCCESS != res)) { goto cleanup_and_return; }
+            res = NBC_Sched_recv(tmp_buf, false, count_lhalf, datatype, rank + 1,
+                                 schedule, true);
+            if (OPAL_UNLIKELY(OMPI_SUCCESS != res)) { goto cleanup_and_return; }
+            res = NBC_Sched_op(tmp_buf, false, rbuf, false, count_lhalf,
+                               datatype, op, schedule, true);
+            if (OPAL_UNLIKELY(OMPI_SUCCESS != res)) { goto cleanup_and_return; }
+            /* Recv the right half from the right neighbor */
+            res = NBC_Sched_recv((char *)rbuf + (ptrdiff_t)count_lhalf * extent,
+                                 false, count_rhalf, datatype, rank + 1, schedule, true);
+            if (OPAL_UNLIKELY(OMPI_SUCCESS != res)) { goto cleanup_and_return; }
+            vrank = rank / 2;
+        }
+    } else { /* rank >= 2 * nprocs_rem */
+        vrank = rank - nprocs_rem;
+    }
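+    /*
+     * Continuing the comm_size = 6 example: ranks 1 and 3 end up with
+     * vrank = -1, ranks 0 and 2 get vrank 0 and 1, and ranks 4 and 5 get
+     * vrank 2 and 3.
+     */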
+    /*
+     * Step 2. Reduce-scatter implemented with recursive vector halving and
+     * recursive distance doubling. We now have a power-of-two number of
+     * processes p' = 2^{\floor{\log_2 p}} with new ranks (vrank) and the
+     * result in rbuf.
+     *
+     * The even-ranked processes send the right half of their buffer to rank + 1
+     * and the odd-ranked processes send the left half of their buffer to
+     * rank - 1. All processes then compute the reduction between the local
+     * buffer and the received buffer. In the next \log_2(p') - 1 steps, the
+     * buffers are recursively halved, and the distance is doubled. At the end,
+     * each of the p' processes has 1 / p' of the total reduction result.
+     */
+    rindex = malloc(sizeof(*rindex) * nsteps);
+    sindex = malloc(sizeof(*sindex) * nsteps);
+    rcount = malloc(sizeof(*rcount) * nsteps);
+    scount = malloc(sizeof(*scount) * nsteps);
+    if (NULL == rindex || NULL == sindex || NULL == rcount || NULL == scount) {
+        res = OMPI_ERR_OUT_OF_RESOURCE;
+        goto cleanup_and_return;
+    }
+    if (vrank != -1) {
+        step = 0;
+        wsize = count;
+        sindex[0] = rindex[0] = 0;
+        for (int mask = 1; mask < nprocs_pof2; mask <<= 1) {
+            /*
+             * On each iteration: rindex[step] = sindex[step] -- beginning of the
+             * current window. The length of the current window is stored in wsize.
+             */
+            int vdest = vrank ^ mask;
+            /* Translate vdest virtual rank to real rank */
+            int dest = (vdest < nprocs_rem) ? vdest * 2 : vdest + nprocs_rem;
+            if (rank < dest) {
+                /*
+                 * Recv into the left half of the current window, send the right
+                 * half of the window to the peer (perform reduce on the left
+                 * half of the current window)
+                 */
+                rcount[step] = wsize / 2;
+                scount[step] = wsize - rcount[step];
+                sindex[step] = rindex[step] + rcount[step];
+            } else {
+                /*
+                 * Recv into the right half of the current window, send the left
+                 * half of the window to the peer (perform reduce on the right
+                 * half of the current window)
+                 */
+                scount[step] = wsize / 2;
+                rcount[step] = wsize - scount[step];
+                rindex[step] = sindex[step] + scount[step];
+            }
+            /* Send part of the data from rbuf, recv into tmp_buf */
+            res = NBC_Sched_send((char *)rbuf + (ptrdiff_t)sindex[step] * extent,
+                                 false, scount[step], datatype, dest, schedule, false);
+            if (OPAL_UNLIKELY(OMPI_SUCCESS != res)) { goto cleanup_and_return; }
+            res = NBC_Sched_recv((char *)tmp_buf + (ptrdiff_t)rindex[step] * extent,
+                                 false, rcount[step], datatype, dest, schedule, true);
+            if (OPAL_UNLIKELY(OMPI_SUCCESS != res)) { goto cleanup_and_return; }
+            /* Local reduce: rbuf[] = tmp_buf[] <op> rbuf[] */
+            res = NBC_Sched_op((char *)tmp_buf + (ptrdiff_t)rindex[step] * extent,
+                               false, (char *)rbuf + (ptrdiff_t)rindex[step] * extent,
+                               false, rcount[step], datatype, op, schedule, true);
+            if (OPAL_UNLIKELY(OMPI_SUCCESS != res)) { goto cleanup_and_return; }
+            /* Move the current window to the received message */
+            if (step + 1 < nsteps) {
+                rindex[step + 1] = rindex[step];
+                sindex[step + 1] = rindex[step];
+                wsize = rcount[step];
+                step++;
+            }
+        }
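+        /*
+         * For example, with count = 8 and nprocs_pof2 = 4, vrank 0 sends
+         * rbuf[4..7] and reduces rbuf[0..3] in the first round (mask = 1),
+         * then sends rbuf[2..3] and reduces rbuf[0..1] in the second round
+         * (mask = 2), ending up with elements 0..1 of the total result.
+         */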
+        /*
+         * Assertion: each process has 1 / p' of the total reduction result:
+         * rcount[nsteps - 1] elements in rbuf[rindex[nsteps - 1], ...].
+         */
+        /*
+         * Step 3. Allgather by the recursive doubling algorithm.
+         * Each process has 1 / p' of the total reduction result:
+         * rcount[nsteps - 1] elements in rbuf[rindex[nsteps - 1], ...].
+         * All exchanges are executed in reverse order relative
+         * to recursive doubling (previous step).
+         */
+        step = nsteps - 1;
+        for (int mask = nprocs_pof2 >> 1; mask > 0; mask >>= 1) {
+            int vdest = vrank ^ mask;
+            /* Translate vdest virtual rank to real rank */
+            int dest = (vdest < nprocs_rem) ? vdest * 2 : vdest + nprocs_rem;
+            /*
+             * Send rcount[step] elements from rbuf[rindex[step]...]
+             * Recv scount[step] elements to rbuf[sindex[step]...]
+             */
+            res = NBC_Sched_send((char *)rbuf + (ptrdiff_t)rindex[step] * extent,
+                                 false, rcount[step], datatype, dest, schedule, false);
+            if (OPAL_UNLIKELY(OMPI_SUCCESS != res)) { goto cleanup_and_return; }
+            res = NBC_Sched_recv((char *)rbuf + (ptrdiff_t)sindex[step] * extent,
+                                 false, scount[step], datatype, dest, schedule, true);
+            if (OPAL_UNLIKELY(OMPI_SUCCESS != res)) { goto cleanup_and_return; }
+            step--;
+        }
+    }
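+    /*
+     * In the same example, the allgather retraces the pairs in reverse:
+     * vrank 0 first trades its 2-element block for elements 2..3 (mask = 2),
+     * then trades the assembled left half for elements 4..7 (mask = 1),
+     * after which rbuf holds all 8 reduced elements.
+     */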
+    /*
+     * Step 4. Send total result to excluded odd ranks.
+     */
+    if (rank < 2 * nprocs_rem) {
+        if (rank % 2 != 0) {
+            /* Odd process -- recv result from rank - 1 */
+            res = NBC_Sched_recv(rbuf, false, count, datatype, rank - 1, schedule, false);
+            if (OPAL_UNLIKELY(OMPI_SUCCESS != res)) { goto cleanup_and_return; }
+        } else {
+            /* Even process -- send result to rank + 1 */
+            res = NBC_Sched_send(rbuf, false, count, datatype, rank + 1, schedule, false);
+            if (OPAL_UNLIKELY(OMPI_SUCCESS != res)) { goto cleanup_and_return; }
+        }
+    }
+cleanup_and_return:
+    if (NULL != rindex)
+        free(rindex);
+    if (NULL != sindex)
+        free(sindex);
+    if (NULL != rcount)
+        free(rcount);
+    if (NULL != scount)
+        free(scount);
+    return res;
+}
+
 int ompi_coll_libnbc_allreduce_init(const void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op,
                                     struct ompi_communicator_t *comm, MPI_Info info, ompi_request_t **request,
                                     struct mca_coll_base_module_2_3_0_t *module) {