Skip to content

Commit 25b8dd2

Browse files
committed
common/ompio: add pipelined file_iwrite and iread
Pipelined iread/iwrite operations require the notion of subrequests, i.e. a user level request can contain multiple internal subrequests that all have to complete before the user level operation is considered finished. This requires adjustments to the internal ompio progress engine and data structures. Note: this is purely just a pipelined algorithm, no overlap between different iterations. Extract the file view into a separate datastructure. This is required in the next step since we need to cache file view and position of the file pointer on the request. Replicate the file view and the file pointer position on the request. This is required to correctly increment where to read/write data for every subrequest, and handle the potential situation that the code changes the file after posting an iread/iwrite but before the operation finishes. Furthermore, when initiating an iread/iwrite we need to immidiatly move the position of the handle to the end of the operation, such that subsequent read/write operations use the correct position to start out with, and don't accidentaly interfere with the already ongoing non-blocking operation. Signed-off-by: Edgar Gabriel <edgar.gabriel@amd.com> Signed-off-by: Edgar Gabriel <edgar.gabriel@amd.com>
1 parent c0e3a7c commit 25b8dd2

25 files changed

+649
-502
lines changed

ompi/mca/common/ompio/common_ompio.h

Lines changed: 28 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
* Copyright (c) 2018 Research Organization for Information Science
1515
* and Technology (RIST). All rights reserved.
1616
* Copyright (c) 2018 DataDirect Networks. All rights reserved.
17+
* Copyright (c) 2022-2023 Advanced Micro Devices, Inc. All rights reserved.
1718
* $COPYRIGHT$
1819
*
1920
* Additional copyrights may follow
@@ -140,18 +141,32 @@ typedef int (*mca_common_ompio_generate_current_file_view_fn_t) (struct ompio_fi
140141
temporary buffer on aggregators from the fcoll modules */
141142
typedef int (*mca_common_ompio_get_mca_parameter_value_fn_t) ( char *mca_parameter_name, int name_length );
142143

143-
144144
struct mca_common_ompio_print_queue;
145145

146+
/* File View parameters and position in file*/
147+
struct ompio_fview_t {
148+
uint32_t f_flags;
149+
OMPI_MPI_OFFSET_TYPE f_offset; /* byte offset of current position */
150+
OMPI_MPI_OFFSET_TYPE f_disp; /* file_view displacement */
151+
struct iovec *f_decoded_iov;
152+
uint32_t f_iov_count;
153+
size_t f_position_in_file_view; /* in bytes */
154+
size_t f_total_bytes; /* total bytes read/written within 1 Fview*/
155+
int f_index_in_file_view;
156+
ptrdiff_t f_view_extent;
157+
size_t f_view_size;
158+
size_t f_etype_size;
159+
};
160+
typedef struct ompio_fview_t ompio_fview_t;
161+
146162
/**
147163
* Back-end structure for MPI_File
148164
*/
149165
struct ompio_file_t {
150166
/* General parameters */
151167
int fd;
152168
struct ompi_file_t *f_fh; /* pointer back to the file_t structure */
153-
OMPI_MPI_OFFSET_TYPE f_offset; /* byte offset of current position */
154-
OMPI_MPI_OFFSET_TYPE f_disp; /* file_view displacement */
169+
uint32_t f_flags;
155170
int f_rank;
156171
int f_size;
157172
int f_amode;
@@ -163,7 +178,6 @@ struct ompio_file_t {
163178
opal_convertor_t *f_mem_convertor;
164179
opal_convertor_t *f_file_convertor;
165180
opal_info_t *f_info;
166-
int32_t f_flags;
167181
void *f_fs_ptr;
168182
int f_fs_block_size;
169183
int f_atomicity;
@@ -180,24 +194,16 @@ struct ompio_file_t {
180194
*/
181195
void *f_sharedfp_data;
182196

183-
184197
/* File View parameters */
185-
struct iovec *f_decoded_iov;
186-
uint32_t f_iov_count;
187-
ompi_datatype_t *f_iov_type;
188-
size_t f_position_in_file_view; /* in bytes */
189-
size_t f_total_bytes; /* total bytes read/written within 1 Fview*/
190-
int f_index_in_file_view;
191-
ptrdiff_t f_view_extent;
192-
size_t f_view_size;
193-
ompi_datatype_t *f_etype;
194-
ompi_datatype_t *f_filetype;
195-
ompi_datatype_t *f_orig_filetype; /* the fileview passed by the user to us */
196-
size_t f_etype_size;
198+
struct ompio_fview_t f_fview;
199+
ompi_datatype_t *f_iov_type;
200+
ompi_datatype_t *f_etype;
201+
ompi_datatype_t *f_filetype;
202+
ompi_datatype_t *f_orig_filetype; /* the fileview passed by the user to us */
197203

198204
/* contains IO requests that needs to be read/written */
199205
mca_common_ompio_io_array_t *f_io_array;
200-
int f_num_of_io_entries;
206+
int f_num_of_io_entries;
201207

202208
/* Hooks for modules to hang things */
203209
mca_base_component_t *f_fs_component;
@@ -272,9 +278,9 @@ OMPI_DECLSPEC int mca_common_ompio_file_iwrite_all (ompio_file_t *fp, const void
272278
OMPI_DECLSPEC int mca_common_ompio_file_iwrite_at_all (ompio_file_t *fp, OMPI_MPI_OFFSET_TYPE offset, const void *buf,
273279
int count, struct ompi_datatype_t *datatype, ompi_request_t **request);
274280

275-
OMPI_DECLSPEC int mca_common_ompio_build_io_array ( ompio_file_t *fh, int index, int cycles,
281+
OMPI_DECLSPEC int mca_common_ompio_build_io_array ( ompio_fview_t *fview, int index, int cycles,
276282
size_t bytes_per_cycle, size_t max_data, uint32_t iov_count,
277-
struct iovec *decoded_iov, int *ii, int *jj, size_t *tbw,
283+
struct iovec *decoded_iov, int *ii, size_t *tbw,
278284
size_t *spc, mca_common_ompio_io_array_t **io_array,
279285
int *num_io_entries );
280286

@@ -342,6 +348,8 @@ OMPI_DECLSPEC int mca_common_ompio_decode_datatype (struct ompio_file_t *fh,
342348
struct iovec **iov,
343349
uint32_t *iov_count);
344350

351+
OMPI_DECLSPEC int mca_common_ompio_fview_duplicate (struct ompio_fview_t *outfv, struct ompio_fview_t *infv);
352+
345353
OMPI_DECLSPEC int mca_common_ompio_set_callbacks(mca_common_ompio_generate_current_file_view_fn_t generate_current_file_view,
346354
mca_common_ompio_get_mca_parameter_value_fn_t get_mca_parameter_value);
347355
#endif /* MCA_COMMON_OMPIO_H */

ompi/mca/common/ompio/common_ompio_aggregators.c

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -239,13 +239,13 @@ int mca_common_ompio_fview_based_grouping(ompio_file_t *fh,
239239
OMPI_MPI_OFFSET_TYPE *start_offsets_lens = NULL;
240240

241241
//Store start offset,length and corresponding rank in an array
242-
if(NULL == fh->f_decoded_iov){
242+
if(NULL == fh->f_fview.f_decoded_iov){
243243
start_offset_len[0] = 0;
244244
start_offset_len[1] = 0;
245245
}
246246
else{
247-
start_offset_len[0] = (OMPI_MPI_OFFSET_TYPE) fh->f_decoded_iov[0].iov_base;
248-
start_offset_len[1] = fh->f_decoded_iov[0].iov_len;
247+
start_offset_len[0] = (OMPI_MPI_OFFSET_TYPE) fh->f_fview.f_decoded_iov[0].iov_base;
248+
start_offset_len[1] = fh->f_fview.f_decoded_iov[0].iov_len;
249249
}
250250
start_offset_len[2] = fh->f_rank;
251251

@@ -1275,13 +1275,13 @@ int mca_common_ompio_prepare_to_group(ompio_file_t *fh,
12751275
int ret=OMPI_SUCCESS;
12761276

12771277
//Store start offset and length in an array //also add bytes per process
1278-
if(NULL == fh->f_decoded_iov){
1278+
if(NULL == fh->f_fview.f_decoded_iov){
12791279
start_offset_len[0] = 0;
12801280
start_offset_len[1] = 0;
12811281
}
12821282
else{
1283-
start_offset_len[0] = (OMPI_MPI_OFFSET_TYPE) fh->f_decoded_iov[0].iov_base;
1284-
start_offset_len[1] = fh->f_decoded_iov[0].iov_len;
1283+
start_offset_len[0] = (OMPI_MPI_OFFSET_TYPE) fh->f_fview.f_decoded_iov[0].iov_base;
1284+
start_offset_len[1] = fh->f_fview.f_decoded_iov[0].iov_len;
12851285
}
12861286
start_offset_len[2] = bytes_per_proc;
12871287
start_offsets_lens_tmp = (OMPI_MPI_OFFSET_TYPE* )malloc (3 * fh->f_init_procs_per_group * sizeof(OMPI_MPI_OFFSET_TYPE));

ompi/mca/common/ompio/common_ompio_buffer.h

Lines changed: 8 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -31,14 +31,10 @@
3131
opal_output(1, "common_ompio: error allocating memory\n"); \
3232
return OMPI_ERR_OUT_OF_RESOURCE; \
3333
} \
34-
_decoded_iov = (struct iovec *) malloc ( sizeof ( struct iovec )); \
35-
if ( NULL == _decoded_iov ) { \
36-
opal_output(1, "common_ompio: could not allocate memory.\n"); \
37-
return OMPI_ERR_OUT_OF_RESOURCE; \
38-
} \
39-
_decoded_iov->iov_base = _tbuf; \
40-
_decoded_iov->iov_len = _max_data; \
41-
_iov_count=1;}
34+
if (NULL != _decoded_iov) { \
35+
((struct iovec*)_decoded_iov)->iov_base = _tbuf; \
36+
((struct iovec*)_decoded_iov)->iov_len = _max_data; \
37+
_iov_count=1;}}
4238

4339
#define OMPIO_PREPARE_READ_BUF(_fh,_buf,_count,_datatype,_tbuf,_convertor,_max_data,_tmp_buf_size,_decoded_iov,_iov_count){ \
4440
OBJ_CONSTRUCT( _convertor, opal_convertor_t); \
@@ -49,14 +45,10 @@
4945
opal_output(1, "common_ompio: error allocating memory\n"); \
5046
return OMPI_ERR_OUT_OF_RESOURCE; \
5147
} \
52-
_decoded_iov = (struct iovec *) malloc ( sizeof ( struct iovec )); \
53-
if ( NULL == _decoded_iov ) { \
54-
opal_output(1, "common_ompio: could not allocate memory.\n"); \
55-
return OMPI_ERR_OUT_OF_RESOURCE; \
56-
} \
57-
_decoded_iov->iov_base = _tbuf; \
58-
_decoded_iov->iov_len = _max_data; \
59-
_iov_count=1;}
48+
if (NULL != _decoded_iov) { \
49+
((struct iovec*)_decoded_iov)->iov_base = _tbuf; \
50+
((struct iovec*)_decoded_iov)->iov_len = _max_data; \
51+
_iov_count=1;}}
6052

6153
void mca_common_ompio_check_gpu_buf ( ompio_file_t *fh, const void *buf,
6254
int *is_gpu, int *is_managed);

ompi/mca/common/ompio/common_ompio_file_open.c

Lines changed: 40 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -99,8 +99,8 @@ int mca_common_ompio_file_open (ompi_communicator_t *comm,
9999
ompio_fh->f_info = info;
100100

101101
/* set some function pointers required for fcoll, fbtls and sharedfp modules*/
102-
ompio_fh->f_generate_current_file_view=generate_current_file_view_fn;
103-
ompio_fh->f_get_mca_parameter_value=get_mca_parameter_value_fn;
102+
ompio_fh->f_generate_current_file_view = generate_current_file_view_fn;
103+
ompio_fh->f_get_mca_parameter_value = get_mca_parameter_value_fn;
104104

105105
ompio_fh->f_filename = filename;
106106
if (opal_path_is_absolute(filename) ) {
@@ -343,9 +343,9 @@ int mca_common_ompio_file_close (ompio_file_t *ompio_fh)
343343
ompio_fh->f_procs_in_group = NULL;
344344
}
345345

346-
if (NULL != ompio_fh->f_decoded_iov) {
347-
free (ompio_fh->f_decoded_iov);
348-
ompio_fh->f_decoded_iov = NULL;
346+
if (NULL != ompio_fh->f_fview.f_decoded_iov) {
347+
free (ompio_fh->f_fview.f_decoded_iov);
348+
ompio_fh->f_fview.f_decoded_iov = NULL;
349349
}
350350

351351
if (NULL != ompio_fh->f_mem_convertor) {
@@ -415,21 +415,21 @@ int mca_common_ompio_file_get_position (ompio_file_t *fh,
415415
{
416416
OMPI_MPI_OFFSET_TYPE off;
417417

418-
if ( 0 == fh->f_view_extent ||
419-
0 == fh->f_view_size ||
420-
0 == fh->f_etype_size ) {
418+
if ( 0 == fh->f_fview.f_view_extent ||
419+
0 == fh->f_fview.f_view_size ||
420+
0 == fh->f_fview.f_etype_size ) {
421421
/* not sure whether we should raise an error here */
422422
*offset = 0;
423423
return OMPI_SUCCESS;
424424
}
425425
/* No. of copies of the entire file view */
426-
off = (fh->f_offset - fh->f_disp)/fh->f_view_extent;
426+
off = (fh->f_fview.f_offset - fh->f_fview.f_disp)/fh->f_fview.f_view_extent;
427427

428428
/* No. of elements per view */
429-
off *= (fh->f_view_size / fh->f_etype_size);
429+
off *= (fh->f_fview.f_view_size / fh->f_fview.f_etype_size);
430430

431431
/* No of elements used in the current copy of the view */
432-
off += fh->f_total_bytes / fh->f_etype_size;
432+
off += fh->f_fview.f_total_bytes / fh->f_fview.f_etype_size;
433433

434434
*offset = off;
435435
return OMPI_SUCCESS;
@@ -445,9 +445,9 @@ int mca_common_ompio_set_file_defaults (ompio_file_t *fh)
445445
ptrdiff_t d[2], base;
446446
int i, flag;
447447

448-
fh->f_io_array = NULL;
449-
fh->f_perm = OMPIO_PERM_NULL;
450-
fh->f_flags = 0;
448+
fh->f_flags = 0;
449+
fh->f_perm = OMPIO_PERM_NULL;
450+
fh->f_io_array = NULL;
451451

452452
fh->f_bytes_per_agg = OMPIO_MCA_GET(fh, bytes_per_agg);
453453
opal_info_get (fh->f_info, "cb_buffer_size", &stripe_str, &flag);
@@ -458,43 +458,41 @@ int mca_common_ompio_set_file_defaults (ompio_file_t *fh)
458458
OBJ_RELEASE(stripe_str);
459459
}
460460

461-
fh->f_atomicity = 0;
462461
fh->f_fs_block_size = 4096;
462+
fh->f_atomicity = 0;
463+
fh->f_stripe_size = 0;
464+
fh->f_stripe_count = 0;
463465

464-
fh->f_offset = 0;
465-
fh->f_disp = 0;
466-
fh->f_position_in_file_view = 0;
467-
fh->f_index_in_file_view = 0;
468-
fh->f_total_bytes = 0;
466+
/* File View */
467+
fh->f_fview.f_flags = 0;
468+
fh->f_fview.f_offset = 0;
469+
fh->f_fview.f_disp = 0;
470+
fh->f_fview.f_position_in_file_view = 0;
471+
fh->f_fview.f_index_in_file_view = 0;
472+
fh->f_fview.f_total_bytes = 0;
473+
fh->f_fview.f_decoded_iov = NULL;
474+
475+
fh->f_iov_type = MPI_DATATYPE_NULL;
476+
fh->f_etype = MPI_DATATYPE_NULL;
477+
fh->f_filetype = MPI_DATATYPE_NULL;
478+
fh->f_orig_filetype = MPI_DATATYPE_NULL;
469479

470480
fh->f_init_procs_per_group = -1;
471-
fh->f_init_procs_in_group = NULL;
472-
473-
fh->f_procs_per_group = -1;
474-
fh->f_procs_in_group = NULL;
475-
476-
fh->f_init_num_aggrs = -1;
477-
fh->f_init_aggr_list = NULL;
478-
479-
fh->f_num_aggrs = -1;
480-
fh->f_aggr_list = NULL;
481-
482-
/* Default file View */
483-
fh->f_iov_type = MPI_DATATYPE_NULL;
484-
fh->f_stripe_size = 0;
485-
/*Decoded iovec of the file-view*/
486-
fh->f_decoded_iov = NULL;
487-
fh->f_etype = MPI_DATATYPE_NULL;
488-
fh->f_filetype = MPI_DATATYPE_NULL;
489-
fh->f_orig_filetype = MPI_DATATYPE_NULL;
490-
fh->f_datarep = NULL;
481+
fh->f_init_procs_in_group = NULL;
482+
fh->f_procs_per_group = -1;
483+
fh->f_procs_in_group = NULL;
484+
fh->f_init_num_aggrs = -1;
485+
fh->f_init_aggr_list = NULL;
486+
fh->f_num_aggrs = -1;
487+
fh->f_aggr_list = NULL;
488+
fh->f_datarep = NULL;
491489

492490
/*Create a derived datatype for the created iovec */
493491
types[0] = &ompi_mpi_long.dt;
494492
types[1] = &ompi_mpi_long.dt;
495493

496-
d[0] = (ptrdiff_t) fh->f_decoded_iov;
497-
d[1] = (ptrdiff_t) &fh->f_decoded_iov[0].iov_len;
494+
d[0] = (ptrdiff_t) fh->f_fview.f_decoded_iov;
495+
d[1] = (ptrdiff_t) &fh->f_fview.f_decoded_iov[0].iov_len;
498496

499497
base = d[0];
500498
for (i=0 ; i<2 ; i++) {

0 commit comments

Comments
 (0)