Skip to content

Commit 57026cf

Browse files
authored
Merge pull request #8559 from edgargabriel/topic/atomicity
Topic/atomicity
2 parents 1049307 + 40243cf commit 57026cf

13 files changed

+360
-153
lines changed

ompi/mca/fbtl/base/base.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
* Copyright (c) 2004-2005 The Regents of the University of California.
1111
* All rights reserved.
1212
* Copyright (c) 2008 Cisco Systems, Inc. All rights reserved.
13-
* Copyright (c) 2008-2011 University of Houston. All rights reserved.
13+
* Copyright (c) 2008-2021 University of Houston. All rights reserved.
1414
* Copyright (c) 2018 Research Organization for Information Science
1515
* and Technology (RIST). All rights reserved.
1616
* $COPYRIGHT$
@@ -50,6 +50,8 @@ OMPI_DECLSPEC int mca_fbtl_base_find_available(bool enable_progress_threads,
5050
OMPI_DECLSPEC int mca_fbtl_base_init_file (struct ompio_file_t *file);
5151

5252
OMPI_DECLSPEC int mca_fbtl_base_get_param (struct ompio_file_t *file, int keyval);
53+
54+
OMPI_DECLSPEC bool mca_fbtl_base_check_atomicity (struct ompio_file_t *file);
5355
/*
5456
* Globals
5557
*/

ompi/mca/fbtl/base/fbtl_base_file_select.c

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
* University of Stuttgart. All rights reserved.
1010
* Copyright (c) 2004-2005 The Regents of the University of California.
1111
* All rights reserved.
12-
* Copyright (c) 2008-2011 University of Houston. All rights reserved.
12+
* Copyright (c) 2008-2021 University of Houston. All rights reserved.
1313
* Copyright (c) 2018 Research Organization for Information Science
1414
* and Technology (RIST). All rights reserved.
1515
* $COPYRIGHT$
@@ -259,3 +259,13 @@ int mca_fbtl_base_file_select (struct ompio_file_t *file,
259259

260260
return err;
261261
}
262+
263+
264+
bool mca_fbtl_base_check_atomicity (struct ompio_file_t *file)
265+
{
266+
/* by default, return false. An fbtl can overwrite this setting
267+
** if they have support for atomic operations through locks or other
268+
** measures.
269+
*/
270+
return false;
271+
}

ompi/mca/fbtl/fbtl.h

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
* University of Stuttgart. All rights reserved.
1111
* Copyright (c) 2004-2005 The Regents of the University of California.
1212
* All rights reserved.
13-
* Copyright (c) 2008-2014 University of Houston. All rights reserved.
13+
* Copyright (c) 2008-2021 University of Houston. All rights reserved.
1414
* Copyright (c) 2015 Los Alamos National Security, LLC. All rights
1515
* reserved.
1616
* Copyright (c) 2018 Research Organization for Information Science
@@ -135,6 +135,10 @@ typedef bool (*mca_fbtl_base_module_progress_fn_t)
135135

136136
typedef void (*mca_fbtl_base_module_request_free_fn_t)
137137
( struct mca_ompio_request_t *request);
138+
139+
typedef bool (*mca_fbtl_base_module_check_atomicity_fn_t)
140+
(struct ompio_file_t *file);
141+
138142
/*
139143
* ***********************************************************************
140144
* *************************** module structure *************************
@@ -150,12 +154,13 @@ struct mca_fbtl_base_module_1_0_0_t {
150154
mca_fbtl_base_module_finalize_1_0_0_fn_t fbtl_module_finalize;
151155

152156
/* FBTL function pointers */
153-
mca_fbtl_base_module_preadv_fn_t fbtl_preadv;
154-
mca_fbtl_base_module_ipreadv_fn_t fbtl_ipreadv;
155-
mca_fbtl_base_module_pwritev_fn_t fbtl_pwritev;
156-
mca_fbtl_base_module_ipwritev_fn_t fbtl_ipwritev;
157-
mca_fbtl_base_module_progress_fn_t fbtl_progress;
158-
mca_fbtl_base_module_request_free_fn_t fbtl_request_free;
157+
mca_fbtl_base_module_preadv_fn_t fbtl_preadv;
158+
mca_fbtl_base_module_ipreadv_fn_t fbtl_ipreadv;
159+
mca_fbtl_base_module_pwritev_fn_t fbtl_pwritev;
160+
mca_fbtl_base_module_ipwritev_fn_t fbtl_ipwritev;
161+
mca_fbtl_base_module_progress_fn_t fbtl_progress;
162+
mca_fbtl_base_module_request_free_fn_t fbtl_request_free;
163+
mca_fbtl_base_module_check_atomicity_fn_t fbtl_check_atomicity;
159164
};
160165
typedef struct mca_fbtl_base_module_1_0_0_t mca_fbtl_base_module_1_0_0_t;
161166
typedef mca_fbtl_base_module_1_0_0_t mca_fbtl_base_module_t;

ompi/mca/fbtl/ime/fbtl_ime.c

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
#include "mpi.h"
1212

1313
#include "ompi/mca/fbtl/fbtl.h"
14+
#include "ompi/mca/fbtl/base/base.h"
1415
#include "ompi/mca/fbtl/ime/fbtl_ime.h"
1516

1617
/*
@@ -26,8 +27,8 @@ static mca_fbtl_base_module_1_0_0_t ime = {
2627
mca_fbtl_ime_pwritev, /* blocking write */
2728
mca_fbtl_ime_ipwritev, /* non-blocking write */
2829
mca_fbtl_ime_progress, /* module specific progress */
29-
mca_fbtl_ime_request_free /* free module specific data items on the request */
30-
};
30+
mca_fbtl_ime_request_free, /* free module specific data items on the request */
31+
mca_fbtl_base_check_atomicity /* check whether atomicity is supported on this fs */};
3132
/*
3233
* *******************************************************************
3334
* ************************* structure ends **************************
@@ -179,4 +180,4 @@ void mca_fbtl_ime_complete_cb (struct ime_aiocb *aiocb, int err, ssize_t bytes)
179180
{
180181
ssize_t *req_status = (ssize_t *) aiocb->user_context;
181182
*req_status = err == 0 ? bytes : FBTL_IME_REQ_ERROR;
182-
}
183+
}

ompi/mca/fbtl/posix/fbtl_posix.c

Lines changed: 55 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -58,12 +58,13 @@ static mca_fbtl_base_module_1_0_0_t posix = {
5858
#if defined (FBTL_POSIX_HAVE_AIO)
5959
mca_fbtl_posix_ipwritev, /* non-blocking write */
6060
mca_fbtl_posix_progress, /* module specific progress */
61-
mca_fbtl_posix_request_free /* free module specific data items on the request */
61+
mca_fbtl_posix_request_free, /* free module specific data items on the request */
6262
#else
6363
NULL, /* non-blocking write */
6464
NULL, /* module specific progress */
65-
NULL /* free module specific data items on the request */
65+
NULL, /* free module specific data items on the request */
6666
#endif
67+
mca_fbtl_posix_check_atomicity /* check whether atomicity is supported on this fs */
6768
};
6869
/*
6970
* *******************************************************************
@@ -144,34 +145,38 @@ bool mca_fbtl_posix_progress ( mca_ompio_request_t *req)
144145
data->aio_req_status[i] = EINPROGRESS;
145146
start_offset = data->aio_reqs[i].aio_offset;
146147
total_length = data->aio_reqs[i].aio_nbytes;
148+
/* release previous lock */
149+
mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh, &data->aio_lock_counter );
150+
147151
if ( data->aio_req_type == FBTL_POSIX_WRITE ) {
148-
ret_code = mca_fbtl_posix_lock( &data->aio_lock, data->aio_fh, F_WRLCK, start_offset, total_length, OMPIO_LOCK_ENTIRE_REGION );
152+
ret_code = mca_fbtl_posix_lock( &data->aio_lock, data->aio_fh, F_WRLCK, start_offset, total_length,
153+
OMPIO_LOCK_ENTIRE_REGION, &data->aio_lock_counter );
149154
if ( 0 < ret_code ) {
150155
opal_output(1, "mca_fbtl_posix_progress: error in mca_fbtl_posix_lock() %d", ret_code);
151156
/* Just in case some part of the lock actually succeeded. */
152-
mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh );
157+
mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh, &data->aio_lock_counter );
153158
return OMPI_ERROR;
154159
}
155160
if (-1 == aio_write(&data->aio_reqs[i])) {
156161
opal_output(1, "mca_fbtl_posix_progress: error in aio_write()");
157-
mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh );
162+
mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh, &data->aio_lock_counter );
158163
return OMPI_ERROR;
159164
}
160165
}
161166
else if ( data->aio_req_type == FBTL_POSIX_READ ) {
162-
ret_code = mca_fbtl_posix_lock( &data->aio_lock, data->aio_fh, F_RDLCK, start_offset, total_length, OMPIO_LOCK_ENTIRE_REGION );
167+
ret_code = mca_fbtl_posix_lock( &data->aio_lock, data->aio_fh, F_RDLCK, start_offset, total_length,
168+
OMPIO_LOCK_ENTIRE_REGION, &data->aio_lock_counter );
163169
if ( 0 < ret_code ) {
164170
opal_output(1, "mca_fbtl_posix_progress: error in mca_fbtl_posix_lock() %d", ret_code);
165171
/* Just in case some part of the lock actually succeeded. */
166-
mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh );
172+
mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh, &data->aio_lock_counter );
167173
return OMPI_ERROR;
168174
}
169175
if (-1 == aio_read(&data->aio_reqs[i])) {
170176
opal_output(1, "mca_fbtl_posix_progress: error in aio_read()");
171-
mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh );
177+
mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh, &data->aio_lock_counter );
172178
return OMPI_ERROR;
173179
}
174-
mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh );
175180
}
176181
}
177182
else {
@@ -199,10 +204,9 @@ bool mca_fbtl_posix_progress ( mca_ompio_request_t *req)
199204
#if 0
200205
printf("lcount=%d open_reqs=%d\n", lcount, data->aio_open_reqs );
201206
#endif
202-
203207
if ( (lcount == data->aio_req_chunks) && (0 != data->aio_open_reqs )) {
204208
/* release the lock of the previous operations */
205-
mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh );
209+
mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh, &data->aio_lock_counter );
206210

207211
/* post the next batch of operations */
208212
data->aio_first_active_req = data->aio_last_active_req;
@@ -218,30 +222,32 @@ bool mca_fbtl_posix_progress ( mca_ompio_request_t *req)
218222
total_length = (end_offset - start_offset);
219223

220224
if ( FBTL_POSIX_READ == data->aio_req_type ) {
221-
ret_code = mca_fbtl_posix_lock( &data->aio_lock, data->aio_fh, F_RDLCK, start_offset, total_length, OMPIO_LOCK_ENTIRE_REGION );
225+
ret_code = mca_fbtl_posix_lock( &data->aio_lock, data->aio_fh, F_RDLCK, start_offset, total_length,
226+
OMPIO_LOCK_ENTIRE_REGION, &data->aio_lock_counter );
222227
}
223228
else if ( FBTL_POSIX_WRITE == data->aio_req_type ) {
224-
ret_code = mca_fbtl_posix_lock( &data->aio_lock, data->aio_fh, F_WRLCK, start_offset, total_length, OMPIO_LOCK_ENTIRE_REGION );
229+
ret_code = mca_fbtl_posix_lock( &data->aio_lock, data->aio_fh, F_WRLCK, start_offset, total_length,
230+
OMPIO_LOCK_ENTIRE_REGION, &data->aio_lock_counter );
225231
}
226232
if ( 0 < ret_code ) {
227233
opal_output(1, "mca_fbtl_posix_progress: error in mca_fbtl_posix_lock() %d", ret_code);
228234
/* Just in case some part of the lock actually succeeded. */
229-
mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh );
235+
mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh, &data->aio_lock_counter );
230236
return OMPI_ERROR;
231237
}
232238

233239
for ( i=data->aio_first_active_req; i< data->aio_last_active_req; i++ ) {
234240
if ( FBTL_POSIX_READ == data->aio_req_type ) {
235241
if (-1 == aio_read(&data->aio_reqs[i])) {
236242
opal_output(1, "mca_fbtl_posix_progress: error in aio_read()");
237-
mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh );
243+
mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh, &data->aio_lock_counter );
238244
return OMPI_ERROR;
239245
}
240246
}
241247
else if ( FBTL_POSIX_WRITE == data->aio_req_type ) {
242248
if (-1 == aio_write(&data->aio_reqs[i])) {
243249
opal_output(1, "mca_fbtl_posix_progress: error in aio_write()");
244-
mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh );
250+
mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh, &data->aio_lock_counter );
245251
return OMPI_ERROR;
246252
}
247253
}
@@ -255,8 +261,13 @@ bool mca_fbtl_posix_progress ( mca_ompio_request_t *req)
255261
/* all pending operations are finished for this request */
256262
req->req_ompi.req_status.MPI_ERROR = OMPI_SUCCESS;
257263
req->req_ompi.req_status._ucount = data->aio_total_len;
258-
mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh );
259-
ret = true;
264+
mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh, &data->aio_lock_counter );
265+
266+
if ( data->aio_fh->f_atomicity ) {
267+
mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh, &data->aio_lock_counter );
268+
}
269+
270+
ret = true;
260271
}
261272
#endif
262273
return ret;
@@ -268,8 +279,8 @@ void mca_fbtl_posix_request_free ( mca_ompio_request_t *req)
268279
/* Free the fbtl specific data structures */
269280
mca_fbtl_posix_request_data_t *data=(mca_fbtl_posix_request_data_t *)req->req_data;
270281
if (NULL != data ) {
271-
mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh );
272-
if ( NULL != data->aio_reqs ) {
282+
283+
if ( NULL != data->aio_reqs ) {
273284
free ( data->aio_reqs);
274285
}
275286
if ( NULL != data->aio_req_status ) {
@@ -281,3 +292,27 @@ void mca_fbtl_posix_request_free ( mca_ompio_request_t *req)
281292
#endif
282293
return;
283294
}
295+
296+
bool mca_fbtl_posix_check_atomicity ( ompio_file_t *file)
297+
{
298+
struct flock lock;
299+
300+
lock.l_type = F_WRLCK;
301+
lock.l_whence = SEEK_SET;
302+
lock.l_start = 0;
303+
lock.l_len = 0;
304+
lock.l_pid = 0;
305+
306+
if (fcntl(file->fd, F_GETLK, &lock) < 0)
307+
{
308+
#ifdef VERBOSE
309+
printf("Failed to get lock info for '%s': %s\n", filename, strerror(errno));
310+
#endif
311+
return false;
312+
}
313+
314+
#ifdef VERBOSE
315+
printf("Lock would have worked, l_type=%d\n", (int)lock.l_type);
316+
#endif
317+
return true;
318+
}

ompi/mca/fbtl/posix/fbtl_posix.h

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
* University of Stuttgart. All rights reserved.
1010
* Copyright (c) 2004-2005 The Regents of the University of California.
1111
* All rights reserved.
12-
* Copyright (c) 2008-2020 University of Houston. All rights reserved.
12+
* Copyright (c) 2008-2021 University of Houston. All rights reserved.
1313
* Copyright (c) 2018 Research Organization for Information Science
1414
* and Technology (RIST). All rights reserved.
1515
* $COPYRIGHT$
@@ -64,10 +64,12 @@ ssize_t mca_fbtl_posix_ipwritev (ompio_file_t *file,
6464

6565
bool mca_fbtl_posix_progress ( mca_ompio_request_t *req);
6666
void mca_fbtl_posix_request_free ( mca_ompio_request_t *req);
67+
bool mca_fbtl_posix_check_atomicity ( ompio_file_t *file);
6768

6869
int mca_fbtl_posix_lock ( struct flock *lock, ompio_file_t *fh, int op,
69-
OMPI_MPI_OFFSET_TYPE iov_offset, off_t len, int flags);
70-
void mca_fbtl_posix_unlock ( struct flock *lock, ompio_file_t *fh );
70+
OMPI_MPI_OFFSET_TYPE iov_offset, off_t len, int flags,
71+
int *lock_counter);
72+
void mca_fbtl_posix_unlock ( struct flock *lock, ompio_file_t *fh, int *lock_counter );
7173

7274

7375
struct mca_fbtl_posix_request_data_t {
@@ -78,9 +80,10 @@ struct mca_fbtl_posix_request_data_t {
7880
int aio_first_active_req; /* first active posted req */
7981
int aio_last_active_req; /* last currently active poted req */
8082
struct aiocb *aio_reqs; /* pointer array of req structures */
81-
int *aio_req_status; /* array of statuses */
83+
int *aio_req_status; /* array of statuses */
8284
ssize_t aio_total_len; /* total amount of data written */
8385
struct flock aio_lock; /* lock used for certain file systems */
86+
int aio_lock_counter; /* to keep track of no. of lock calls */
8487
ompio_file_t *aio_fh; /* pointer back to the mca_io_ompio_fh structure */
8588
};
8689
typedef struct mca_fbtl_posix_request_data_t mca_fbtl_posix_request_data_t;
@@ -92,6 +95,22 @@ typedef struct mca_fbtl_posix_request_data_t mca_fbtl_posix_request_data_t;
9295
#define FBTL_POSIX_READ 1
9396
#define FBTL_POSIX_WRITE 2
9497

98+
#define OMPIO_SET_ATOMICITY_LOCK(_fh, _lock, _lock_counter, _op) { \
99+
int32_t _orig_flags = _fh->f_flags; \
100+
_fh->f_flags &= ~OMPIO_LOCK_NEVER; \
101+
_fh->f_flags &= ~OMPIO_LOCK_NOT_THIS_OP; \
102+
off_t _end_offset = (off_t)_fh->f_io_array[_fh->f_num_of_io_entries-1].offset + \
103+
(off_t)_fh->f_io_array[_fh->f_num_of_io_entries-1].length; \
104+
off_t _len = _end_offset - (off_t)_fh->f_io_array[0].offset; \
105+
int _ret = mca_fbtl_posix_lock ( &_lock, _fh, _op, (off_t)_fh->f_io_array[0].offset, \
106+
_len, OMPIO_LOCK_ENTIRE_REGION, &_lock_counter); \
107+
if ( _ret == -1 ) { \
108+
opal_output(1, "mca_fbtl_posix: error in mca_fbtl_posix_lock():%s", \
109+
strerror(errno)); \
110+
return OMPI_ERROR; \
111+
} \
112+
_fh->f_flags = _orig_flags; }
113+
95114

96115
/*
97116
* ******************************************************************

ompi/mca/fbtl/posix/fbtl_posix_ipreadv.c

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -70,8 +70,13 @@ ssize_t mca_fbtl_posix_ipreadv (ompio_file_t *fh,
7070
free(data);
7171
return 0;
7272
}
73+
data->aio_lock_counter = 0;
7374
data->aio_fh = fh;
7475

76+
if ( fh->f_atomicity ) {
77+
OMPIO_SET_ATOMICITY_LOCK(fh, data->aio_lock, data->aio_lock_counter, F_RDLCK);
78+
}
79+
7580
for ( i=0; i<fh->f_num_of_io_entries; i++ ) {
7681
data->aio_reqs[i].aio_offset = (OMPI_MPI_OFFSET_TYPE)(intptr_t)
7782
fh->f_io_array[i].offset;
@@ -94,10 +99,11 @@ ssize_t mca_fbtl_posix_ipreadv (ompio_file_t *fh,
9499
start_offset = data->aio_reqs[data->aio_first_active_req].aio_offset;
95100
end_offset = data->aio_reqs[data->aio_last_active_req-1].aio_offset + data->aio_reqs[data->aio_last_active_req-1].aio_nbytes;
96101
total_length = (end_offset - start_offset);
97-
ret = mca_fbtl_posix_lock( &data->aio_lock, data->aio_fh, F_RDLCK, start_offset, total_length, OMPIO_LOCK_ENTIRE_REGION );
102+
ret = mca_fbtl_posix_lock( &data->aio_lock, data->aio_fh, F_RDLCK, start_offset, total_length,
103+
OMPIO_LOCK_ENTIRE_REGION, &data->aio_lock_counter );
98104
if ( 0 < ret ) {
99105
opal_output(1, "mca_fbtl_posix_ipreadv: error in mca_fbtl_posix_lock() error ret=%d %s", ret, strerror(errno));
100-
mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh );
106+
mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh, &data->aio_lock_counter );
101107
free(data->aio_reqs);
102108
free(data->aio_req_status);
103109
free(data);
@@ -115,7 +121,7 @@ ssize_t mca_fbtl_posix_ipreadv (ompio_file_t *fh,
115121
}
116122
if ( MAX_ATTEMPTS == counter ) {
117123
opal_output(1, "mca_fbtl_posix_ipreadv: error in aio_read(): errno %d %s", errno, strerror(errno));
118-
mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh );
124+
mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh, &data->aio_lock_counter );
119125
free(data->aio_reqs);
120126
free(data->aio_req_status);
121127
free(data);

0 commit comments

Comments
 (0)