@@ -1247,133 +1247,99 @@ int ompi_coll_base_allreduce_intra_redscat_allgather(
     return err;
 }
 
-/**
- * A greedy algorithm to exchange data among processes in the communicator via
- * an allgather pattern, followed by a local reduction on each process. This
- * avoids the round trip in a rooted communication pattern, e.g. reduce on the
- * root and then broadcast to peers.
+/*
+ * ompi_coll_base_allreduce_intra_allgather_reduce
+ *
+ * Function:    use allgather for allreduce operation
+ * Accepts:     Same as MPI_Allreduce()
+ * Returns:     MPI_SUCCESS or error code
  *
- * This algorithm supports both commutative and non-commutative MPI operations.
- * For non-commutative operations the reduction is applied to the data in the
- * same rank order, e.g. rank 0, rank 1, ... rank N, on each process.
+ * Description: Implements an allgather-based allreduce aimed at improving
+ *              internode allreduce latency: it takes advantage of the fact that
+ *              sends and receives can happen at the same time. The first step is
+ *              an allgather so that all ranks obtain the full dataset; the second
+ *              step is a local reduction on every rank to produce the allreduce result.
  *
- * This algorithm benefits inter-node allreduce over a high-latency network.
- * Caution is needed on larger communicators (n) and data sizes (m), which will
- * result in m*n^2 total traffic and potential network congestion.
+ * Limitations: This method is designed for small-message allreduce because it is
+ *              not efficient in terms of network bandwidth compared to a
+ *              gather/reduce/bcast style of approach.
  */
 int ompi_coll_base_allreduce_intra_allgather_reduce(const void *sbuf, void *rbuf, int count,
                                                     struct ompi_datatype_t *dtype,
                                                     struct ompi_op_t *op,
                                                     struct ompi_communicator_t *comm,
                                                     mca_coll_base_module_t *module)
 {
-    char *send_buf = (void *) sbuf;
-    const int comm_size = ompi_comm_size(comm);
-    const int rank = ompi_comm_rank(comm);
-    int err = MPI_SUCCESS;
-    ompi_request_t **reqs;
-
-    if (sbuf == MPI_IN_PLACE) {
-        send_buf = rbuf;
-    }
-
-    /* Allocate a large-enough buffer to receive from everyone else */
-    char *tmp_buf = NULL, *tmp_buf_raw = NULL, *tmp_recv = NULL;
-    ptrdiff_t lb, extent, dsize, gap = 0;
+    int line = -1;
+    char *partial_buf = NULL;
+    char *partial_buf_start = NULL;
+    char *sendtmpbuf = NULL;
+    char *tmpsend = NULL;
+    char *tmpsend_start = NULL;
+    int err = OMPI_SUCCESS;
+
+    ptrdiff_t extent, lb;
     ompi_datatype_get_extent(dtype, &lb, &extent);
-    dsize = opal_datatype_span(&dtype->super, count * comm_size, &gap);
-    tmp_buf_raw = (char *) malloc(dsize);
-    if (NULL == tmp_buf_raw) {
-        return OMPI_ERR_OUT_OF_RESOURCE;
-    }
 
-    tmp_buf = tmp_buf_raw - gap;
-
-    /* Requests for send to AND receive from everyone else */
-    int reqs_needed = (comm_size - 1) * 2;
-    reqs = ompi_coll_base_comm_get_reqs(module->base_data, reqs_needed);
-
-    const ptrdiff_t incr = extent * count;
-
-    /* Exchange data with peer processes, excluding self */
-    int req_index = 0, peer_rank = 0;
-    for (int i = 1; i < comm_size; ++i) {
-        /* Start at the next rank */
-        peer_rank = (rank + i) % comm_size;
-
-        /* Prepare for the next receive buffer */
-        if (0 == peer_rank && rbuf != send_buf) {
-            /* Optimization for Rank 0 - its data will always be placed at the beginning of local
-             * reduce output buffer.
-             */
-            tmp_recv = rbuf;
-        } else {
-            tmp_recv = tmp_buf + (peer_rank * incr);
-        }
-
-        err = MCA_PML_CALL(irecv(tmp_recv, count, dtype, peer_rank, MCA_COLL_BASE_TAG_ALLREDUCE,
-                                 comm, &reqs[req_index++]));
-        if (MPI_SUCCESS != err) {
-            goto err_hndl;
-        }
+    int rank = ompi_comm_rank(comm);
+    int size = ompi_comm_size(comm);
 
-        err = MCA_PML_CALL(isend(send_buf, count, dtype, peer_rank, MCA_COLL_BASE_TAG_ALLREDUCE,
-                                 MCA_PML_BASE_SEND_STANDARD, comm, &reqs[req_index++]));
-        if (MPI_SUCCESS != err) {
-            goto err_hndl;
-        }
+    sendtmpbuf = (char *) sbuf;
+    if (sbuf == MPI_IN_PLACE) {
+        sendtmpbuf = (char *) rbuf;
     }
-
-    err = ompi_request_wait_all(req_index, reqs, MPI_STATUSES_IGNORE);
-
-    /**
-     * Prepare for local reduction by moving Rank 0's data to rbuf.
-     * Previously we tried to receive Rank 0's data in rbuf, but we need to handle
-     * the following special cases.
-     */
-    if (0 != rank && rbuf == send_buf) {
-        /* For inplace reduction copy out the send_buf before moving Rank 0's data */
-        ompi_datatype_copy_content_same_ddt(dtype, count, (char *) tmp_buf + (rank * incr),
-                                            send_buf);
-        ompi_datatype_copy_content_same_ddt(dtype, count, (char *) rbuf, (char *) tmp_buf);
-    } else if (0 == rank && rbuf != send_buf) {
-        /* For Rank 0 we need to copy the send_buf to rbuf manually */
-        ompi_datatype_copy_content_same_ddt(dtype, count, (char *) rbuf, (char *) send_buf);
+    ptrdiff_t buf_size, gap = 0;
+    buf_size = opal_datatype_span(&dtype->super, (int64_t) count * size, &gap);
+    partial_buf = (char *) malloc(buf_size);
+    partial_buf_start = partial_buf - gap;
+    buf_size = opal_datatype_span(&dtype->super, (int64_t) count, &gap);
+    tmpsend = (char *) malloc(buf_size);
+    tmpsend_start = tmpsend - gap;
+
+    err = ompi_datatype_copy_content_same_ddt(dtype, count,
+                                              (char *) tmpsend_start,
+                                              (char *) sendtmpbuf);
+    if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }
+
+    // apply allgather data so that each rank has a full copy to do reduce (trade bandwidth for better latency)
+    err = comm->c_coll->coll_allgather(tmpsend_start, count, dtype,
+                                       partial_buf_start, count, dtype,
+                                       comm, comm->c_coll->coll_allgather_module);
+    if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }
+
+    for (int target = 1; target < size; target++) {
+        ompi_op_reduce(op,
+                       partial_buf_start + (ptrdiff_t) target * count * extent,
+                       partial_buf_start,
+                       count,
+                       dtype);
     }
 
-    /* Now do local reduction - Rank 0's data is already in rbuf so start from Rank 1 */
-    char *inbuf = NULL;
-    for (peer_rank = 1; peer_rank < comm_size; peer_rank++) {
-        if (rank == peer_rank && rbuf != send_buf) {
-            inbuf = send_buf;
-        } else {
-            inbuf = tmp_buf + (peer_rank * incr);
-        }
-        ompi_op_reduce(op, (void *) inbuf, rbuf, count, dtype);
-    }
+    // move data to rbuf
+    err = ompi_datatype_copy_content_same_ddt(dtype, count,
+                                              (char *) rbuf,
+                                              (char *) partial_buf_start);
+    if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }
 
-err_hndl:
-    if (NULL != tmp_buf_raw)
-        free(tmp_buf_raw);
+    if (NULL != partial_buf) free(partial_buf);
+    if (NULL != tmpsend) free(tmpsend);
+    return MPI_SUCCESS;
 
-    if (NULL != reqs) {
-        if (MPI_ERR_IN_STATUS == err) {
-            for (int i = 0; i < reqs_needed; i++) {
-                if (MPI_REQUEST_NULL == reqs[i])
-                    continue;
-                if (MPI_ERR_PENDING == reqs[i]->req_status.MPI_ERROR)
-                    continue;
-                if (MPI_SUCCESS != reqs[i]->req_status.MPI_ERROR) {
-                    err = reqs[i]->req_status.MPI_ERROR;
-                    break;
-                }
-            }
-        }
-        ompi_coll_base_free_reqs(reqs, reqs_needed);
+err_hndl:
+    if (NULL != partial_buf) {
+        free(partial_buf);
+        partial_buf = NULL;
+        partial_buf_start = NULL;
     }
-
-    /* All done */
+    if (NULL != tmpsend) {
+        free(tmpsend);
+        tmpsend = NULL;
+        tmpsend_start = NULL;
+    }
+    OPAL_OUTPUT((ompi_coll_base_framework.framework_output, "%s:%4d\tError occurred %d, rank %2d",
+                 __FILE__, line, err, rank));
+    (void) line; // silence compiler warning
     return err;
-}
 
+}
 /* copied function (with appropriate renaming) ends here */
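For reference, here is a minimal sketch of the allgather-then-local-reduce idea described in the comment block added above, written against the public MPI API rather than the internal Open MPI machinery in the patch (ompi_op_reduce, opal_datatype_span, the c_coll function table). The helper name allgather_sum and the restriction to MPI_DOUBLE with sum semantics are illustrative assumptions, not part of the change.

/* Hypothetical sketch: allreduce of `count` doubles (sum) built from an
 * allgather followed by a local reduction in rank order.
 * Build with: mpicc -o allgather_sum_sketch allgather_sum_sketch.c */
#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>

static int allgather_sum(const double *sendbuf, double *recvbuf, int count, MPI_Comm comm)
{
    int size, rc;
    MPI_Comm_size(comm, &size);

    /* Step 1: every rank gathers every contribution (size * count elements). */
    double *gathered = malloc((size_t) size * count * sizeof(double));
    if (NULL == gathered) {
        return MPI_ERR_NO_MEM;
    }
    rc = MPI_Allgather(sendbuf, count, MPI_DOUBLE, gathered, count, MPI_DOUBLE, comm);
    if (MPI_SUCCESS != rc) {
        free(gathered);
        return rc;
    }

    /* Step 2: reduce locally, starting from rank 0's slot and folding in
     * ranks 1 .. size-1 in the same fixed order on every process. */
    for (int i = 0; i < count; i++) {
        recvbuf[i] = gathered[i];
    }
    for (int r = 1; r < size; r++) {
        for (int i = 0; i < count; i++) {
            recvbuf[i] += gathered[(size_t) r * count + i];
        }
    }

    free(gathered);
    return MPI_SUCCESS;
}

int main(int argc, char **argv)
{
    MPI_Init(&argc, &argv);
    int rank;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    double in = (double) rank, out = 0.0;
    allgather_sum(&in, &out, 1, MPI_COMM_WORLD);
    if (0 == rank) {
        printf("sum of ranks = %g\n", out); /* expect n*(n-1)/2 */
    }

    MPI_Finalize();
    return 0;
}

As the Limitations comment says, every rank ends up sending its full buffer to every other rank, so the pattern only pays off when messages are small and latency, not bandwidth, dominates.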
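The comment block being removed describes a different latency-oriented variant: instead of a library allgather, each rank posts a nonblocking receive from and a nonblocking send to every peer, waits for all of them, and then reduces locally in rank order. A rough sketch of that exchange pattern, again in plain MPI with a hypothetical helper name (pairwise_exchange_sum) and MPI_DOUBLE sum assumed for brevity; it drops in alongside the includes and main of the previous sketch:

/* Hypothetical sketch of the pairwise-exchange variant: explicit isend/irecv
 * with every peer instead of a library allgather, then a local reduction. */
static int pairwise_exchange_sum(const double *sendbuf, double *recvbuf, int count, MPI_Comm comm)
{
    int rank, size, rc;
    MPI_Comm_rank(comm, &rank);
    MPI_Comm_size(comm, &size);

    if (size < 2) {
        for (int i = 0; i < count; i++) {
            recvbuf[i] = sendbuf[i];
        }
        return MPI_SUCCESS;
    }

    /* One slot per rank, plus one send and one receive request per peer. */
    double *tmp = malloc((size_t) size * count * sizeof(double));
    MPI_Request *reqs = malloc(2 * (size_t) (size - 1) * sizeof(MPI_Request));
    if (NULL == tmp || NULL == reqs) {
        free(tmp);
        free(reqs);
        return MPI_ERR_NO_MEM;
    }

    /* Exchange with every peer, starting at the next rank and wrapping around. */
    int nreq = 0;
    for (int i = 1; i < size; i++) {
        int peer = (rank + i) % size;
        MPI_Irecv(tmp + (size_t) peer * count, count, MPI_DOUBLE, peer, 0, comm, &reqs[nreq++]);
        MPI_Isend(sendbuf, count, MPI_DOUBLE, peer, 0, comm, &reqs[nreq++]);
    }
    rc = MPI_Waitall(nreq, reqs, MPI_STATUSES_IGNORE);

    if (MPI_SUCCESS == rc) {
        /* Our own contribution fills our slot, then reduce in rank order. */
        for (int i = 0; i < count; i++) {
            tmp[(size_t) rank * count + i] = sendbuf[i];
        }
        for (int i = 0; i < count; i++) {
            recvbuf[i] = tmp[i];
        }
        for (int r = 1; r < size; r++) {
            for (int i = 0; i < count; i++) {
                recvbuf[i] += tmp[(size_t) r * count + i];
            }
        }
    }

    free(reqs);
    free(tmp);
    return rc;
}

Both sketches move the same m*n^2 total traffic across the communicator; the difference is only whether the exchange goes through the tuned allgather collective or through hand-posted point-to-point operations.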