Skip to content

Commit 018ca4e

Browse files
authored
Merge pull request #5339 from hoopoepg/topic/atomic-module-api-refactoring
SHMEM/ATOMIC: refactoring of module API
2 parents 4962651 + 8080283 commit 018ca4e

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

57 files changed

+498
-560
lines changed

ompi/mca/osc/ucx/osc_ucx_active_target.c

Lines changed: 14 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ static inline void ompi_osc_ucx_handle_incoming_post(ompi_osc_ucx_module_t *modu
6060

6161
int ompi_osc_ucx_fence(int assert, struct ompi_win_t *win) {
6262
ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module;
63-
ucs_status_t status;
63+
int ret;
6464

6565
if (module->epoch_type.access != NONE_EPOCH &&
6666
module->epoch_type.access != FENCE_EPOCH) {
@@ -74,12 +74,9 @@ int ompi_osc_ucx_fence(int assert, struct ompi_win_t *win) {
7474
}
7575

7676
if (!(assert & MPI_MODE_NOPRECEDE)) {
77-
status = opal_common_ucx_worker_flush(mca_osc_ucx_component.ucp_worker);
78-
if (status != UCS_OK) {
79-
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
80-
"%s:%d: ucp_worker_flush failed: %d\n",
81-
__FILE__, __LINE__, status);
82-
return OMPI_ERROR;
77+
ret = opal_common_ucx_worker_flush(mca_osc_ucx_component.ucp_worker);
78+
if (ret != OMPI_SUCCESS) {
79+
return ret;
8380
}
8481
}
8582

@@ -176,12 +173,9 @@ int ompi_osc_ucx_complete(struct ompi_win_t *win) {
176173

177174
module->epoch_type.access = NONE_EPOCH;
178175

179-
status = opal_common_ucx_worker_flush(mca_osc_ucx_component.ucp_worker);
180-
if (status != UCS_OK) {
181-
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
182-
"%s:%d: ucp_worker_flush failed: %d\n",
183-
__FILE__, __LINE__, status);
184-
return OMPI_ERROR;
176+
ret = opal_common_ucx_worker_flush(mca_osc_ucx_component.ucp_worker);
177+
if (ret != OMPI_SUCCESS) {
178+
return ret;
185179
}
186180
module->global_ops_num = 0;
187181
memset(module->per_target_ops_nums, 0,
@@ -201,12 +195,7 @@ int ompi_osc_ucx_complete(struct ompi_win_t *win) {
201195
__FILE__, __LINE__, status);
202196
}
203197

204-
status = opal_common_ucx_ep_flush(ep, mca_osc_ucx_component.ucp_worker);
205-
if (status != UCS_OK) {
206-
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
207-
"%s:%d: ucp_ep_flush failed: %d\n",
208-
__FILE__, __LINE__, status);
209-
}
198+
opal_common_ucx_ep_flush(ep, mca_osc_ucx_component.ucp_worker);
210199
}
211200

212201
OBJ_RELEASE(module->start_group);
@@ -232,7 +221,6 @@ int ompi_osc_ucx_post(struct ompi_group_t *group, int assert, struct ompi_win_t
232221
ompi_group_t *win_group = NULL;
233222
int *ranks_in_grp = NULL, *ranks_in_win_grp = NULL;
234223
int myrank = ompi_comm_rank(module->comm);
235-
ucs_status_t status;
236224

237225
size = ompi_group_size(module->post_group);
238226
ranks_in_grp = malloc(sizeof(int) * size);
@@ -260,29 +248,19 @@ int ompi_osc_ucx_post(struct ompi_group_t *group, int assert, struct ompi_win_t
260248
uint64_t curr_idx = 0, result = 0;
261249

262250
/* do fop first to get an post index */
263-
status = opal_common_ucx_atomic_fetch(ep, UCP_ATOMIC_FETCH_OP_FADD, 1,
264-
&result, sizeof(result),
265-
remote_addr, rkey, mca_osc_ucx_component.ucp_worker);
266-
if (status != UCS_OK) {
267-
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
268-
"%s:%d: ucp_atomic_fadd64 failed: %d\n",
269-
__FILE__, __LINE__, status);
270-
}
251+
opal_common_ucx_atomic_fetch(ep, UCP_ATOMIC_FETCH_OP_FADD, 1,
252+
&result, sizeof(result),
253+
remote_addr, rkey, mca_osc_ucx_component.ucp_worker);
271254

272255
curr_idx = result & (OMPI_OSC_UCX_POST_PEER_MAX - 1);
273256

274257
remote_addr = (module->state_info_array)[ranks_in_win_grp[i]].addr + OSC_UCX_STATE_POST_STATE_OFFSET + sizeof(uint64_t) * curr_idx;
275258

276259
/* do cas to send post message */
277260
do {
278-
status = opal_common_ucx_atomic_cswap(ep, 0, (uint64_t)myrank + 1, &result,
279-
sizeof(result), remote_addr, rkey,
280-
mca_osc_ucx_component.ucp_worker);
281-
if (status != UCS_OK) {
282-
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
283-
"%s:%d: ucp_atomic_cswap64 failed: %d\n",
284-
__FILE__, __LINE__, status);
285-
}
261+
opal_common_ucx_atomic_cswap(ep, 0, (uint64_t)myrank + 1, &result,
262+
sizeof(result), remote_addr, rkey,
263+
mca_osc_ucx_component.ucp_worker);
286264

287265
if (result == 0)
288266
break;

ompi/mca/osc/ucx/osc_ucx_comm.c

Lines changed: 25 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -61,17 +61,14 @@ static inline int check_sync_state(ompi_osc_ucx_module_t *module, int target,
6161

6262
static inline int incr_and_check_ops_num(ompi_osc_ucx_module_t *module, int target,
6363
ucp_ep_h ep) {
64-
ucs_status_t status;
64+
int status;
6565

6666
module->global_ops_num++;
6767
module->per_target_ops_nums[target]++;
6868
if (module->global_ops_num >= OSC_UCX_OPS_THRESHOLD) {
6969
status = opal_common_ucx_ep_flush(ep, mca_osc_ucx_component.ucp_worker);
70-
if (status != UCS_OK) {
71-
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
72-
"%s:%d: ucp_ep_flush failed: %d\n",
73-
__FILE__, __LINE__, status);
74-
return OMPI_ERROR;
70+
if (status != OMPI_SUCCESS) {
71+
return status;
7572
}
7673
module->global_ops_num -= module->per_target_ops_nums[target];
7774
module->per_target_ops_nums[target] = 0;
@@ -309,16 +306,13 @@ static inline int end_atomicity(ompi_osc_ucx_module_t *module, ucp_ep_h ep, int
309306
uint64_t result_value = 0;
310307
ucp_rkey_h rkey = (module->state_info_array)[target].rkey;
311308
uint64_t remote_addr = (module->state_info_array)[target].addr + OSC_UCX_STATE_ACC_LOCK_OFFSET;
312-
ucs_status_t status;
309+
int ret;
313310

314-
status = opal_common_ucx_atomic_fetch(ep, UCP_ATOMIC_FETCH_OP_SWAP, TARGET_LOCK_UNLOCKED,
315-
&result_value, sizeof(result_value),
316-
remote_addr, rkey, mca_osc_ucx_component.ucp_worker);
317-
if (status != UCS_OK) {
318-
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
319-
"%s:%d: ucp_atomic_swap64 failed: %d\n",
320-
__FILE__, __LINE__, status);
321-
return OMPI_ERROR;
311+
ret = opal_common_ucx_atomic_fetch(ep, UCP_ATOMIC_FETCH_OP_SWAP, TARGET_LOCK_UNLOCKED,
312+
&result_value, sizeof(result_value),
313+
remote_addr, rkey, mca_osc_ucx_component.ucp_worker);
314+
if (OMPI_SUCCESS != ret) {
315+
return ret;
322316
}
323317

324318
assert(result_value == TARGET_LOCK_EXCLUSIVE);
@@ -336,6 +330,7 @@ static inline int get_dynamic_win_info(uint64_t remote_addr, ompi_osc_ucx_module
336330
uint64_t win_count;
337331
int contain, insert = -1;
338332
ucs_status_t status;
333+
int ret;
339334

340335
if ((module->win_info_array[target]).rkey_init == true) {
341336
ucp_rkey_destroy((module->win_info_array[target]).rkey);
@@ -350,12 +345,9 @@ static inline int get_dynamic_win_info(uint64_t remote_addr, ompi_osc_ucx_module
350345
return OMPI_ERROR;
351346
}
352347

353-
status = opal_common_ucx_ep_flush(ep, mca_osc_ucx_component.ucp_worker);
354-
if (status != UCS_OK) {
355-
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
356-
"%s:%d: ucp_ep_flush failed: %d\n",
357-
__FILE__, __LINE__, status);
358-
return OMPI_ERROR;
348+
ret = opal_common_ucx_ep_flush(ep, mca_osc_ucx_component.ucp_worker);
349+
if (ret != OMPI_SUCCESS) {
350+
return ret;
359351
}
360352

361353
memcpy(&win_count, temp_buf, sizeof(uint64_t));
@@ -529,7 +521,6 @@ int ompi_osc_ucx_accumulate(const void *origin_addr, int origin_count,
529521
uint32_t temp_count;
530522
ompi_datatype_t *temp_dt;
531523
ptrdiff_t temp_lb, temp_extent;
532-
ucs_status_t status;
533524
bool is_origin_contig = ompi_datatype_is_contiguous_memory_layout(origin_dt, origin_count);
534525

535526
if (ompi_datatype_is_predefined(target_dt)) {
@@ -553,12 +544,9 @@ int ompi_osc_ucx_accumulate(const void *origin_addr, int origin_count,
553544
return ret;
554545
}
555546

556-
status = opal_common_ucx_ep_flush(ep, mca_osc_ucx_component.ucp_worker);
557-
if (status != UCS_OK) {
558-
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
559-
"%s:%d: ucp_ep_flush failed: %d\n",
560-
__FILE__, __LINE__, status);
561-
return OMPI_ERROR;
547+
ret = opal_common_ucx_ep_flush(ep, mca_osc_ucx_component.ucp_worker);
548+
if (ret != OMPI_SUCCESS) {
549+
return ret;
562550
}
563551

564552
if (ompi_datatype_is_predefined(origin_dt) || is_origin_contig) {
@@ -610,12 +598,9 @@ int ompi_osc_ucx_accumulate(const void *origin_addr, int origin_count,
610598
return ret;
611599
}
612600

613-
status = opal_common_ucx_ep_flush(ep, mca_osc_ucx_component.ucp_worker);
614-
if (status != UCS_OK) {
615-
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
616-
"%s:%d: ucp_ep_flush failed: %d\n",
617-
__FILE__, __LINE__, status);
618-
return OMPI_ERROR;
601+
ret = opal_common_ucx_ep_flush(ep, mca_osc_ucx_component.ucp_worker);
602+
if (ret != OMPI_SUCCESS) {
603+
return ret;
619604
}
620605

621606
free(temp_addr_holder);
@@ -781,7 +766,6 @@ int ompi_osc_ucx_get_accumulate(const void *origin_addr, int origin_count,
781766
uint32_t temp_count;
782767
ompi_datatype_t *temp_dt;
783768
ptrdiff_t temp_lb, temp_extent;
784-
ucs_status_t status;
785769
bool is_origin_contig = ompi_datatype_is_contiguous_memory_layout(origin_dt, origin_count);
786770

787771
if (ompi_datatype_is_predefined(target_dt)) {
@@ -805,12 +789,9 @@ int ompi_osc_ucx_get_accumulate(const void *origin_addr, int origin_count,
805789
return ret;
806790
}
807791

808-
status = opal_common_ucx_ep_flush(ep, mca_osc_ucx_component.ucp_worker);
809-
if (status != UCS_OK) {
810-
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
811-
"%s:%d: ucp_ep_flush failed: %d\n",
812-
__FILE__, __LINE__, status);
813-
return OMPI_ERROR;
792+
ret = opal_common_ucx_ep_flush(ep, mca_osc_ucx_component.ucp_worker);
793+
if (ret != OMPI_SUCCESS) {
794+
return ret;
814795
}
815796

816797
if (ompi_datatype_is_predefined(origin_dt) || is_origin_contig) {
@@ -861,12 +842,9 @@ int ompi_osc_ucx_get_accumulate(const void *origin_addr, int origin_count,
861842
return ret;
862843
}
863844

864-
status = opal_common_ucx_ep_flush(ep, mca_osc_ucx_component.ucp_worker);
865-
if (status != UCS_OK) {
866-
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
867-
"%s:%d: ucp_ep_flush failed: %d\n",
868-
__FILE__, __LINE__, status);
869-
return OMPI_ERROR;
845+
ret = opal_common_ucx_ep_flush(ep, mca_osc_ucx_component.ucp_worker);
846+
if (ret != OMPI_SUCCESS) {
847+
return ret;
870848
}
871849

872850
free(temp_addr_holder);

ompi/mca/osc/ucx/osc_ucx_passive_target.c

Lines changed: 26 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -24,16 +24,14 @@ static inline int start_shared(ompi_osc_ucx_module_t *module, int target) {
2424
ucp_rkey_h rkey = (module->state_info_array)[target].rkey;
2525
uint64_t remote_addr = (module->state_info_array)[target].addr + OSC_UCX_STATE_LOCK_OFFSET;
2626
ucs_status_t status;
27+
int ret;
2728

2829
while (true) {
29-
status = opal_common_ucx_atomic_fetch(ep, UCP_ATOMIC_FETCH_OP_FADD, 1,
30-
&result_value, sizeof(result_value),
31-
remote_addr, rkey, mca_osc_ucx_component.ucp_worker);
32-
if (status != UCS_OK) {
33-
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
34-
"%s:%d: ucp_atomic_fadd64 failed: %d\n",
35-
__FILE__, __LINE__, status);
36-
return OMPI_ERROR;
30+
ret = opal_common_ucx_atomic_fetch(ep, UCP_ATOMIC_FETCH_OP_FADD, 1,
31+
&result_value, sizeof(result_value),
32+
remote_addr, rkey, mca_osc_ucx_component.ucp_worker);
33+
if (OMPI_SUCCESS != ret) {
34+
return ret;
3735
}
3836
assert((int64_t)result_value >= 0);
3937
if (result_value >= TARGET_LOCK_EXCLUSIVE) {
@@ -99,16 +97,13 @@ static inline int end_exclusive(ompi_osc_ucx_module_t *module, int target) {
9997
ucp_ep_h ep = OSC_UCX_GET_EP(module->comm, target);
10098
ucp_rkey_h rkey = (module->state_info_array)[target].rkey;
10199
uint64_t remote_addr = (module->state_info_array)[target].addr + OSC_UCX_STATE_LOCK_OFFSET;
102-
ucs_status_t status;
100+
int ret;
103101

104-
status = opal_common_ucx_atomic_fetch(ep, UCP_ATOMIC_FETCH_OP_SWAP, TARGET_LOCK_UNLOCKED,
105-
&result_value, sizeof(result_value),
106-
remote_addr, rkey, mca_osc_ucx_component.ucp_worker);
107-
if (status != UCS_OK) {
108-
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
109-
"%s:%d: ucp_atomic_swap64 failed: %d\n",
110-
__FILE__, __LINE__, status);
111-
return OMPI_ERROR;
102+
ret = opal_common_ucx_atomic_fetch(ep, UCP_ATOMIC_FETCH_OP_SWAP, TARGET_LOCK_UNLOCKED,
103+
&result_value, sizeof(result_value),
104+
remote_addr, rkey, mca_osc_ucx_component.ucp_worker);
105+
if (OMPI_SUCCESS != ret) {
106+
return ret;
112107
}
113108

114109
assert(result_value >= TARGET_LOCK_EXCLUSIVE);
@@ -169,7 +164,6 @@ int ompi_osc_ucx_lock(int lock_type, int target, int assert, struct ompi_win_t *
169164
int ompi_osc_ucx_unlock(int target, struct ompi_win_t *win) {
170165
ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t *)win->w_osc_module;
171166
ompi_osc_ucx_lock_t *lock = NULL;
172-
ucs_status_t status;
173167
int ret = OMPI_SUCCESS;
174168
ucp_ep_h ep;
175169

@@ -186,12 +180,9 @@ int ompi_osc_ucx_unlock(int target, struct ompi_win_t *win) {
186180
(uint32_t)target);
187181

188182
ep = OSC_UCX_GET_EP(module->comm, target);
189-
status = opal_common_ucx_ep_flush(ep, mca_osc_ucx_component.ucp_worker);
190-
if (status != UCS_OK) {
191-
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
192-
"%s:%d: ucp_ep_flush failed: %d\n",
193-
__FILE__, __LINE__, status);
194-
return OMPI_ERROR;
183+
ret = opal_common_ucx_ep_flush(ep, mca_osc_ucx_component.ucp_worker);
184+
if (ret != OMPI_SUCCESS) {
185+
return ret;
195186
}
196187

197188
module->global_ops_num -= module->per_target_ops_nums[target];
@@ -252,7 +243,6 @@ int ompi_osc_ucx_lock_all(int assert, struct ompi_win_t *win) {
252243
int ompi_osc_ucx_unlock_all(struct ompi_win_t *win) {
253244
ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*)win->w_osc_module;
254245
int comm_size = ompi_comm_size(module->comm);
255-
ucs_status_t status;
256246
int ret = OMPI_SUCCESS;
257247

258248
if (module->epoch_type.access != PASSIVE_ALL_EPOCH) {
@@ -261,12 +251,9 @@ int ompi_osc_ucx_unlock_all(struct ompi_win_t *win) {
261251

262252
assert(module->lock_count == 0);
263253

264-
status = opal_common_ucx_worker_flush(mca_osc_ucx_component.ucp_worker);
265-
if (status != UCS_OK) {
266-
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
267-
"%s:%d: ucp_worker_flush failed: %d\n",
268-
__FILE__, __LINE__, status);
269-
return OMPI_ERROR;
254+
ret = opal_common_ucx_worker_flush(mca_osc_ucx_component.ucp_worker);
255+
if (ret != OMPI_SUCCESS) {
256+
return ret;
270257
}
271258

272259
module->global_ops_num = 0;
@@ -309,20 +296,17 @@ int ompi_osc_ucx_sync(struct ompi_win_t *win) {
309296
int ompi_osc_ucx_flush(int target, struct ompi_win_t *win) {
310297
ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module;
311298
ucp_ep_h ep;
312-
ucs_status_t status;
299+
int ret;
313300

314301
if (module->epoch_type.access != PASSIVE_EPOCH &&
315302
module->epoch_type.access != PASSIVE_ALL_EPOCH) {
316303
return OMPI_ERR_RMA_SYNC;
317304
}
318305

319306
ep = OSC_UCX_GET_EP(module->comm, target);
320-
status = opal_common_ucx_ep_flush(ep, mca_osc_ucx_component.ucp_worker);
321-
if (status != UCS_OK) {
322-
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
323-
"%s:%d: ucp_ep_flush failed: %d\n",
324-
__FILE__, __LINE__, status);
325-
return OMPI_ERROR;
307+
ret = opal_common_ucx_ep_flush(ep, mca_osc_ucx_component.ucp_worker);
308+
if (ret != OMPI_SUCCESS) {
309+
return ret;
326310
}
327311

328312
module->global_ops_num -= module->per_target_ops_nums[target];
@@ -333,19 +317,16 @@ int ompi_osc_ucx_flush(int target, struct ompi_win_t *win) {
333317

334318
int ompi_osc_ucx_flush_all(struct ompi_win_t *win) {
335319
ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t *)win->w_osc_module;
336-
ucs_status_t status;
320+
int ret;
337321

338322
if (module->epoch_type.access != PASSIVE_EPOCH &&
339323
module->epoch_type.access != PASSIVE_ALL_EPOCH) {
340324
return OMPI_ERR_RMA_SYNC;
341325
}
342326

343-
status = opal_common_ucx_worker_flush(mca_osc_ucx_component.ucp_worker);
344-
if (status != UCS_OK) {
345-
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
346-
"%s:%d: ucp_worker_flush failed: %d\n",
347-
__FILE__, __LINE__, status);
348-
return OMPI_ERROR;
327+
ret = opal_common_ucx_worker_flush(mca_osc_ucx_component.ucp_worker);
328+
if (ret != OMPI_SUCCESS) {
329+
return ret;
349330
}
350331

351332
module->global_ops_num = 0;

0 commit comments

Comments
 (0)