32
32
33
33
static ssize_t mca_fbtl_posix_pwritev_datasieving (ompio_file_t * fh , struct flock * lock , int * lock_counter );
34
34
static ssize_t mca_fbtl_posix_pwritev_generic (ompio_file_t * fh , struct flock * lock , int * lock_counter );
35
+ static ssize_t mca_fbtl_posix_pwritev_single (ompio_file_t * fh , struct flock * lock , int * lock_counter );
36
+
37
+ #define MAX_RETRIES 10
35
38
36
39
ssize_t mca_fbtl_posix_pwritev (ompio_file_t * fh )
37
40
{
38
- ssize_t bytes_written = 0 , ret_code = 0 ;
41
+ ssize_t bytes_written = 0 ;
39
42
struct flock lock ;
40
43
int lock_counter = 0 , ret ;
41
44
int32_t orig_flags ;
@@ -56,6 +59,12 @@ ssize_t mca_fbtl_posix_pwritev(ompio_file_t *fh )
56
59
off_t len = end_offset - (off_t )fh -> f_io_array [0 ].offset ;
57
60
ret = mca_fbtl_posix_lock ( & lock , fh , F_WRLCK , (off_t )fh -> f_io_array [0 ].offset ,
58
61
len , OMPIO_LOCK_ENTIRE_REGION , & lock_counter );
62
+ if ( 0 < ret ) {
63
+ opal_output (1 , "mca_fbtl_posix_pwritev: error in mca_fbtl_posix_lock() ret=%d: %s" ,
64
+ ret , strerror (errno ));
65
+ mca_fbtl_posix_unlock ( & lock , fh , & lock_counter );
66
+ return OMPI_ERROR ;
67
+ }
59
68
fh -> f_flags = orig_flags ;
60
69
}
61
70
@@ -92,38 +101,50 @@ ssize_t mca_fbtl_posix_pwritev(ompio_file_t *fh )
92
101
}
93
102
else {
94
103
// i.e. fh->f_num_of_io_entries == 1
95
- ret = mca_fbtl_posix_lock ( & lock , fh , F_WRLCK , (off_t )fh -> f_io_array [0 ].offset ,
96
- (off_t )fh -> f_io_array [0 ].length , OMPIO_LOCK_ENTIRE_REGION ,
97
- & lock_counter );
98
- if ( 0 < ret ) {
99
- opal_output (1 , "mca_fbtl_posix_pwritev: error in mca_fbtl_posix_lock() ret=%d: %s" ,
100
- ret , strerror (errno ));
101
- /* Just in case some part of the lock worked */
102
- mca_fbtl_posix_unlock ( & lock , fh , & lock_counter );
103
- bytes_written = OMPI_ERROR ;
104
- goto exit ;
105
- }
106
-
107
- ret_code = pwrite (fh -> fd , fh -> f_io_array [0 ].memory_address , fh -> f_io_array [0 ].length ,
108
- (off_t )fh -> f_io_array [0 ].offset );
109
- mca_fbtl_posix_unlock ( & lock , fh , & lock_counter );
110
- if ( ret_code == -1 ) {
111
- opal_output (1 , "mca_fbtl_posix_pwritev: error in (p)write(v):%s" , strerror (errno ));
112
- bytes_written = OMPI_ERROR ;
113
- goto exit ;
114
- }
115
-
116
- bytes_written += ret_code ;
104
+ bytes_written = mca_fbtl_posix_pwritev_single (fh , & lock , & lock_counter );
117
105
}
118
106
119
- exit :
120
107
if ( fh -> f_atomicity ) {
121
108
mca_fbtl_posix_unlock ( & lock , fh , & lock_counter );
122
109
}
123
110
124
111
return bytes_written ;
125
112
}
126
113
114
+ ssize_t mca_fbtl_posix_pwritev_single (ompio_file_t * fh , struct flock * lock , int * lock_counter )
115
+ {
116
+ int ret ;
117
+ ssize_t bytes_written = 0 , ret_code ;
118
+ size_t total_bytes = 0 ;
119
+ size_t len = fh -> f_io_array [0 ].length ;
120
+
121
+ ret = mca_fbtl_posix_lock ( lock , fh , F_WRLCK , (off_t )fh -> f_io_array [0 ].offset ,
122
+ (off_t )fh -> f_io_array [0 ].length , OMPIO_LOCK_ENTIRE_REGION ,
123
+ lock_counter );
124
+ if ( 0 < ret ) {
125
+ opal_output (1 , "mca_fbtl_posix_pwritev_single: error in mca_fbtl_posix_lock() ret=%d: %s" ,
126
+ ret , strerror (errno ));
127
+ /* Just in case some part of the lock worked */
128
+ mca_fbtl_posix_unlock ( lock , fh , lock_counter );
129
+ return OMPI_ERROR ;
130
+ }
131
+ while ( total_bytes < len ) {
132
+ ret_code = pwrite (fh -> fd , (char * )fh -> f_io_array [0 ].memory_address + total_bytes ,
133
+ fh -> f_io_array [0 ].length - total_bytes ,
134
+ (off_t )fh -> f_io_array [0 ].offset + total_bytes );
135
+ if ( ret_code == -1 ) {
136
+ opal_output (1 , "mca_fbtl_posix_pwritev: error in (p)write(v):%s" , strerror (errno ));
137
+ mca_fbtl_posix_unlock ( lock , fh , lock_counter );
138
+ return OMPI_ERROR ;
139
+ }
140
+ total_bytes += ret_code ;
141
+ }
142
+ mca_fbtl_posix_unlock ( lock , fh , lock_counter );
143
+
144
+ bytes_written = total_bytes ;
145
+ return bytes_written ;
146
+ }
147
+
127
148
ssize_t mca_fbtl_posix_pwritev_datasieving (ompio_file_t * fh , struct flock * lock , int * lock_counter )
128
149
{
129
150
size_t start , end , len ;
@@ -135,6 +156,7 @@ ssize_t mca_fbtl_posix_pwritev_datasieving (ompio_file_t *fh, struct flock *lock
135
156
int startindex = 0 ;
136
157
int endindex = 0 ;
137
158
bool done = false;
159
+ size_t total_bytes = 0 ;
138
160
139
161
while (!done ) {
140
162
// Break the io_array into chunks such that the size of the temporary
@@ -188,17 +210,29 @@ ssize_t mca_fbtl_posix_pwritev_datasieving (ompio_file_t *fh, struct flock *lock
188
210
return OMPI_ERROR ;
189
211
}
190
212
191
- ret_code = pread (fh -> fd , temp_buf , len , start );
192
- if ( ret_code == -1 ) {
193
- //opal_output(1, "mca_fbtl_posix_pwritev_datasieving: error in pwrite:%s", strerror(errno));
194
- opal_output (1 , "mca_fbtl_posix_pwritev_datasieving: error in pwrite:%s" , strerror (errno ));
195
- /* Just in case some part of the lock worked */
196
- mca_fbtl_posix_unlock ( lock , fh , lock_counter );
197
- free ( temp_buf );
198
- return OMPI_ERROR ;
213
+ int retries = 0 ;
214
+ while ( total_bytes < len ) {
215
+ ret_code = pread (fh -> fd , temp_buf , len , start );
216
+ if ( ret_code == -1 ) {
217
+ opal_output (1 , "mca_fbtl_posix_pwritev_datasieving: error in pread:%s" , strerror (errno ));
218
+ mca_fbtl_posix_unlock ( lock , fh , lock_counter );
219
+ free ( temp_buf );
220
+ return OMPI_ERROR ;
221
+ }
222
+ if ( ret_code == 0 ) {
223
+ // end of file
224
+ retries ++ ;
225
+ if ( retries == MAX_RETRIES ) {
226
+ break ;
227
+ }
228
+ else {
229
+ continue ;
230
+ }
231
+ }
232
+ total_bytes += ret_code ;
199
233
}
200
234
201
- // Copy out the elements to write into temporary buffer.
235
+ // Copy the elements to write into temporary buffer.
202
236
size_t pos = 0 ;
203
237
size_t num_bytes ;
204
238
size_t start_offset = (size_t ) fh -> f_io_array [startindex ].offset ;
@@ -208,13 +242,17 @@ ssize_t mca_fbtl_posix_pwritev_datasieving (ompio_file_t *fh, struct flock *lock
208
242
memcpy (temp_buf + pos , fh -> f_io_array [i ].memory_address , num_bytes );
209
243
bytes_written += num_bytes ;
210
244
}
211
- ret_code = pwrite (fh -> fd , temp_buf , len , start );
212
- if ( ret_code == -1 ) {
213
- opal_output (1 , "mca_fbtl_posix_pwritev_datasieving: error in pwrite:%s" , strerror (errno ));
214
- /* Just in case some part of the lock worked */
215
- mca_fbtl_posix_unlock ( lock , fh , lock_counter );
216
- free ( temp_buf );
217
- return OMPI_ERROR ;
245
+
246
+ total_bytes = 0 ;
247
+ while ( total_bytes < len ) {
248
+ ret_code = pwrite (fh -> fd , temp_buf + total_bytes , len - total_bytes , start + total_bytes );
249
+ if ( ret_code == -1 ) {
250
+ opal_output (1 , "mca_fbtl_posix_pwritev_datasieving: error in pwrite:%s" , strerror (errno ));
251
+ mca_fbtl_posix_unlock ( lock , fh , lock_counter );
252
+ free ( temp_buf );
253
+ return OMPI_ERROR ;
254
+ }
255
+ total_bytes += ret_code ;
218
256
}
219
257
220
258
mca_fbtl_posix_unlock ( lock , fh , lock_counter );
0 commit comments