9
9
* University of Stuttgart. All rights reserved.
10
10
* Copyright (c) 2004-2005 The Regents of the University of California.
11
11
* All rights reserved.
12
- * Copyright (c) 2008-2015 University of Houston. All rights reserved.
12
+ * Copyright (c) 2008-2021 University of Houston. All rights reserved.
13
13
* Copyright (c) 2017-2018 Research Organization for Information Science
14
14
* and Technology (RIST). All rights reserved.
15
15
* Copyright (c) 2017 IBM Corporation. All rights reserved.
@@ -56,7 +56,6 @@ mca_fcoll_dynamic_gen2_file_read_all (ompio_file_t *fh,
56
56
struct ompi_datatype_t * datatype ,
57
57
ompi_status_public_t * status )
58
58
{
59
- MPI_Aint position = 0 ;
60
59
MPI_Aint total_bytes = 0 ; /* total bytes to be read */
61
60
MPI_Aint bytes_to_read_in_cycle = 0 ; /* left to be read in a cycle*/
62
61
MPI_Aint bytes_per_cycle = 0 ; /* total read in each cycle by each process*/
@@ -75,7 +74,6 @@ mca_fcoll_dynamic_gen2_file_read_all (ompio_file_t *fh,
75
74
int iov_index = 0 ;
76
75
size_t current_position = 0 ;
77
76
struct iovec * local_iov_array = NULL , * global_iov_array = NULL ;
78
- char * receive_buf = NULL ;
79
77
MPI_Aint * memory_displacements = NULL ;
80
78
/* global iovec at the readers that contain the iovecs created from
81
79
file_set_view */
@@ -96,12 +94,12 @@ mca_fcoll_dynamic_gen2_file_read_all (ompio_file_t *fh,
96
94
size_t max_data = 0 ;
97
95
MPI_Aint * total_bytes_per_process = NULL ;
98
96
ompi_datatype_t * * sendtype = NULL ;
99
- MPI_Request * send_req = NULL , recv_req = NULL ;
97
+ MPI_Request * send_req = NULL ;
98
+ MPI_Request recv_req = MPI_REQUEST_NULL ;
100
99
int my_aggregator = -1 ;
101
- bool recvbuf_is_contiguous = false;
102
- size_t ftype_size ;
103
- ptrdiff_t ftype_extent , lb ;
104
100
101
+ int * blocklength_proc = NULL ;
102
+ ptrdiff_t * displs_proc = NULL ;
105
103
106
104
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
107
105
double read_time = 0.0 , start_read_time = 0.0 , end_read_time = 0.0 ;
@@ -113,32 +111,16 @@ mca_fcoll_dynamic_gen2_file_read_all (ompio_file_t *fh,
113
111
/**************************************************************************
114
112
** 1. In case the data is not contigous in memory, decode it into an iovec
115
113
**************************************************************************/
116
-
117
- opal_datatype_type_size ( & datatype -> super , & ftype_size );
118
- opal_datatype_get_extent ( & datatype -> super , & lb , & ftype_extent );
119
-
120
- if ( (ftype_extent == (ptrdiff_t ) ftype_size ) &&
121
- opal_datatype_is_contiguous_memory_layout (& datatype -> super ,1 ) &&
122
- 0 == lb ) {
123
- recvbuf_is_contiguous = true;
124
- }
125
-
126
-
127
- if (! recvbuf_is_contiguous ) {
128
- ret = mca_common_ompio_decode_datatype ((struct ompio_file_t * )fh ,
129
- datatype ,
130
- count ,
131
- buf ,
132
- & max_data ,
133
- fh -> f_mem_convertor ,
134
- & decoded_iov ,
135
- & iov_count );
136
- if (OMPI_SUCCESS != ret ){
137
- goto exit ;
138
- }
139
- }
140
- else {
141
- max_data = count * datatype -> super .size ;
114
+ ret = mca_common_ompio_decode_datatype ((struct ompio_file_t * )fh ,
115
+ datatype ,
116
+ count ,
117
+ buf ,
118
+ & max_data ,
119
+ fh -> f_mem_convertor ,
120
+ & decoded_iov ,
121
+ & iov_count );
122
+ if (OMPI_SUCCESS != ret ){
123
+ goto exit ;
142
124
}
143
125
144
126
if ( MPI_STATUS_IGNORE != status ) {
@@ -743,6 +725,7 @@ mca_fcoll_dynamic_gen2_file_read_all (ompio_file_t *fh,
743
725
start_rcomm_time = MPI_Wtime ();
744
726
#endif
745
727
for (i = 0 ;i < fh -> f_procs_per_group ;i ++ ){
728
+ size_t datatype_size ;
746
729
send_req [i ] = MPI_REQUEST_NULL ;
747
730
if ( 0 < disp_index [i ] ) {
748
731
ompi_datatype_create_hindexed (disp_index [i ],
@@ -751,16 +734,20 @@ mca_fcoll_dynamic_gen2_file_read_all (ompio_file_t *fh,
751
734
MPI_BYTE ,
752
735
& sendtype [i ]);
753
736
ompi_datatype_commit (& sendtype [i ]);
754
- ret = MCA_PML_CALL (isend (global_buf ,
755
- 1 ,
756
- sendtype [i ],
757
- fh -> f_procs_in_group [i ],
758
- 123 ,
759
- MCA_PML_BASE_SEND_STANDARD ,
760
- fh -> f_comm ,
761
- & send_req [i ]));
762
- if (OMPI_SUCCESS != ret ){
763
- goto exit ;
737
+ opal_datatype_type_size (& sendtype [i ]-> super , & datatype_size );
738
+
739
+ if (datatype_size ) {
740
+ ret = MCA_PML_CALL (isend (global_buf ,
741
+ 1 ,
742
+ sendtype [i ],
743
+ fh -> f_procs_in_group [i ],
744
+ FCOLL_DYNAMIC_GEN2_SHUFFLE_TAG ,
745
+ MCA_PML_BASE_SEND_STANDARD ,
746
+ fh -> f_comm ,
747
+ & send_req [i ]));
748
+ if (OMPI_SUCCESS != ret ){
749
+ goto exit ;
750
+ }
764
751
}
765
752
}
766
753
}
@@ -773,35 +760,80 @@ mca_fcoll_dynamic_gen2_file_read_all (ompio_file_t *fh,
773
760
/**********************************************************
774
761
*** 7f. Scatter the Data from the readers
775
762
*********************************************************/
776
- if ( recvbuf_is_contiguous ) {
777
- receive_buf = & ((char * )buf )[position ];
778
- }
779
- else if (bytes_received ) {
780
- /* allocate a receive buffer and copy the data that needs
781
- to be received into it in case the data is non-contigous
782
- in memory */
783
- receive_buf = malloc (bytes_received );
784
- if (NULL == receive_buf ) {
763
+ if (bytes_received ) {
764
+ size_t remaining = bytes_received ;
765
+ int block_index = -1 ;
766
+ int blocklength_size = INIT_LEN ;
767
+
768
+ ptrdiff_t recv_mem_address = 0 ;
769
+ ompi_datatype_t * newType = MPI_DATATYPE_NULL ;
770
+
771
+ blocklength_proc = (int * ) calloc (blocklength_size , sizeof (int ));
772
+ displs_proc = (ptrdiff_t * ) calloc (blocklength_size , sizeof (ptrdiff_t ));
773
+
774
+ if (NULL == blocklength_proc || NULL == displs_proc ) {
785
775
opal_output (1 , "OUT OF MEMORY\n" );
786
776
ret = OMPI_ERR_OUT_OF_RESOURCE ;
787
777
goto exit ;
788
778
}
789
- }
779
+
780
+ while (remaining ) {
781
+ block_index ++ ;
782
+
783
+ if (0 == block_index ) {
784
+ recv_mem_address = (ptrdiff_t ) (decoded_iov [iov_index ].iov_base ) + current_position ;
785
+ }
786
+ else {
787
+ // Reallocate more memory if blocklength_size is not enough
788
+ if (0 == block_index % INIT_LEN ) {
789
+ blocklength_size += INIT_LEN ;
790
+ blocklength_proc = (int * ) realloc (blocklength_proc , blocklength_size * sizeof (int ));
791
+ displs_proc = (ptrdiff_t * ) realloc (displs_proc , blocklength_size * sizeof (ptrdiff_t ));
792
+ }
793
+ displs_proc [block_index ] = (ptrdiff_t ) (decoded_iov [iov_index ].iov_base ) +
794
+ current_position - recv_mem_address ;
795
+ }
796
+
797
+ if (remaining >= (decoded_iov [iov_index ].iov_len - current_position )) {
798
+ blocklength_proc [block_index ] = decoded_iov [iov_index ].iov_len - current_position ;
799
+
800
+ remaining = remaining - blocklength_proc [block_index ];
801
+ iov_index = iov_index + 1 ;
802
+ current_position = 0 ;
803
+ }
804
+ else {
805
+ blocklength_proc [block_index ] = remaining ;
806
+ current_position += remaining ;
807
+ remaining = 0 ;
808
+ }
809
+ }
810
+
811
+ ompi_datatype_create_hindexed (block_index + 1 ,
812
+ blocklength_proc ,
813
+ displs_proc ,
814
+ MPI_BYTE ,
815
+ & newType );
816
+ ompi_datatype_commit (& newType );
790
817
791
818
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
792
- start_rcomm_time = MPI_Wtime ();
819
+ start_rcomm_time = MPI_Wtime ();
793
820
#endif
794
- ret = MCA_PML_CALL (irecv (receive_buf ,
795
- bytes_received ,
796
- MPI_BYTE ,
797
- my_aggregator ,
798
- 123 ,
799
- fh -> f_comm ,
800
- & recv_req ));
801
- if (OMPI_SUCCESS != ret ){
802
- goto exit ;
803
- }
821
+ ret = MCA_PML_CALL (irecv ((char * )recv_mem_address ,
822
+ 1 ,
823
+ newType ,
824
+ my_aggregator ,
825
+ FCOLL_DYNAMIC_GEN2_SHUFFLE_TAG ,
826
+ fh -> f_comm ,
827
+ & recv_req ));
828
+
829
+ if ( MPI_DATATYPE_NULL != newType ) {
830
+ ompi_datatype_destroy (& newType );
831
+ }
804
832
833
+ if (OMPI_SUCCESS != ret ){
834
+ goto exit ;
835
+ }
836
+ }
805
837
806
838
if (my_aggregator == fh -> f_rank ){
807
839
ret = ompi_request_wait_all (fh -> f_procs_per_group ,
@@ -816,50 +848,12 @@ mca_fcoll_dynamic_gen2_file_read_all (ompio_file_t *fh,
816
848
if (OMPI_SUCCESS != ret ){
817
849
goto exit ;
818
850
}
819
- position += bytes_received ;
820
-
821
- /* If data is not contigous in memory, copy the data from the
822
- receive buffer into the buffer passed in */
823
- if (!recvbuf_is_contiguous ) {
824
- ptrdiff_t mem_address ;
825
- size_t remaining = 0 ;
826
- size_t temp_position = 0 ;
827
851
828
- remaining = bytes_received ;
829
-
830
- while (remaining ) {
831
- mem_address = (ptrdiff_t )
832
- (decoded_iov [iov_index ].iov_base ) + current_position ;
833
-
834
- if (remaining >=
835
- (decoded_iov [iov_index ].iov_len - current_position )) {
836
- memcpy ((IOVBASE_TYPE * ) mem_address ,
837
- receive_buf + temp_position ,
838
- decoded_iov [iov_index ].iov_len - current_position );
839
- remaining = remaining -
840
- (decoded_iov [iov_index ].iov_len - current_position );
841
- temp_position = temp_position +
842
- (decoded_iov [iov_index ].iov_len - current_position );
843
- iov_index = iov_index + 1 ;
844
- current_position = 0 ;
845
- }
846
- else {
847
- memcpy ((IOVBASE_TYPE * ) mem_address ,
848
- receive_buf + temp_position ,
849
- remaining );
850
- current_position = current_position + remaining ;
851
- remaining = 0 ;
852
- }
853
- }
854
-
855
- if (NULL != receive_buf ) {
856
- free (receive_buf );
857
- receive_buf = NULL ;
858
- }
859
- }
860
852
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
861
- end_rcomm_time = MPI_Wtime ();
862
- rcomm_time += end_rcomm_time - start_rcomm_time ;
853
+ if (bytes_received ) {
854
+ end_rcomm_time = MPI_Wtime ();
855
+ rcomm_time += end_rcomm_time - start_rcomm_time ;
856
+ }
863
857
#endif
864
858
} /* end for (index=0; index < cycles; index ++) */
865
859
@@ -881,12 +875,6 @@ mca_fcoll_dynamic_gen2_file_read_all (ompio_file_t *fh,
881
875
#endif
882
876
883
877
exit :
884
- if (!recvbuf_is_contiguous ) {
885
- if (NULL != receive_buf ) {
886
- free (receive_buf );
887
- receive_buf = NULL ;
888
- }
889
- }
890
878
if (NULL != global_buf ) {
891
879
free (global_buf );
892
880
global_buf = NULL ;
@@ -916,6 +904,17 @@ mca_fcoll_dynamic_gen2_file_read_all (ompio_file_t *fh,
916
904
free (displs );
917
905
displs = NULL ;
918
906
}
907
+
908
+ if (NULL != blocklength_proc ) {
909
+ free (blocklength_proc );
910
+ blocklength_proc = NULL ;
911
+ }
912
+
913
+ if (NULL != displs_proc ) {
914
+ free (displs_proc );
915
+ displs_proc = NULL ;
916
+ }
917
+
919
918
if (my_aggregator == fh -> f_rank ) {
920
919
921
920
if (NULL != sorted_file_offsets ){
0 commit comments