Skip to content

Commit 20b210d

Browse files
authored
Merge pull request #10362 from MamziB/mamzi/ucx-shared-windows-support
OSC/UCX: Adding shared memory windows support and a fix in wpmem_map
2 parents fa08eac + 356f2ab commit 20b210d

File tree

3 files changed

+231
-13
lines changed

3 files changed

+231
-13
lines changed

ompi/mca/osc/ucx/osc_ucx.h

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
#include "ompi/communicator/communicator.h"
1717
#include "opal/mca/common/ucx/common_ucx.h"
1818
#include "opal/mca/common/ucx/common_ucx_wpool.h"
19+
#include "opal/mca/shmem/shmem.h"
20+
#include "opal/mca/shmem/base/base.h"
1921

2022
#define OSC_UCX_ASSERT MCA_COMMON_UCX_ASSERT
2123
#define OSC_UCX_ERROR MCA_COMMON_UCX_ERROR
@@ -36,6 +38,8 @@ typedef struct ompi_osc_ucx_component {
3638
bool no_locks; /* Default value of the no_locks info key for new windows */
3739
bool acc_single_intrinsic;
3840
unsigned int priority;
41+
/* directory where to place backing files */
42+
char *backing_directory;
3943
} ompi_osc_ucx_component_t;
4044

4145
OMPI_DECLSPEC extern ompi_osc_ucx_component_t mca_osc_ucx_component;
@@ -120,6 +124,15 @@ typedef struct ompi_osc_ucx_module {
120124
opal_common_ucx_ctx_t *ctx;
121125
opal_common_ucx_wpmem_t *mem;
122126
opal_common_ucx_wpmem_t *state_mem;
127+
128+
bool noncontig_shared_win;
129+
size_t *sizes;
130+
/* in shared windows, shmem_addrs can be used for direct load store to
131+
* remote windows */
132+
uint64_t *shmem_addrs;
133+
void *segment_base;
134+
/** opal shared memory structure for the shared memory segment */
135+
opal_shmem_ds_t seg_ds;
123136
} ompi_osc_ucx_module_t;
124137

125138
typedef enum locktype {
@@ -137,6 +150,8 @@ typedef struct ompi_osc_ucx_lock {
137150
#define OSC_UCX_GET_EP(comm_, rank_) (ompi_comm_peer_lookup(comm_, rank_)->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_UCX])
138151
/* Displacement unit to use for rank_: when the module-wide disp_unit is
 * negative the per-rank entry of disp_units is used, otherwise the
 * module-wide value applies. Every parameter use is parenthesized so that
 * expression arguments (e.g. &mod) expand correctly. Note that module_ is
 * evaluated more than once, so it must not have side effects. */
#define OSC_UCX_GET_DISP(module_, rank_) (((module_)->disp_unit < 0) ? (module_)->disp_units[(rank_)] : (module_)->disp_unit)
139152

153+
int ompi_osc_ucx_shared_query(struct ompi_win_t *win, int rank, size_t *size,
154+
int *disp_unit, void * baseptr);
140155
int ompi_osc_ucx_win_attach(struct ompi_win_t *win, void *base, size_t len);
141156
int ompi_osc_ucx_win_detach(struct ompi_win_t *win, const void *base);
142157
int ompi_osc_ucx_free(struct ompi_win_t *win);

ompi/mca/osc/ucx/osc_ucx_component.c

Lines changed: 211 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
#include "osc_ucx.h"
2424
#include "osc_ucx_request.h"
25+
#include "opal/util/sys_limits.h"
2526

2627
#define memcpy_off(_dst, _src, _len, _off) \
2728
memcpy(((char*)(_dst)) + (_off), _src, _len); \
@@ -81,6 +82,7 @@ ompi_osc_ucx_component_t mca_osc_ucx_component = {
8182

8283
ompi_osc_ucx_module_t ompi_osc_ucx_module_template = {
8384
{
85+
.osc_win_shared_query = ompi_osc_ucx_shared_query,
8486
.osc_win_attach = ompi_osc_ucx_win_attach,
8587
.osc_win_detach = ompi_osc_ucx_win_detach,
8688
.osc_free = ompi_osc_ucx_free,
@@ -182,6 +184,19 @@ static int component_register(void) {
182184

183185
opal_common_ucx_mca_var_register(&mca_osc_ucx_component.super.osc_version);
184186

187+
if (0 == access ("/dev/shm", W_OK)) {
188+
mca_osc_ucx_component.backing_directory = "/dev/shm";
189+
} else {
190+
mca_osc_ucx_component.backing_directory = ompi_process_info.proc_session_dir;
191+
}
192+
193+
(void) mca_base_component_var_register (&mca_osc_ucx_component.super.osc_version, "backing_directory",
194+
"Directory to place backing files for memory windows. "
195+
"This directory should be on a local filesystem such as /tmp or "
196+
"/dev/shm (default: (linux) /dev/shm, (others) session directory)",
197+
MCA_BASE_VAR_TYPE_STRING, NULL, 0, 0, OPAL_INFO_LVL_3,
198+
MCA_BASE_VAR_SCOPE_READONLY, &mca_osc_ucx_component.backing_directory);
199+
185200
return OMPI_SUCCESS;
186201
}
187202

@@ -210,7 +225,6 @@ static int component_finalize(void) {
210225

211226
static int component_query(struct ompi_win_t *win, void **base, size_t size, int disp_unit,
212227
struct ompi_communicator_t *comm, struct opal_info_t *info, int flavor) {
213-
if (MPI_WIN_FLAVOR_SHARED == flavor) return -1;
214228
return mca_osc_ucx_component.priority;
215229
}
216230

@@ -299,14 +313,54 @@ static const char* ompi_osc_ucx_set_no_lock_info(opal_infosubscriber_t *obj, con
299313
return module->no_locks ? "true" : "false";
300314
}
301315

316+
int ompi_osc_ucx_shared_query(struct ompi_win_t *win, int rank, size_t *size,
317+
int *disp_unit, void *baseptr)
318+
{
319+
ompi_osc_ucx_module_t *module =
320+
(ompi_osc_ucx_module_t*) win->w_osc_module;
321+
322+
if (module->flavor != MPI_WIN_FLAVOR_SHARED) {
323+
return MPI_ERR_WIN;
324+
}
325+
326+
if (MPI_PROC_NULL != rank) {
327+
*size = module->sizes[rank];
328+
*((void**) baseptr) = module->shmem_addrs[rank];
329+
if (module->disp_unit == -1) {
330+
*disp_unit = module->disp_units[rank];
331+
} else {
332+
*disp_unit = module->disp_unit;
333+
}
334+
} else {
335+
int i = 0;
336+
337+
*size = 0;
338+
*((void**) baseptr) = NULL;
339+
*disp_unit = 0;
340+
for (i = 0 ; i < ompi_comm_size(module->comm) ; ++i) {
341+
if (0 != module->sizes[i]) {
342+
*size = module->sizes[i];
343+
*((void**) baseptr) = module->shmem_addrs[i];
344+
if (module->disp_unit == -1) {
345+
*disp_unit = module->disp_units[rank];
346+
} else {
347+
*disp_unit = module->disp_unit;
348+
}
349+
break;
350+
}
351+
}
352+
}
353+
354+
return OMPI_SUCCESS;
355+
}
356+
302357
static int component_select(struct ompi_win_t *win, void **base, size_t size, int disp_unit,
303358
struct ompi_communicator_t *comm, struct opal_info_t *info,
304359
int flavor, int *model) {
305360
ompi_osc_ucx_module_t *module = NULL;
306361
char *name = NULL;
307362
long values[2];
308363
int ret = OMPI_SUCCESS;
309-
//ucs_status_t status;
310364
int i, comm_size = ompi_comm_size(comm);
311365
bool env_initialized = false;
312366
void *state_base = NULL;
@@ -316,12 +370,10 @@ static int component_select(struct ompi_win_t *win, void **base, size_t size, in
316370
int my_mem_addr_size;
317371
void * my_info = NULL;
318372
char *recv_buf = NULL;
319-
320-
/* the osc/sm component is the exclusive provider for support for
321-
* shared memory windows */
322-
if (flavor == MPI_WIN_FLAVOR_SHARED) {
323-
return OMPI_ERR_NOT_SUPPORTED;
324-
}
373+
unsigned long total, *rbuf;
374+
int flag;
375+
size_t pagesize;
376+
bool unlink_needed = false;
325377

326378
/* May be called concurrently - protect */
327379
_osc_ucx_init_lock();
@@ -442,17 +494,151 @@ static int component_select(struct ompi_win_t *win, void **base, size_t size, in
442494
goto error;
443495
}
444496

445-
if (flavor == MPI_WIN_FLAVOR_ALLOCATE || flavor == MPI_WIN_FLAVOR_CREATE) {
497+
if (flavor == MPI_WIN_FLAVOR_SHARED) {
498+
/* create the segment */
499+
opal_output_verbose(MCA_BASE_VERBOSE_DEBUG, ompi_osc_base_framework.framework_output,
500+
"allocating shared memory region of size %ld\n", (long) size);
501+
/* get the pagesize */
502+
pagesize = opal_getpagesize();
503+
504+
rbuf = malloc(sizeof(unsigned long) * comm_size);
505+
if (NULL == rbuf) return OMPI_ERR_TEMP_OUT_OF_RESOURCE;
506+
507+
/* Note that the alloc_shared_noncontig info key only has
508+
* meaning during window creation. Once the window is
509+
* created, we can't move memory around without making
510+
* everything miserable. So we intentionally do not subscribe
511+
* to updates on the info key, because there's no useful
512+
* update to occur. */
513+
module->noncontig_shared_win = false;
514+
if (OMPI_SUCCESS != opal_info_get_bool(info, "alloc_shared_noncontig",
515+
&module->noncontig_shared_win, &flag)) {
516+
goto error;
517+
}
518+
519+
if (module->noncontig_shared_win) {
520+
opal_output_verbose(MCA_BASE_VERBOSE_DEBUG, ompi_osc_base_framework.framework_output,
521+
"allocating window using non-contiguous strategy");
522+
total = ((size - 1) / pagesize + 1) * pagesize;
523+
} else {
524+
opal_output_verbose(MCA_BASE_VERBOSE_DEBUG, ompi_osc_base_framework.framework_output,
525+
"allocating window using contiguous strategy");
526+
total = size;
527+
}
528+
ret = module->comm->c_coll->coll_allgather(&total, 1, MPI_UNSIGNED_LONG,
529+
rbuf, 1, MPI_UNSIGNED_LONG,
530+
module->comm,
531+
module->comm->c_coll->coll_allgather_module);
532+
if (OMPI_SUCCESS != ret) return ret;
533+
534+
total = 0;
535+
for (i = 0 ; i < comm_size ; ++i) {
536+
total += rbuf[i];
537+
}
538+
539+
module->segment_base = NULL;
540+
module->shmem_addrs = NULL;
541+
module->sizes = NULL;
542+
543+
if (total != 0) {
544+
/* use opal/shmem directly to create a shared memory segment */
545+
if (0 == ompi_comm_rank (module->comm)) {
546+
char *data_file;
547+
ret = opal_asprintf (&data_file, "%s" OPAL_PATH_SEP "osc_ucx.%s.%x.%d.%s",
548+
mca_osc_ucx_component.backing_directory, ompi_process_info.nodename,
549+
OMPI_PROC_MY_NAME->jobid, (int) OMPI_PROC_MY_NAME->vpid,
550+
ompi_comm_print_cid(module->comm));
551+
if (ret < 0) {
552+
free(rbuf);
553+
return OMPI_ERR_OUT_OF_RESOURCE;
554+
}
555+
556+
ret = opal_shmem_segment_create (&module->seg_ds, data_file, total);
557+
free(data_file);
558+
if (OPAL_SUCCESS != ret) {
559+
free(rbuf);
560+
goto error;
561+
}
562+
563+
unlink_needed = true;
564+
}
565+
566+
ret = module->comm->c_coll->coll_bcast (&module->seg_ds, sizeof (module->seg_ds), MPI_BYTE, 0,
567+
module->comm, module->comm->c_coll->coll_bcast_module);
568+
if (OMPI_SUCCESS != ret) {
569+
free(rbuf);
570+
goto error;
571+
}
572+
573+
module->segment_base = opal_shmem_segment_attach (&module->seg_ds);
574+
if (NULL == module->segment_base) {
575+
free(rbuf);
576+
goto error;
577+
}
578+
579+
/* wait for all processes to attach */
580+
ret = module->comm->c_coll->coll_barrier (module->comm, module->comm->c_coll->coll_barrier_module);
581+
if (OMPI_SUCCESS != ret) {
582+
free(rbuf);
583+
goto error;
584+
}
585+
586+
if (0 == ompi_comm_rank (module->comm)) {
587+
opal_shmem_unlink (&module->seg_ds);
588+
unlink_needed = false;
589+
}
590+
}
591+
592+
/* Although module->segment_base points to the same physical address
593+
* for all the processes, its value which is a virtual address can be
594+
* different between different processes. To use direct load/store,
595+
* shmem_addrs can be used, however, for RDMA, virtual address of
596+
* remote process that will be stored in module->addrs should be used */
597+
module->sizes = malloc(sizeof(size_t) * comm_size);
598+
if (NULL == module->sizes) {
599+
free(rbuf);
600+
ret = OMPI_ERR_TEMP_OUT_OF_RESOURCE;
601+
goto error;
602+
}
603+
module->shmem_addrs = malloc(sizeof(uint64_t) * comm_size);
604+
if (NULL == module->shmem_addrs) {
605+
free(module->sizes);
606+
free(rbuf);
607+
ret = OMPI_ERR_TEMP_OUT_OF_RESOURCE;
608+
goto error;
609+
}
610+
611+
612+
for (i = 0, total = 0; i < comm_size ; ++i) {
613+
module->sizes[i] = rbuf[i];
614+
if (module->sizes[i] || !module->noncontig_shared_win) {
615+
module->shmem_addrs[i] = ((uint64_t) module->segment_base) + total;
616+
total += rbuf[i];
617+
} else {
618+
module->shmem_addrs[i] = (uint64_t)NULL;
619+
}
620+
}
621+
622+
free(rbuf);
623+
624+
module->size = module->sizes[ompi_comm_rank(module->comm)];
625+
*base = module->shmem_addrs[ompi_comm_rank(module->comm)];
626+
}
627+
628+
if (flavor == MPI_WIN_FLAVOR_ALLOCATE || flavor == MPI_WIN_FLAVOR_CREATE ||
629+
flavor == MPI_WIN_FLAVOR_SHARED) {
446630
switch (flavor) {
447631
case MPI_WIN_FLAVOR_ALLOCATE:
448632
mem_type = OPAL_COMMON_UCX_MEM_ALLOCATE_MAP;
449633
break;
450634
case MPI_WIN_FLAVOR_CREATE:
451635
mem_type = OPAL_COMMON_UCX_MEM_MAP;
452636
break;
637+
case MPI_WIN_FLAVOR_SHARED:
638+
mem_type = OPAL_COMMON_UCX_MEM_MAP;
639+
break;
453640
}
454-
455-
ret = opal_common_ucx_wpmem_create(module->ctx, base, size,
641+
ret = opal_common_ucx_wpmem_create(module->ctx, base, module->size,
456642
mem_type, &exchange_len_info,
457643
OPAL_COMMON_UCX_WPMEM_ADDR_EXCHANGE_FULL,
458644
(void *)module->comm,
@@ -483,7 +669,8 @@ static int component_select(struct ompi_win_t *win, void **base, size_t size, in
483669
goto error;
484670
}
485671

486-
if (flavor == MPI_WIN_FLAVOR_ALLOCATE || flavor == MPI_WIN_FLAVOR_CREATE) {
672+
if (flavor == MPI_WIN_FLAVOR_ALLOCATE || flavor == MPI_WIN_FLAVOR_CREATE ||
673+
flavor == MPI_WIN_FLAVOR_SHARED) {
487674
memcpy(my_info, base, sizeof(uint64_t));
488675
} else {
489676
memcpy(my_info, &zero, sizeof(uint64_t));
@@ -563,6 +750,9 @@ static int component_select(struct ompi_win_t *win, void **base, size_t size, in
563750
mca_osc_ucx_component.env_initialized = false;
564751
}
565752

753+
if (0 == ompi_comm_rank (module->comm) && unlink_needed) {
754+
opal_shmem_unlink (&module->seg_ds);
755+
}
566756
ompi_osc_ucx_unregister_progress();
567757
return ret;
568758
}
@@ -700,6 +890,15 @@ int ompi_osc_ucx_free(struct ompi_win_t *win) {
700890
return ret;
701891
}
702892

893+
if (module->flavor == MPI_WIN_FLAVOR_SHARED) {
894+
if (module->segment_base != NULL)
895+
opal_shmem_segment_detach(&module->seg_ds);
896+
if (module->shmem_addrs != NULL)
897+
free(module->shmem_addrs);
898+
if (module->sizes != NULL)
899+
free(module->sizes);
900+
}
901+
703902
/* MPI_Win_free should detach any memory attached to dynamic windows */
704903
for (i = 0; i < module->state.dynamic_win_count; i++) {
705904
assert(module->local_dynamic_win_info[i].refcnt == 1);

opal/mca/common/ucx/common_ucx_wpool.c

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
#include "opal/mca/pmix/pmix-internal.h"
99
#include "opal/memoryhooks/memory.h"
1010
#include "opal/util/proc.h"
11+
#include "opal/util/sys_limits.h"
1112

1213
#include <ucm/api/ucm.h>
1314

@@ -508,7 +509,10 @@ static int _comm_ucx_wpmem_map(opal_common_ucx_wpool_t *wpool, void **base, size
508509

509510
assert(mem_attrs.length >= size);
510511
if (mem_type != OPAL_COMMON_UCX_MEM_ALLOCATE_MAP) {
511-
assert(mem_attrs.address == (*base));
512+
/* Returned mapped address is aligned to ucs rcache->params.alignment.
513+
* Alignment is less than page size */
514+
assert(((mem_attrs.address <= (*base)) && ((*base) - opal_getpagesize()
515+
< mem_attrs.address)) || (size == 0 && mem_attrs.address == NULL));
512516
} else {
513517
(*base) = mem_attrs.address;
514518
}

0 commit comments

Comments
 (0)