Skip to content

Commit 9a36555

Browse files
authored
Merge pull request #5879 from hoopoepg/topic/fixed-zero-size-window
OSC/UCX: fixed zero-size window processing
2 parents 98ad78e + ae6f819 commit 9a36555

File tree

2 files changed

+54
-13
lines changed

2 files changed

+54
-13
lines changed

ompi/mca/osc/ucx/osc_ucx_comm.c

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,13 @@
1717
#include "osc_ucx.h"
1818
#include "osc_ucx_request.h"
1919

20+
21+
/*
 * Guard used before touching a target's window rkey.
 *
 * If the win_info_array entry for _target has rkey_init unset and _count is
 * greater than zero, log a verbose message and return OMPI_ERROR from the
 * enclosing function. A _count of zero (or less) never triggers the return,
 * so the caller can deal with empty transfers itself.
 *
 * The body is wrapped in do { } while (0) so the macro expands to exactly
 * one statement: "CHECK_VALID_RKEY(...);" is then safe inside an unbraced
 * if/else. Invoke it with a trailing semicolon.
 */
#define CHECK_VALID_RKEY(_module, _target, _count)                                   \
    do {                                                                             \
        if (!((_module)->win_info_array[(_target)]).rkey_init && ((_count) > 0)) {   \
            OSC_UCX_VERBOSE(1, "window with non-zero length does not have an rkey"); \
            return OMPI_ERROR;                                                       \
        }                                                                            \
    } while (0)
26+
2027
typedef struct ucx_iovec {
2128
void *addr;
2229
size_t len;
@@ -380,6 +387,12 @@ int ompi_osc_ucx_put(const void *origin_addr, int origin_count, struct ompi_data
380387
}
381388
}
382389

390+
CHECK_VALID_RKEY(module, target, target_count);
391+
392+
if (!target_count) {
393+
return OMPI_SUCCESS;
394+
}
395+
383396
rkey = (module->win_info_array[target]).rkey;
384397

385398
ompi_datatype_get_true_extent(origin_dt, &origin_lb, &origin_extent);
@@ -434,6 +447,12 @@ int ompi_osc_ucx_get(void *origin_addr, int origin_count,
434447
}
435448
}
436449

450+
CHECK_VALID_RKEY(module, target, target_count);
451+
452+
if (!target_count) {
453+
return OMPI_SUCCESS;
454+
}
455+
437456
rkey = (module->win_info_array[target]).rkey;
438457

439458
ompi_datatype_get_true_extent(origin_dt, &origin_lb, &origin_extent);
@@ -860,6 +879,8 @@ int ompi_osc_ucx_rput(const void *origin_addr, int origin_count,
860879
}
861880
}
862881

882+
CHECK_VALID_RKEY(module, target, target_count);
883+
863884
rkey = (module->win_info_array[target]).rkey;
864885

865886
OMPI_OSC_UCX_REQUEST_ALLOC(win, ucx_req);
@@ -919,6 +940,8 @@ int ompi_osc_ucx_rget(void *origin_addr, int origin_count,
919940
}
920941
}
921942

943+
CHECK_VALID_RKEY(module, target, target_count);
944+
922945
rkey = (module->win_info_array[target]).rkey;
923946

924947
OMPI_OSC_UCX_REQUEST_ALLOC(win, ucx_req);

ompi/mca/osc/ucx/osc_ucx_component.c

Lines changed: 31 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,10 @@
2020
#include "osc_ucx.h"
2121
#include "osc_ucx_request.h"
2222

23+
/*
 * Append-style copy: copy _len bytes from _src to the byte address
 * (_dst + _off), then advance _off by _len.
 *
 * _off must be a modifiable lvalue. _len and _off are each evaluated twice,
 * so do not pass expressions with side effects.
 *
 * Wrapped in do { } while (0) so both the copy and the offset update form a
 * single statement; without it, an unbraced "if (c) memcpy_off(...);" would
 * advance the offset unconditionally. Invoke it with a trailing semicolon.
 */
#define memcpy_off(_dst, _src, _len, _off)                   \
    do {                                                     \
        memcpy(((char *)(_dst)) + (_off), (_src), (_len));   \
        (_off) += (_len);                                    \
    } while (0)
26+
2327
static int component_open(void);
2428
static int component_register(void);
2529
static int component_init(bool enable_progress_threads, bool enable_mpi_threads);
@@ -278,6 +282,8 @@ static int component_select(struct ompi_win_t *win, void **base, size_t size, in
278282
int disps[comm_size];
279283
int rkey_sizes[comm_size];
280284
uint64_t zero = 0;
285+
size_t info_offset;
286+
uint64_t size_u64;
281287

282288
/* the osc/sm component is the exclusive provider for support for
283289
* shared memory windows */
@@ -521,22 +527,27 @@ static int component_select(struct ompi_win_t *win, void **base, size_t size, in
521527
goto error;
522528
}
523529

524-
my_info_len = 2 * sizeof(uint64_t) + rkey_buffer_size + state_rkey_buffer_size;
530+
size_u64 = (uint64_t)size;
531+
my_info_len = 3 * sizeof(uint64_t) + rkey_buffer_size + state_rkey_buffer_size;
525532
my_info = malloc(my_info_len);
526533
if (my_info == NULL) {
527534
ret = OMPI_ERR_TEMP_OUT_OF_RESOURCE;
528535
goto error;
529536
}
530537

538+
info_offset = 0;
539+
531540
if (flavor == MPI_WIN_FLAVOR_ALLOCATE || flavor == MPI_WIN_FLAVOR_CREATE) {
532-
memcpy(my_info, base, sizeof(uint64_t));
541+
memcpy_off(my_info, base, sizeof(uint64_t), info_offset);
533542
} else {
534-
memcpy(my_info, &zero, sizeof(uint64_t));
543+
memcpy_off(my_info, &zero, sizeof(uint64_t), info_offset);
535544
}
536-
memcpy((void *)((char *)my_info + sizeof(uint64_t)), &state_base, sizeof(uint64_t));
537-
memcpy((void *)((char *)my_info + 2 * sizeof(uint64_t)), rkey_buffer, rkey_buffer_size);
538-
memcpy((void *)((char *)my_info + 2 * sizeof(uint64_t) + rkey_buffer_size),
539-
state_rkey_buffer, state_rkey_buffer_size);
545+
memcpy_off(my_info, &state_base, sizeof(uint64_t), info_offset);
546+
memcpy_off(my_info, &size_u64, sizeof(uint64_t), info_offset);
547+
memcpy_off(my_info, rkey_buffer, rkey_buffer_size, info_offset);
548+
memcpy_off(my_info, state_rkey_buffer, state_rkey_buffer_size, info_offset);
549+
550+
assert(my_info_len == info_offset);
540551

541552
ret = allgather_len_and_info(my_info, (int)my_info_len, &recv_buf, disps, module->comm);
542553
if (ret != OMPI_SUCCESS) {
@@ -552,25 +563,32 @@ static int component_select(struct ompi_win_t *win, void **base, size_t size, in
552563

553564
for (i = 0; i < comm_size; i++) {
554565
ucp_ep_h ep = OSC_UCX_GET_EP(module->comm, i);
566+
uint64_t dest_size;
555567
assert(ep != NULL);
556568

557-
memcpy(&(module->win_info_array[i]).addr, &recv_buf[disps[i]], sizeof(uint64_t));
558-
memcpy(&(module->state_info_array[i]).addr, &recv_buf[disps[i] + sizeof(uint64_t)],
559-
sizeof(uint64_t));
569+
info_offset = disps[i];
570+
571+
memcpy(&(module->win_info_array[i]).addr, &recv_buf[info_offset], sizeof(uint64_t));
572+
info_offset += sizeof(uint64_t);
573+
memcpy(&(module->state_info_array[i]).addr, &recv_buf[info_offset], sizeof(uint64_t));
574+
info_offset += sizeof(uint64_t);
575+
memcpy(&dest_size, &recv_buf[info_offset], sizeof(uint64_t));
576+
info_offset += sizeof(uint64_t);
560577

561578
(module->win_info_array[i]).rkey_init = false;
562-
if (size > 0 && (flavor == MPI_WIN_FLAVOR_ALLOCATE || flavor == MPI_WIN_FLAVOR_CREATE)) {
563-
status = ucp_ep_rkey_unpack(ep, &(recv_buf[disps[i] + 2 * sizeof(uint64_t)]),
579+
if (dest_size > 0 && (flavor == MPI_WIN_FLAVOR_ALLOCATE || flavor == MPI_WIN_FLAVOR_CREATE)) {
580+
status = ucp_ep_rkey_unpack(ep, &recv_buf[info_offset],
564581
&((module->win_info_array[i]).rkey));
565582
if (status != UCS_OK) {
566583
OSC_UCX_VERBOSE(1, "ucp_ep_rkey_unpack failed: %d", status);
567584
ret = OMPI_ERROR;
568585
goto error;
569586
}
587+
info_offset += rkey_sizes[i];
570588
(module->win_info_array[i]).rkey_init = true;
571589
}
572590

573-
status = ucp_ep_rkey_unpack(ep, &(recv_buf[disps[i] + 2 * sizeof(uint64_t) + rkey_sizes[i]]),
591+
status = ucp_ep_rkey_unpack(ep, &recv_buf[info_offset],
574592
&((module->state_info_array[i]).rkey));
575593
if (status != UCS_OK) {
576594
OSC_UCX_VERBOSE(1, "ucp_ep_rkey_unpack failed: %d", status);

0 commit comments

Comments
 (0)