Skip to content

Commit 56afecd

Browse files
author
Mamzi Bayatpour mbayatpour@nvidia.com ()
committed
OSC/UCX: Adding locks to win attach/deattach and fixing build warnings
Signed-off-by: Mamzi Bayatpour <mbayatpour@nvidia.com>
1 parent f0101dc commit 56afecd

File tree

3 files changed

+176
-130
lines changed

3 files changed

+176
-130
lines changed

ompi/mca/osc/ucx/osc_ucx.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,5 +228,10 @@ int ompi_osc_ucx_flush_local_all(struct ompi_win_t *win);
228228
int ompi_osc_find_attached_region_position(ompi_osc_dynamic_win_info_t *dynamic_wins,
229229
int min_index, int max_index,
230230
uint64_t base, size_t len, int *insert);
231+
extern inline bool ompi_osc_need_acc_lock(ompi_osc_ucx_module_t *module, int target);
232+
extern inline int ompi_osc_state_lock(ompi_osc_ucx_module_t *module, int target,
233+
bool *lock_acquired, bool force_lock);
234+
extern inline int ompi_osc_state_unlock(ompi_osc_ucx_module_t *module, int target,
235+
bool lock_acquired, void *free_ptr);
231236

232237
#endif /* OMPI_OSC_UCX_H */

ompi/mca/osc/ucx/osc_ucx_comm.c

Lines changed: 60 additions & 119 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,12 @@
2727
return OMPI_ERROR; \
2828
}
2929

30-
#define CHECK_DYNAMIC_WIN(_remote_addr, _module, _target, _ret) \
31-
if (_module->flavor == MPI_WIN_FLAVOR_DYNAMIC) { \
32-
_ret = get_dynamic_win_info(_remote_addr, _module, _target); \
33-
if (_ret != OMPI_SUCCESS) { \
34-
return _ret; \
35-
} \
30+
#define CHECK_DYNAMIC_WIN(_remote_addr, _module, _target, _ret, _lock_required) \
31+
if (_module->flavor == MPI_WIN_FLAVOR_DYNAMIC) { \
32+
_ret = get_dynamic_win_info(_remote_addr, _module, _target, _lock_required); \
33+
if (_ret != OMPI_SUCCESS) { \
34+
return _ret; \
35+
} \
3636
}
3737

3838
typedef struct ucx_iovec {
@@ -251,89 +251,8 @@ static inline int ddt_put_get(ompi_osc_ucx_module_t *module,
251251
return ret;
252252
}
253253

254-
static inline bool need_acc_lock(ompi_osc_ucx_module_t *module, int target)
255-
{
256-
ompi_osc_ucx_lock_t *lock = NULL;
257-
opal_hash_table_get_value_uint32(&module->outstanding_locks,
258-
(uint32_t) target, (void **) &lock);
259-
260-
/* if there is an exclusive lock there is no need to acqurie the accumulate lock */
261-
return !(NULL != lock && LOCK_EXCLUSIVE == lock->type);
262-
}
263-
264-
static inline int start_atomicity(
265-
ompi_osc_ucx_module_t *module,
266-
int target,
267-
bool *lock_acquired) {
268-
uint64_t result_value = -1;
269-
uint64_t remote_addr = (module->state_addrs)[target] + OSC_UCX_STATE_ACC_LOCK_OFFSET;
270-
int ret = OMPI_SUCCESS;
271-
272-
if (need_acc_lock(module, target)) {
273-
for (;;) {
274-
ret = opal_common_ucx_wpmem_cmpswp(module->state_mem,
275-
TARGET_LOCK_UNLOCKED, TARGET_LOCK_EXCLUSIVE,
276-
target, &result_value, sizeof(result_value),
277-
remote_addr);
278-
if (ret != OMPI_SUCCESS) {
279-
OSC_UCX_VERBOSE(1, "opal_common_ucx_mem_cmpswp failed: %d", ret);
280-
return OMPI_ERROR;
281-
}
282-
if (result_value == TARGET_LOCK_UNLOCKED) {
283-
break;
284-
}
285-
286-
opal_common_ucx_wpool_progress(mca_osc_ucx_component.wpool);
287-
}
288-
289-
*lock_acquired = true;
290-
} else {
291-
*lock_acquired = false;
292-
}
293-
294-
return OMPI_SUCCESS;
295-
}
296-
297-
static inline int end_atomicity(
298-
ompi_osc_ucx_module_t *module,
299-
int target,
300-
bool lock_acquired,
301-
void *free_ptr) {
302-
uint64_t remote_addr = (module->state_addrs)[target] + OSC_UCX_STATE_ACC_LOCK_OFFSET;
303-
int ret = OMPI_SUCCESS;
304-
305-
if (lock_acquired) {
306-
uint64_t result_value = 0;
307-
/* fence any still active operations */
308-
ret = opal_common_ucx_wpmem_fence(module->mem);
309-
if (ret != OMPI_SUCCESS) {
310-
OSC_UCX_VERBOSE(1, "opal_common_ucx_mem_fence failed: %d", ret);
311-
return OMPI_ERROR;
312-
}
313-
314-
ret = opal_common_ucx_wpmem_fetch(module->state_mem,
315-
UCP_ATOMIC_FETCH_OP_SWAP, TARGET_LOCK_UNLOCKED,
316-
target, &result_value, sizeof(result_value),
317-
remote_addr);
318-
assert(result_value == TARGET_LOCK_EXCLUSIVE);
319-
} else if (NULL != free_ptr){
320-
/* flush before freeing the buffer */
321-
ret = opal_common_ucx_ctx_flush(module->ctx, OPAL_COMMON_UCX_SCOPE_EP, target);
322-
}
323-
/* TODO: encapsulate in a request and make the release non-blocking */
324-
if (NULL != free_ptr) {
325-
free(free_ptr);
326-
}
327-
if (ret != OMPI_SUCCESS) {
328-
OSC_UCX_VERBOSE(1, "opal_common_ucx_mem_fetch failed: %d", ret);
329-
return OMPI_ERROR;
330-
}
331-
332-
return ret;
333-
}
334-
335254
static inline int get_dynamic_win_info(uint64_t remote_addr, ompi_osc_ucx_module_t *module,
336-
int target) {
255+
int target, bool lock_required) {
337256
uint64_t remote_state_addr = (module->state_addrs)[target] + OSC_UCX_STATE_DYNAMIC_WIN_CNT_OFFSET;
338257
size_t remote_state_len = sizeof(uint64_t) + sizeof(ompi_osc_dynamic_win_info_t) * OMPI_OSC_UCX_ATTACH_MAX;
339258
char *temp_buf = calloc(remote_state_len, 1);
@@ -343,6 +262,17 @@ static inline int get_dynamic_win_info(uint64_t remote_addr, ompi_osc_ucx_module
343262
int insert = -1;
344263
int ret;
345264

265+
bool lock_acquired = false;
266+
if (lock_required) {
267+
/* We need to lock acc-lock even if the process has an exclusive lock.
268+
* Therefore, force lock is needed. Remote process protects its window
269+
* attach/detach operations with an acc-lock */
270+
ret = ompi_osc_state_lock(module, target, &lock_acquired, true);
271+
if (ret != OMPI_SUCCESS) {
272+
return ret;
273+
}
274+
}
275+
346276
ret = opal_common_ucx_wpmem_putget(module->state_mem, OPAL_COMMON_UCX_GET, target,
347277
(void *)((intptr_t)temp_buf),
348278
remote_state_len, remote_state_addr);
@@ -360,32 +290,36 @@ static inline int get_dynamic_win_info(uint64_t remote_addr, ompi_osc_ucx_module
360290

361291
memcpy(&win_count, temp_buf, sizeof(uint64_t));
362292
if (win_count > OMPI_OSC_UCX_ATTACH_MAX) {
363-
return MPI_ERR_RMA_RANGE;
293+
ret = MPI_ERR_RMA_RANGE;
294+
goto cleanup;
364295
}
365296

366297
temp_dynamic_wins = (ompi_osc_dynamic_win_info_t *)(temp_buf + sizeof(uint64_t));
367298
contain = ompi_osc_find_attached_region_position(temp_dynamic_wins, 0, win_count - 1,
368299
remote_addr, 1, &insert);
369-
if (contain < 0 || contain >= win_count) {
370-
return MPI_ERR_RMA_RANGE;
300+
if (contain < 0 || contain >= (int)win_count) {
301+
OSC_UCX_ERROR("Dynamic window index not found contain: %d win_count: %d\n",
302+
contain, win_count);
303+
ret = MPI_ERR_RMA_RANGE;
304+
goto cleanup;
371305
}
372306

373307
assert(module->mem != NULL);
374308

375309
_mem_record_t *mem_rec = NULL;
376310
ret = opal_tsd_tracked_key_get(&module->mem->tls_key, (void **) &mem_rec);
377311
if (OPAL_SUCCESS != ret) {
378-
return ret;
312+
goto cleanup;
379313
}
380314

381315
if (mem_rec == NULL) {
382316
ret = opal_common_ucx_tlocal_fetch_spath(module->mem, target);
383317
if (OPAL_SUCCESS != ret) {
384-
return ret;
318+
goto cleanup;
385319
}
386320
ret = opal_tsd_tracked_key_get(&module->mem->tls_key, (void **) &mem_rec);
387321
if (OPAL_SUCCESS != ret) {
388-
return ret;
322+
goto cleanup;
389323
}
390324

391325
}
@@ -408,12 +342,15 @@ static inline int get_dynamic_win_info(uint64_t remote_addr, ompi_osc_ucx_module
408342

409343
if (ret != UCS_OK) {
410344
MCA_COMMON_UCX_VERBOSE(1, "ucp_ep_rkey_unpack failed: %d", ret);
411-
return OPAL_ERROR;
345+
ret = OPAL_ERROR;
346+
goto cleanup;
412347
}
413348

414349
cleanup:
415350
free(temp_buf);
416351

352+
ompi_osc_state_unlock(module, target, lock_acquired, NULL);
353+
417354
return ret;
418355
}
419356

@@ -486,7 +423,7 @@ static int do_atomic_op_intrinsic(
486423

487424
uint64_t remote_addr = (module->addrs[target]) + target_disp * OSC_UCX_GET_DISP(module, target);
488425

489-
CHECK_DYNAMIC_WIN(remote_addr, module, target, ret);
426+
CHECK_DYNAMIC_WIN(remote_addr, module, target, ret, true);
490427

491428
ucp_atomic_fetch_op_t opcode;
492429
bool is_no_op = false;
@@ -555,7 +492,7 @@ int ompi_osc_ucx_put(const void *origin_addr, int origin_count, struct ompi_data
555492
return ret;
556493
}
557494

558-
CHECK_DYNAMIC_WIN(remote_addr, module, target, ret);
495+
CHECK_DYNAMIC_WIN(remote_addr, module, target, ret, true);
559496

560497
if (!target_count) {
561498
return OMPI_SUCCESS;
@@ -605,13 +542,12 @@ int ompi_osc_ucx_get(void *origin_addr, int origin_count,
605542
return ret;
606543
}
607544

608-
CHECK_DYNAMIC_WIN(remote_addr, module, target, ret);
545+
CHECK_DYNAMIC_WIN(remote_addr, module, target, ret, true);
609546

610547
if (!target_count) {
611548
return OMPI_SUCCESS;
612549
}
613550

614-
615551
ompi_datatype_get_true_extent(origin_dt, &origin_lb, &origin_extent);
616552
ompi_datatype_get_true_extent(target_dt, &target_lb, &target_extent);
617553

@@ -673,13 +609,14 @@ int accumulate_req(const void *origin_addr, int origin_count,
673609
target_disp, NULL, ucx_req);
674610
}
675611

676-
CHECK_DYNAMIC_WIN(remote_addr, module, target, ret);
677-
678-
ret = start_atomicity(module, target, &lock_acquired);
612+
/* Start atomicity by acquiring acc lock */
613+
ret = ompi_osc_state_lock(module, target, &lock_acquired, false);
679614
if (ret != OMPI_SUCCESS) {
680615
return ret;
681616
}
682617

618+
CHECK_DYNAMIC_WIN(remote_addr, module, target, ret, !lock_acquired);
619+
683620
if (op == &ompi_mpi_op_replace.op) {
684621
ret = ompi_osc_ucx_put(origin_addr, origin_count, origin_dt, target,
685622
target_disp, target_count, target_dt, win);
@@ -781,7 +718,7 @@ int accumulate_req(const void *origin_addr, int origin_count,
781718
ompi_request_complete(&ucx_req->super, true);
782719
}
783720

784-
return end_atomicity(module, target, lock_acquired, free_ptr);
721+
return ompi_osc_state_unlock(module, target, lock_acquired, free_ptr);
785722
}
786723

787724
int ompi_osc_ucx_accumulate(const void *origin_addr, int origin_count,
@@ -804,13 +741,14 @@ do_atomic_compare_and_swap(const void *origin_addr, const void *compare_addr,
804741
size_t dt_bytes;
805742
opal_common_ucx_wpmem_t *mem = module->mem;
806743
if (!module->acc_single_intrinsic) {
807-
ret = start_atomicity(module, target, &lock_acquired);
744+
/* Start atomicity by acquiring acc lock */
745+
ret = ompi_osc_state_lock(module, target, &lock_acquired, false);
808746
if (ret != OMPI_SUCCESS) {
809747
return ret;
810748
}
811749
}
812750

813-
CHECK_DYNAMIC_WIN(remote_addr, module, target, ret);
751+
CHECK_DYNAMIC_WIN(remote_addr, module, target, ret, !lock_acquired);
814752

815753
ompi_datatype_type_size(dt, &dt_bytes);
816754
uint64_t compare_val = opal_common_ucx_load_uint64(compare_addr, dt_bytes);
@@ -823,7 +761,7 @@ do_atomic_compare_and_swap(const void *origin_addr, const void *compare_addr,
823761
return ret;
824762
}
825763

826-
return end_atomicity(module, target, lock_acquired, NULL);
764+
return ompi_osc_state_unlock(module, target, lock_acquired, NULL);
827765
}
828766

829767
int ompi_osc_ucx_compare_and_swap(const void *origin_addr, const void *compare_addr,
@@ -842,8 +780,6 @@ int ompi_osc_ucx_compare_and_swap(const void *origin_addr, const void *compare_a
842780
return ret;
843781
}
844782

845-
CHECK_DYNAMIC_WIN(remote_addr, module, target, ret);
846-
847783
ompi_datatype_type_size(dt, &dt_bytes);
848784
if (ompi_osc_base_is_atomic_size_supported(remote_addr, dt_bytes)) {
849785
// fast path using UCX atomic operations
@@ -854,11 +790,14 @@ int ompi_osc_ucx_compare_and_swap(const void *origin_addr, const void *compare_a
854790

855791
/* fall back to get-compare-put */
856792

857-
ret = start_atomicity(module, target, &lock_acquired);
793+
/* Start atomicity by acquiring acc lock */
794+
ret = ompi_osc_state_lock(module, target, &lock_acquired, false);
858795
if (ret != OMPI_SUCCESS) {
859796
return ret;
860797
}
861798

799+
CHECK_DYNAMIC_WIN(remote_addr, module, target, ret, !lock_acquired);
800+
862801
ret = opal_common_ucx_wpmem_putget(mem, OPAL_COMMON_UCX_GET, target,
863802
result_addr, dt_bytes, remote_addr);
864803
if (OPAL_SUCCESS != ret) {
@@ -881,7 +820,7 @@ int ompi_osc_ucx_compare_and_swap(const void *origin_addr, const void *compare_a
881820
}
882821
}
883822

884-
return end_atomicity(module, target, lock_acquired, NULL);
823+
return ompi_osc_state_unlock(module, target, lock_acquired, NULL);
885824
}
886825

887826
int ompi_osc_ucx_fetch_and_op(const void *origin_addr, void *result_addr,
@@ -907,13 +846,14 @@ int ompi_osc_ucx_fetch_and_op(const void *origin_addr, void *result_addr,
907846
bool lock_acquired = false;
908847

909848
if (!module->acc_single_intrinsic) {
910-
ret = start_atomicity(module, target, &lock_acquired);
849+
/* Start atomicity by acquiring acc lock */
850+
ret = ompi_osc_state_lock(module, target, &lock_acquired, false);
911851
if (ret != OMPI_SUCCESS) {
912852
return ret;
913853
}
914854
}
915855

916-
CHECK_DYNAMIC_WIN(remote_addr, module, target, ret);
856+
CHECK_DYNAMIC_WIN(remote_addr, module, target, ret, !lock_acquired);
917857

918858
value = origin_addr ? opal_common_ucx_load_uint64(origin_addr, dt_bytes) : 0;
919859

@@ -934,7 +874,7 @@ int ompi_osc_ucx_fetch_and_op(const void *origin_addr, void *result_addr,
934874
return ret;
935875
}
936876

937-
return end_atomicity(module, target, lock_acquired, NULL);
877+
return ompi_osc_state_unlock(module, target, lock_acquired, NULL);
938878
} else {
939879
return ompi_osc_ucx_get_accumulate(origin_addr, 1, dt, result_addr, 1, dt,
940880
target, target_disp, 1, dt, op, win);
@@ -970,13 +910,14 @@ int get_accumulate_req(const void *origin_addr, int origin_count,
970910
target_disp, result_addr, ucx_req);
971911
}
972912

973-
CHECK_DYNAMIC_WIN(remote_addr, module, target, ret);
974-
975-
ret = start_atomicity(module, target, &lock_acquired);
913+
/* Start atomicity by acquiring acc lock */
914+
ret = ompi_osc_state_lock(module, target, &lock_acquired, false);
976915
if (ret != OMPI_SUCCESS) {
977916
return ret;
978917
}
979918

919+
CHECK_DYNAMIC_WIN(remote_addr, module, target, ret, !lock_acquired);
920+
980921
ret = ompi_osc_ucx_get(result_addr, result_count, result_dt, target,
981922
target_disp, target_count, target_dt, win);
982923
if (ret != OMPI_SUCCESS) {
@@ -1087,7 +1028,7 @@ int get_accumulate_req(const void *origin_addr, int origin_count,
10871028
}
10881029

10891030

1090-
return end_atomicity(module, target, lock_acquired, free_addr);
1031+
return ompi_osc_state_unlock(module, target, lock_acquired, free_addr);
10911032
}
10921033

10931034
int ompi_osc_ucx_get_accumulate(const void *origin_addr, int origin_count,
@@ -1119,7 +1060,7 @@ int ompi_osc_ucx_rput(const void *origin_addr, int origin_count,
11191060
return ret;
11201061
}
11211062

1122-
CHECK_DYNAMIC_WIN(remote_addr, module, target, ret);
1063+
CHECK_DYNAMIC_WIN(remote_addr, module, target, ret, true);
11231064

11241065
OMPI_OSC_UCX_REQUEST_ALLOC(win, ucx_req);
11251066
assert(NULL != ucx_req);
@@ -1175,7 +1116,7 @@ int ompi_osc_ucx_rget(void *origin_addr, int origin_count,
11751116
return ret;
11761117
}
11771118

1178-
CHECK_DYNAMIC_WIN(remote_addr, module, target, ret);
1119+
CHECK_DYNAMIC_WIN(remote_addr, module, target, ret, true);
11791120

11801121
OMPI_OSC_UCX_REQUEST_ALLOC(win, ucx_req);
11811122
assert(NULL != ucx_req);

0 commit comments

Comments
 (0)