From 402bc1651c1b08e85b05f962207bf3ac4696d306 Mon Sep 17 00:00:00 2001 From: Axel Schneewind Date: Wed, 8 Jan 2025 15:53:33 +0100 Subject: [PATCH 1/4] duplicate part_persist component Signed-off-by: Axel Schneewind --- ompi/mca/part/base/part_base_frame.c | 2 +- ompi/mca/part/persist_aggregated/Makefile.am | 53 ++ .../part_persist_aggregated.c | 543 ++++++++++++++++++ .../part_persist_aggregated.h | 94 +++ .../part_persist_aggregated_component.c | 166 ++++++ .../part_persist_aggregated_component.h | 35 ++ .../part_persist_aggregated_recvreq.c | 43 ++ .../part_persist_aggregated_recvreq.h | 105 ++++ .../part_persist_aggregated_request.c | 40 ++ .../part_persist_aggregated_request.h | 107 ++++ .../part_persist_aggregated_sendreq.c | 41 ++ .../part_persist_aggregated_sendreq.h | 96 ++++ .../part/persist_aggregated/post_configure.sh | 1 + 13 files changed, 1325 insertions(+), 1 deletion(-) create mode 100644 ompi/mca/part/persist_aggregated/Makefile.am create mode 100644 ompi/mca/part/persist_aggregated/part_persist_aggregated.c create mode 100644 ompi/mca/part/persist_aggregated/part_persist_aggregated.h create mode 100644 ompi/mca/part/persist_aggregated/part_persist_aggregated_component.c create mode 100644 ompi/mca/part/persist_aggregated/part_persist_aggregated_component.h create mode 100644 ompi/mca/part/persist_aggregated/part_persist_aggregated_recvreq.c create mode 100644 ompi/mca/part/persist_aggregated/part_persist_aggregated_recvreq.h create mode 100644 ompi/mca/part/persist_aggregated/part_persist_aggregated_request.c create mode 100644 ompi/mca/part/persist_aggregated/part_persist_aggregated_request.h create mode 100644 ompi/mca/part/persist_aggregated/part_persist_aggregated_sendreq.c create mode 100644 ompi/mca/part/persist_aggregated/part_persist_aggregated_sendreq.h create mode 100644 ompi/mca/part/persist_aggregated/post_configure.sh diff --git a/ompi/mca/part/base/part_base_frame.c b/ompi/mca/part/base/part_base_frame.c index f9da3548456..851435b9562 100644 --- a/ompi/mca/part/base/part_base_frame.c +++ b/ompi/mca/part/base/part_base_frame.c @@ -137,8 +137,8 @@ static int mca_part_base_open(mca_base_open_flag_t flags) mca_part_base_selected_component.partm_finalize = NULL; - /* Currently this uses a default with no selection criteria as there is only 1 module. */ opal_pointer_array_add(&mca_part_base_part, strdup("persist")); + opal_pointer_array_add(&mca_part_base_part, strdup("persist_aggregated")); return OMPI_SUCCESS; } diff --git a/ompi/mca/part/persist_aggregated/Makefile.am b/ompi/mca/part/persist_aggregated/Makefile.am new file mode 100644 index 00000000000..a4eef681f82 --- /dev/null +++ b/ompi/mca/part/persist_aggregated/Makefile.am @@ -0,0 +1,53 @@ +# +# Copyright (c) 2004-2006 The Regents of the University of California. +# All rights reserved. +# Copyright (c) 2009-2024 High Performance Computing Center Stuttgart, +# University of Stuttgart. All rights reserved. +# Copyright (c) 2010 Cisco Systems, Inc. All rights reserved. +# Copyright (c) 2017 IBM Corporation. All rights reserved. +# Copyright (c) 2020-2021 Sandia National Laboratories. All rights reserved. +# $COPYRIGHT$ +# +# Additional copyrights may follow +# +# $HEADER$ +# + +# Make the output library in this directory, and name it either +# mca__.la (for DSO builds) or libmca__.la +# (for static builds). 
+ +EXTRA_DIST = post_configure.sh + +if MCA_BUILD_ompi_part_persist_aggregated_DSO +component_noinst = +component_install = mca_part_persist_aggregated.la +else +component_noinst = libmca_part_persist_aggregated.la +component_install = +endif + +local_sources = \ + part_persist_aggregated.c \ + part_persist_aggregated.h \ + part_persist_aggregated_component.c \ + part_persist_aggregated_component.h \ + part_persist_aggregated_recvreq.h \ + part_persist_aggregated_recvreq.c \ + part_persist_aggregated_request.h \ + part_persist_aggregated_request.c \ + part_persist_aggregated_sendreq.h \ + part_persist_aggregated_sendreq.c + +mcacomponentdir = $(ompilibdir) +mcacomponent_LTLIBRARIES = $(component_install) +mca_part_persist_aggregated_la_SOURCES = $(local_sources) +mca_part_persist_aggregated_la_LIBADD = $(top_builddir)/ompi/lib@OMPI_LIBMPI_NAME@.la \ + $(part_persist_aggregated_LIBS) +mca_part_persist_aggregated_la_LDFLAGS = -module -avoid-version $(part_persist_aggregated_LDFLAGS) + +noinst_LTLIBRARIES = $(component_noinst) +libmca_part_persist_aggregated_la_SOURCES = $(local_sources) +libmca_part_persist_aggregated_la_LIBADD = $(part_persist_aggregated_LIBS) +libmca_part_persist_aggregated_la_LDFLAGS = -module -avoid-version $(part_persist_aggregated_LDFLAGS) + diff --git a/ompi/mca/part/persist_aggregated/part_persist_aggregated.c b/ompi/mca/part/persist_aggregated/part_persist_aggregated.c new file mode 100644 index 00000000000..9c86907fae5 --- /dev/null +++ b/ompi/mca/part/persist_aggregated/part_persist_aggregated.c @@ -0,0 +1,543 @@ +/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ +/* + * Copyright (c) 2006-2007 The Trustees of Indiana University and Indiana + * University Research and Technology + * Corporation. All rights reserved. + * Copyright (c) 2004-2007 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * Copyright (c) 2004-2006 The Regents of the University of California. + * All rights reserved. + * Copyright (c) 2015 Los Alamos National Security, LLC. All rights + * reserved. + * Copyright (c) 2011-2021 Sandia National Laboratories. All rights reserved. + * Copyright (c) 2024 Research Organization for Information Science + * and Technology (RIST). All rights reserved. + * Copyright (c) 2024 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. 
+ * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "ompi_config.h" + +#include "ompi/communicator/communicator.h" +#include "ompi/mca/part/base/part_base_prequest.h" +#include "ompi/mca/part/base/base.h" + +#include "ompi/mca/part/persist_aggregated/part_persist_aggregated.h" +#include "ompi/mca/part/persist_aggregated/part_persist_aggregated_sendreq.h" +#include "ompi/mca/part/persist_aggregated/part_persist_aggregated_recvreq.h" + +static int mca_part_persist_aggregated_progress(void); +static int mca_part_persist_aggregated_precv_init(void *, size_t, size_t, ompi_datatype_t *, int, int, struct ompi_communicator_t *, struct ompi_info_t *, struct ompi_request_t **); +static int mca_part_persist_aggregated_psend_init(const void*, size_t, size_t, ompi_datatype_t*, int, int, ompi_communicator_t*, struct ompi_info_t *, ompi_request_t**); +static int mca_part_persist_aggregated_pready(size_t, size_t, ompi_request_t*); +static int mca_part_persist_aggregated_parrived(size_t, size_t, int*, ompi_request_t*); + +ompi_part_persist_aggregated_t ompi_part_persist_aggregated = { + .super = { + .part_progress = mca_part_persist_aggregated_progress, + .part_precv_init = mca_part_persist_aggregated_precv_init, + .part_psend_init = mca_part_persist_aggregated_psend_init, + .part_start = mca_part_persist_aggregated_start, + .part_pready = mca_part_persist_aggregated_pready, + .part_parrived = mca_part_persist_aggregated_parrived, + } +}; + +/** + * This is a helper function that frees a request. This requires ompi_part_persist_aggregated.lock be held before calling. + */ +__opal_attribute_always_inline__ static inline int +mca_part_persist_aggregated_free_req(struct mca_part_persist_aggregated_request_t* req) +{ + int err = OMPI_SUCCESS; + size_t i; + opal_list_remove_item(ompi_part_persist_aggregated.progress_list, (opal_list_item_t*)req->progress_elem); + OBJ_RELEASE(req->progress_elem); + + for(i = 0; i < req->real_parts; i++) { + ompi_request_free(&(req->persist_reqs[i])); + } + free(req->persist_reqs); + free(req->flags); + + if( MCA_PART_PERSIST_AGGREGATED_REQUEST_PRECV == req->req_type ) { + MCA_PART_PERSIST_AGGREGATED_PRECV_REQUEST_RETURN(req); + } else { + MCA_PART_PERSIST_AGGREGATED_PSEND_REQUEST_RETURN(req); + } + return err; +} + +static void +mca_part_persist_aggregated_complete(struct mca_part_persist_aggregated_request_t* request) +{ + if(MCA_PART_PERSIST_AGGREGATED_REQUEST_PRECV == request->req_type) { + request->req_ompi.req_status.MPI_SOURCE = request->req_peer; + } else { + request->req_ompi.req_status.MPI_SOURCE = request->req_comm->c_my_rank; + } + request->req_ompi.req_complete_cb = NULL; + request->req_ompi.req_status.MPI_TAG = request->req_tag; + request->req_ompi.req_status._ucount = request->req_bytes; + request->req_ompi.req_status.MPI_ERROR = OMPI_SUCCESS; + request->req_part_complete = true; + ompi_request_complete(&(request->req_ompi), true ); +} + +/** + * mca_part_persist_aggregated_progress is the progress function that will be registered. It handles + * both send and recv request testing and completion. It also handles freeing requests, + * after MPI_Free is called and the requests have become inactive. 
+ */ +static int +mca_part_persist_aggregated_progress(void) +{ + mca_part_persist_aggregated_list_t *current; + int err; + size_t i; + + /* prevent re-entry, */ + int block_entry = opal_atomic_add_fetch_32(&(ompi_part_persist_aggregated.block_entry), 1); + if(1 < block_entry) + { + block_entry = opal_atomic_add_fetch_32(&(ompi_part_persist_aggregated.block_entry), -1); + return OMPI_SUCCESS; + } + + OPAL_THREAD_LOCK(&ompi_part_persist_aggregated.lock); + + mca_part_persist_aggregated_request_t* to_delete = NULL; + + /* Don't do anything till a function in the module is called. */ + if(-1 == ompi_part_persist_aggregated.init_world) + { + OPAL_THREAD_UNLOCK(&ompi_part_persist_aggregated.lock); + block_entry = opal_atomic_add_fetch_32(&(ompi_part_persist_aggregated.block_entry), -1); + return OMPI_SUCCESS; + } + + /* Can't do anything if we don't have world */ + if(0 == ompi_part_persist_aggregated.init_world) { + ompi_part_persist_aggregated.my_world_rank = ompi_comm_rank(&ompi_mpi_comm_world.comm); + err = ompi_comm_idup(&ompi_mpi_comm_world.comm, &ompi_part_persist_aggregated.part_comm, &ompi_part_persist_aggregated.part_comm_req); + if(err != OMPI_SUCCESS) { + exit(-1); + } + ompi_part_persist_aggregated.part_comm_ready = 0; + err = ompi_comm_idup(&ompi_mpi_comm_world.comm, &ompi_part_persist_aggregated.part_comm_setup, &ompi_part_persist_aggregated.part_comm_sreq); + if(err != OMPI_SUCCESS) { + exit(-1); + } + ompi_part_persist_aggregated.part_comm_sready = 0; + ompi_part_persist_aggregated.init_world = 1; + + OPAL_THREAD_UNLOCK(&ompi_part_persist_aggregated.lock); + block_entry = opal_atomic_add_fetch_32(&(ompi_part_persist_aggregated.block_entry), -1); + return OMPI_SUCCESS; + } + + /* Check to see if Comms are setup */ + if(0 == ompi_part_persist_aggregated.init_comms) { + if(0 == ompi_part_persist_aggregated.part_comm_ready) { + ompi_request_test(&ompi_part_persist_aggregated.part_comm_req, &ompi_part_persist_aggregated.part_comm_ready, MPI_STATUS_IGNORE); + } + if(0 == ompi_part_persist_aggregated.part_comm_sready) { + ompi_request_test(&ompi_part_persist_aggregated.part_comm_sreq, &ompi_part_persist_aggregated.part_comm_sready, MPI_STATUS_IGNORE); + } + if(0 != ompi_part_persist_aggregated.part_comm_ready && 0 != ompi_part_persist_aggregated.part_comm_sready) { + ompi_part_persist_aggregated.init_comms = 1; + } + OPAL_THREAD_UNLOCK(&ompi_part_persist_aggregated.lock); + block_entry = opal_atomic_add_fetch_32(&(ompi_part_persist_aggregated.block_entry), -1); + return OMPI_SUCCESS; + } + + OPAL_LIST_FOREACH(current, ompi_part_persist_aggregated.progress_list, mca_part_persist_aggregated_list_t) { + mca_part_persist_aggregated_request_t *req = (mca_part_persist_aggregated_request_t *) current->item; + + /* Check to see if request is initilaized */ + if(false == req->initialized) { + int done = 0; + + if(true == req->flag_post_setup_recv) { + err = MCA_PML_CALL(irecv(&(req->setup_info[1]), sizeof(struct ompi_mca_persist_setup_t), MPI_BYTE, OMPI_ANY_SOURCE, req->my_recv_tag, ompi_part_persist_aggregated.part_comm_setup, &req->setup_req[1])); + req->flag_post_setup_recv = false; + } + + ompi_request_test(&(req->setup_req[1]), &done, MPI_STATUS_IGNORE); + + if(done) { + size_t dt_size_; + int32_t dt_size; + + if(MCA_PART_PERSIST_AGGREGATED_REQUEST_PSEND == req->req_type) { + /* parse message */ + req->world_peer = req->setup_info[1].world_rank; + + err = opal_datatype_type_size(&(req->req_datatype->super), &dt_size_); + if(OMPI_SUCCESS != err) return OMPI_ERROR; + dt_size = (dt_size_ > 
(size_t) INT_MAX) ? MPI_UNDEFINED : (int) dt_size_; + int32_t bytes = req->real_count * dt_size; + + /* Set up persistent sends */ + req->persist_reqs = (ompi_request_t**) malloc(sizeof(ompi_request_t*)*(req->real_parts)); + for(i = 0; i < req->real_parts; i++) { + void *buf = ((void*) (((char*)req->req_addr) + (bytes * i))); + err = MCA_PML_CALL(isend_init(buf, req->real_count, req->req_datatype, req->world_peer, req->my_send_tag+i, MCA_PML_BASE_SEND_STANDARD, ompi_part_persist_aggregated.part_comm, &(req->persist_reqs[i]))); + } + } else { + /* parse message */ + req->world_peer = req->setup_info[1].world_rank; + req->my_send_tag = req->setup_info[1].start_tag; + req->my_recv_tag = req->setup_info[1].setup_tag; + req->real_parts = req->setup_info[1].num_parts; + req->real_count = req->setup_info[1].count; + + err = opal_datatype_type_size(&(req->req_datatype->super), &dt_size_); + if(OMPI_SUCCESS != err) return OMPI_ERROR; + dt_size = (dt_size_ > (size_t) INT_MAX) ? MPI_UNDEFINED : (int) dt_size_; + int32_t bytes = req->real_count * dt_size; + + /* Set up persistent sends */ + req->persist_reqs = (ompi_request_t**) malloc(sizeof(ompi_request_t*)*(req->real_parts)); + req->flags = (int*) calloc(req->real_parts,sizeof(int)); + for(i = 0; i < req->real_parts; i++) { + void *buf = ((void*) (((char*)req->req_addr) + (bytes * i))); + err = MCA_PML_CALL(irecv_init(buf, req->real_count, req->req_datatype, req->world_peer, req->my_send_tag+i, ompi_part_persist_aggregated.part_comm, &(req->persist_reqs[i]))); + } + err = req->persist_reqs[0]->req_start(req->real_parts, (&(req->persist_reqs[0]))); + + /* Send back a message */ + req->setup_info[0].world_rank = ompi_part_persist_aggregated.my_world_rank; + err = MCA_PML_CALL(isend(&(req->setup_info[0]), sizeof(struct ompi_mca_persist_setup_t), MPI_BYTE, req->world_peer, req->my_recv_tag, MCA_PML_BASE_SEND_STANDARD, ompi_part_persist_aggregated.part_comm_setup, &req->setup_req[0])); + if(OMPI_SUCCESS != err) return OMPI_ERROR; + } + + req->initialized = true; + } + } else { + if(false == req->req_part_complete && REQUEST_COMPLETED != req->req_ompi.req_complete && OMPI_REQUEST_ACTIVE == req->req_ompi.req_state) { + for(i = 0; i < req->real_parts; i++) { + + /* Check to see if partition is queued for being started. Only applicable to sends. 
*/ + if(-2 == req->flags[i]) { + err = req->persist_reqs[i]->req_start(1, (&(req->persist_reqs[i]))); + req->flags[i] = 0; + } + + if(0 == req->flags[i]) + { + ompi_request_test(&(req->persist_reqs[i]), &(req->flags[i]), MPI_STATUS_IGNORE); + if(0 != req->flags[i]) req->done_count++; + } + } + + /* Check for completion and complete the requests */ + if(req->done_count == req->real_parts) + { + req->first_send = false; + mca_part_persist_aggregated_complete(req); + } + } + + if(true == req->req_free_called && true == req->req_part_complete && REQUEST_COMPLETED == req->req_ompi.req_complete && OMPI_REQUEST_INACTIVE == req->req_ompi.req_state) { + to_delete = req; + } + } + + } + OPAL_THREAD_UNLOCK(&ompi_part_persist_aggregated.lock); + block_entry = opal_atomic_add_fetch_32(&(ompi_part_persist_aggregated.block_entry), -1); + if(to_delete) { + err = mca_part_persist_aggregated_free_req(to_delete); + if (OMPI_SUCCESS != err) { + return OMPI_ERROR; + } + } + + return OMPI_SUCCESS; +} + +static int +mca_part_persist_aggregated_precv_init(void *buf, + size_t parts, + size_t count, + ompi_datatype_t * datatype, + int src, + int tag, + struct ompi_communicator_t *comm, + struct ompi_info_t * info, + struct ompi_request_t **request) +{ + int err = OMPI_SUCCESS; + size_t dt_size_; + int dt_size; + mca_part_persist_aggregated_list_t* new_progress_elem = NULL; + + mca_part_persist_aggregated_precv_request_t *recvreq; + + /* if module hasn't been called before, flag module to init. */ + if(-1 == ompi_part_persist_aggregated.init_world) + { + ompi_part_persist_aggregated.init_world = 0; + } + + /* Allocate a new request */ + MCA_PART_PERSIST_AGGREGATED_PRECV_REQUEST_ALLOC(recvreq); + if (OPAL_UNLIKELY(NULL == recvreq)) return OMPI_ERR_OUT_OF_RESOURCE; + + MCA_PART_PERSIST_AGGREGATED_PRECV_REQUEST_INIT(recvreq, ompi_proc, comm, tag, src, + datatype, buf, parts, count, flags); + + mca_part_persist_aggregated_request_t *req = (mca_part_persist_aggregated_request_t *) recvreq; + + /* Set lazy initializion flags */ + req->initialized = false; + req->first_send = true; + req->flag_post_setup_recv = false; + req->flags = NULL; + /* Non-blocking receive on setup info */ + err = MCA_PML_CALL(irecv(&req->setup_info[1], sizeof(struct ompi_mca_persist_setup_t), MPI_BYTE, src, tag, comm, &req->setup_req[1])); + if(OMPI_SUCCESS != err) return OMPI_ERROR; + + /* Compute total number of bytes */ + err = opal_datatype_type_size(&(req->req_datatype->super), &dt_size_); + if(OMPI_SUCCESS != err) return OMPI_ERROR; + dt_size = (dt_size_ > (size_t) INT_MAX) ? 
MPI_UNDEFINED : (int) dt_size_; + req->req_bytes = parts * count * dt_size; + + + /* Set ompi request initial values */ + req->req_ompi.req_persistent = true; + req->req_part_complete = true; + req->req_ompi.req_complete = REQUEST_COMPLETED; + req->req_ompi.req_state = OMPI_REQUEST_INACTIVE; + + /* Add element to progress engine */ + new_progress_elem = OBJ_NEW(mca_part_persist_aggregated_list_t); + new_progress_elem->item = req; + req->progress_elem = new_progress_elem; + OPAL_THREAD_LOCK(&ompi_part_persist_aggregated.lock); + opal_list_append(ompi_part_persist_aggregated.progress_list, (opal_list_item_t*)new_progress_elem); + OPAL_THREAD_UNLOCK(&ompi_part_persist_aggregated.lock); + + /* set return values */ + *request = (ompi_request_t*) recvreq; + return err; +} + +static int +mca_part_persist_aggregated_psend_init(const void* buf, + size_t parts, + size_t count, + ompi_datatype_t* datatype, + int dst, + int tag, + ompi_communicator_t* comm, + struct ompi_info_t * info, + ompi_request_t** request) +{ + int err = OMPI_SUCCESS; + size_t dt_size_; + int dt_size; + mca_part_persist_aggregated_list_t* new_progress_elem = NULL; + mca_part_persist_aggregated_psend_request_t *sendreq; + + /* if module hasn't been called before, flag module to init. */ + if(-1 == ompi_part_persist_aggregated.init_world) + { + ompi_part_persist_aggregated.init_world = 0; + } + + /* Create new request object */ + MCA_PART_PERSIST_AGGREGATED_PSEND_REQUEST_ALLOC(sendreq, comm, dst, ompi_proc); + if (OPAL_UNLIKELY(NULL == sendreq)) return OMPI_ERR_OUT_OF_RESOURCE; + MCA_PART_PERSIST_AGGREGATED_PSEND_REQUEST_INIT(sendreq, ompi_proc, comm, tag, dst, + datatype, buf, parts, count, flags); + mca_part_persist_aggregated_request_t *req = (mca_part_persist_aggregated_request_t *) sendreq; + + /* Set lazy initialization variables */ + req->initialized = false; + req->first_send = true; + + + /* Determine total bytes to send. */ + err = opal_datatype_type_size(&(req->req_datatype->super), &dt_size_); + if(OMPI_SUCCESS != err) return OMPI_ERROR; + dt_size = (dt_size_ > (size_t) INT_MAX) ? 
MPI_UNDEFINED : (int) dt_size_; + req->req_bytes = parts * count * dt_size; + + + + /* non-blocking send set-up data */ + req->setup_info[0].world_rank = ompi_comm_rank(&ompi_mpi_comm_world.comm); + req->setup_info[0].start_tag = ompi_part_persist_aggregated.next_send_tag; ompi_part_persist_aggregated.next_send_tag += parts; + req->my_send_tag = req->setup_info[0].start_tag; + req->setup_info[0].setup_tag = ompi_part_persist_aggregated.next_recv_tag; ompi_part_persist_aggregated.next_recv_tag++; + req->my_recv_tag = req->setup_info[0].setup_tag; + req->setup_info[0].num_parts = parts; + req->real_parts = parts; + req->setup_info[0].count = count; + req->real_count = count; + + + req->flags = (int*) calloc(req->real_parts, sizeof(int)); + + err = MCA_PML_CALL(isend(&(req->setup_info[0]), sizeof(struct ompi_mca_persist_setup_t), MPI_BYTE, dst, tag, MCA_PML_BASE_SEND_STANDARD, comm, &req->setup_req[0])); + if(OMPI_SUCCESS != err) return OMPI_ERROR; + + /* Non-blocking receive on setup info */ + if(1 == ompi_part_persist_aggregated.init_comms) { + err = MCA_PML_CALL(irecv(&(req->setup_info[1]), sizeof(struct ompi_mca_persist_setup_t), MPI_BYTE, MPI_ANY_SOURCE, req->my_recv_tag, ompi_part_persist_aggregated.part_comm_setup, &req->setup_req[1])); + if(OMPI_SUCCESS != err) return OMPI_ERROR; + req->flag_post_setup_recv = false; + } else { + req->flag_post_setup_recv = true; + } + + /* Initilaize completion variables */ + sendreq->req_base.req_ompi.req_persistent = true; + req->req_part_complete = true; + req->req_ompi.req_complete = REQUEST_COMPLETED; + req->req_ompi.req_state = OMPI_REQUEST_INACTIVE; + + /* add element to progress queue */ + new_progress_elem = OBJ_NEW(mca_part_persist_aggregated_list_t); + new_progress_elem->item = req; + req->progress_elem = new_progress_elem; + OPAL_THREAD_LOCK(&ompi_part_persist_aggregated.lock); + opal_list_append(ompi_part_persist_aggregated.progress_list, (opal_list_item_t*)new_progress_elem); + OPAL_THREAD_UNLOCK(&ompi_part_persist_aggregated.lock); + + /* Set return values */ + *request = (ompi_request_t*) sendreq; + return err; +} + +int +mca_part_persist_aggregated_start(size_t count, ompi_request_t** requests) +{ + int err = OMPI_SUCCESS; + size_t _count = count; + + for(size_t i = 0; i < _count && OMPI_SUCCESS == err; i++) { + mca_part_persist_aggregated_request_t *req = (mca_part_persist_aggregated_request_t *)(requests[i]); + /* First use is a special case, to support lazy initialization */ + if(false == req->first_send) + { + if(MCA_PART_PERSIST_AGGREGATED_REQUEST_PSEND == req->req_type) { + req->done_count = 0; + memset((void*)req->flags,0,sizeof(int32_t)*req->real_parts); + } else { + req->done_count = 0; + err = req->persist_reqs[0]->req_start(req->real_parts, req->persist_reqs); + memset((void*)req->flags,0,sizeof(int32_t)*req->real_parts); + } + } else { + if(MCA_PART_PERSIST_AGGREGATED_REQUEST_PSEND == req->req_type) { + req->done_count = 0; + for(size_t j = 0; j < req->real_parts && OMPI_SUCCESS == err; j++) { + req->flags[j] = -1; + } + } else { + req->done_count = 0; + } + } + req->req_ompi.req_state = OMPI_REQUEST_ACTIVE; + req->req_ompi.req_status.MPI_TAG = MPI_ANY_TAG; + req->req_ompi.req_status.MPI_ERROR = OMPI_SUCCESS; + req->req_ompi.req_status._cancelled = 0; + req->req_part_complete = false; + req->req_ompi.req_complete = false; + OPAL_ATOMIC_SWAP_PTR(&req->req_ompi.req_complete, REQUEST_PENDING); + } + + return err; +} + +static int +mca_part_persist_aggregated_pready(size_t min_part, + size_t max_part, + ompi_request_t* 
request) +{ + int err = OMPI_SUCCESS; + size_t i; + + mca_part_persist_aggregated_request_t *req = (mca_part_persist_aggregated_request_t *)(request); + if(true == req->initialized) + { + err = req->persist_reqs[min_part]->req_start(max_part-min_part+1, (&(req->persist_reqs[min_part]))); + for(i = min_part; i <= max_part && OMPI_SUCCESS == err; i++) { + req->flags[i] = 0; /* Mark partition as ready for testing */ + } + } + else + { + for(i = min_part; i <= max_part && OMPI_SUCCESS == err; i++) { + req->flags[i] = -2; /* Mark partition as queued */ + } + } + return err; +} + +static int +mca_part_persist_aggregated_parrived(size_t min_part, + size_t max_part, + int* flag, + ompi_request_t* request) +{ + int err = OMPI_SUCCESS; + size_t i; + int _flag = false; + mca_part_persist_aggregated_request_t *req = (mca_part_persist_aggregated_request_t *)request; + + if(0 != req->flags) { + _flag = 1; + if(req->req_parts == req->real_parts) { + for(i = min_part; i <= max_part; i++) { + _flag = _flag && req->flags[i]; + } + } else { + float convert = ((float)req->real_parts) / ((float)req->req_parts); + size_t _min = floor(convert * min_part); + size_t _max = ceil(convert * max_part); + for(i = _min; i <= _max; i++) { + _flag = _flag && req->flags[i]; + } + } + } + + if(!_flag) { + opal_progress(); + } + *flag = _flag; + return err; +} + +/** + * mca_part_persist_aggregated_free marks an entry as free called and sets the request to + * MPI_REQUEST_NULL. Note: requests get freed in the progress engine. + */ +int +mca_part_persist_aggregated_free(ompi_request_t** request) +{ + mca_part_persist_aggregated_request_t* req = *(mca_part_persist_aggregated_request_t**)request; + + if(true == req->req_free_called) return OMPI_ERROR; + req->req_free_called = true; + + *request = MPI_REQUEST_NULL; + return OMPI_SUCCESS; +} + +OBJ_CLASS_INSTANCE(mca_part_persist_aggregated_list_t, + opal_list_item_t, + NULL, + NULL); + diff --git a/ompi/mca/part/persist_aggregated/part_persist_aggregated.h b/ompi/mca/part/persist_aggregated/part_persist_aggregated.h new file mode 100644 index 00000000000..7e731392fdf --- /dev/null +++ b/ompi/mca/part/persist_aggregated/part_persist_aggregated.h @@ -0,0 +1,94 @@ +/* + * Copyright (c) 2004-2006 The Regents of the University of California. + * All rights reserved. + * Copyright (c) 2004-2007 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * Copyright (c) 2015-2024 Research Organization for Information Science + * and Technology (RIST). All rights reserved. + * Copyright (c) 2017 Intel, Inc. All rights reserved + * Copyright (c) 2019-2021 The University of Tennessee at Chattanooga and The University + * of Tennessee Research Foundation. All rights reserved. + * Copyright (c) 2019-2021 Sandia National Laboratories. All rights reserved. + * Copyright (c) 2021 University of Alabama at Birmingham. All rights reserved. + * Copyright (c) 2021 Tennessee Technological University. All rights reserved. + * Copyright (c) 2021 Cisco Systems, Inc. All rights reserved + * Copyright (c) 2021 Bull S.A.S. All rights reserved. + * Copyright (c) 2024 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. 
+ * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#ifndef PART_PERSIST_AGGREGATED_H +#define PART_PERSIST_AGGREGATED_H + +#ifdef HAVE_ALLOCA_H +#include +#endif + +#include + +#include "ompi_config.h" +#include "ompi/request/request.h" +#include "ompi/mca/part/part.h" +#include "ompi/mca/part/base/base.h" +#include "ompi/datatype/ompi_datatype.h" +#include "ompi/communicator/communicator.h" +#include "ompi/request/request.h" +#include "opal/sys/atomic.h" + +#include "ompi/mca/part/persist_aggregated/part_persist_aggregated_request.h" +#include "ompi/mca/part/base/part_base_precvreq.h" +#include "ompi/mca/part/persist_aggregated/part_persist_aggregated_recvreq.h" +#include "ompi/mca/part/persist_aggregated/part_persist_aggregated_sendreq.h" +#include "ompi/message/message.h" +#include "ompi/mca/pml/pml.h" +BEGIN_C_DECLS + +typedef struct mca_part_persist_aggregated_list_t { + opal_list_item_t super; + mca_part_persist_aggregated_request_t *item; +} mca_part_persist_aggregated_list_t; + +OPAL_DECLSPEC OBJ_CLASS_DECLARATION(mca_part_persist_aggregated_list_t); + + +struct ompi_part_persist_aggregated_t { + mca_part_base_module_t super; + int free_list_num; + int free_list_max; + int free_list_inc; + opal_list_t *progress_list; + + int32_t next_send_tag; /**< This is a counter for send tags for the actual data transfer. */ + int32_t next_recv_tag; + ompi_communicator_t *part_comm; /* This approach requires a separate tag space, so we need a dedicated communicator. */ + ompi_request_t *part_comm_req; + int32_t part_comm_ready; + ompi_communicator_t *part_comm_setup; /* We create a second communicator to send set-up messages (rational: these + messages go in the opposite direction of normal messages, need to use MPI_ANY_SOURCE + to support different communicators, and thus need to have a unique tag. Because tags + are controlled by the sender in this model, we cannot assume that the tag will be + unused in part_comm. */ + ompi_request_t *part_comm_sreq; + int32_t part_comm_sready; + int32_t init_comms; + int32_t init_world; + int32_t my_world_rank; /* Because the back end communicators use a world rank, we need to communicate ours + to set up the requests. */ + opal_atomic_int32_t block_entry; + opal_mutex_t lock; +}; +typedef struct ompi_part_persist_aggregated_t ompi_part_persist_aggregated_t; +extern ompi_part_persist_aggregated_t ompi_part_persist_aggregated; + +int mca_part_persist_aggregated_start(size_t, ompi_request_t**); +int mca_part_persist_aggregated_free(ompi_request_t**); + +END_C_DECLS + +#endif /* PART_PERSIST_AGGREGATED_H_HAS_BEEN_INCLUDED */ diff --git a/ompi/mca/part/persist_aggregated/part_persist_aggregated_component.c b/ompi/mca/part/persist_aggregated/part_persist_aggregated_component.c new file mode 100644 index 00000000000..c374642c33b --- /dev/null +++ b/ompi/mca/part/persist_aggregated/part_persist_aggregated_component.c @@ -0,0 +1,166 @@ +/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ +/* + * Copyright (c) 2006-2007 The Trustees of Indiana University and Indiana + * University Research and Technology + * Corporation. All rights reserved. + * Copyright (c) 2004-2007 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * Copyright (c) 2004-2006 The Regents of the University of California. + * All rights reserved. + * Copyright (c) 2010-2012 Cisco Systems, Inc. All rights reserved. + * Copyright (c) 2013-2021 Sandia National Laboratories. All rights reserved. 
+ * Copyright (c) 2015 Los Alamos National Security, LLC. All rights + * reserved. + * Copyright (c) 2024 Research Organization for Information Science + * and Technology (RIST). All rights reserved. + * Copyright (c) 2024 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "ompi_config.h" + +#include "ompi/mca/part/persist_aggregated/part_persist_aggregated.h" + +#include "ompi/mca/part/persist_aggregated/part_persist_aggregated_sendreq.h" +#include "ompi/mca/part/persist_aggregated/part_persist_aggregated_recvreq.h" +#include "ompi/mca/part/persist_aggregated/part_persist_aggregated_component.h" + +static int mca_part_persist_aggregated_component_register(void); +static int mca_part_persist_aggregated_component_open(void); +static int mca_part_persist_aggregated_component_close(void); +static mca_part_base_module_t* mca_part_persist_aggregated_component_init( int* priority, + bool enable_progress_threads, bool enable_mpi_threads); +static int mca_part_persist_aggregated_component_fini(void); + +mca_part_base_component_4_0_0_t mca_part_persist_aggregated_component = { + /* First, the mca_base_component_t struct containing meta + * information about the component itself */ + + .partm_version = { + MCA_PART_BASE_VERSION_2_0_0, + + .mca_component_name = "persist_aggregated", + MCA_BASE_MAKE_VERSION(component, OMPI_MAJOR_VERSION, OMPI_MINOR_VERSION, + OMPI_RELEASE_VERSION), + .mca_open_component = mca_part_persist_aggregated_component_open, + .mca_close_component = mca_part_persist_aggregated_component_close, + .mca_register_component_params = mca_part_persist_aggregated_component_register, + }, + .partm_data = { + /* This component is not checkpoint ready */ + MCA_BASE_METADATA_PARAM_NONE + }, + + .partm_init = mca_part_persist_aggregated_component_init, + .partm_finalize = mca_part_persist_aggregated_component_fini, +}; + +static int +mca_part_persist_aggregated_component_register(void) +{ + ompi_part_persist_aggregated.free_list_num = 4; + (void) mca_base_component_var_register(&mca_part_persist_aggregated_component.partm_version, "free_list_num", + "Initial size of request free lists", + MCA_BASE_VAR_TYPE_INT, NULL, 0, 0, + OPAL_INFO_LVL_9, + MCA_BASE_VAR_SCOPE_READONLY, + &ompi_part_persist_aggregated.free_list_num); + + ompi_part_persist_aggregated.free_list_max = -1; + (void) mca_base_component_var_register(&mca_part_persist_aggregated_component.partm_version, "free_list_max", + "Maximum size of request free lists", + MCA_BASE_VAR_TYPE_INT, NULL, 0, 0, + OPAL_INFO_LVL_9, + MCA_BASE_VAR_SCOPE_READONLY, + &ompi_part_persist_aggregated.free_list_max); + + ompi_part_persist_aggregated.free_list_inc = 64; + (void) mca_base_component_var_register(&mca_part_persist_aggregated_component.partm_version, "free_list_inc", + "Number of elements to add when growing request free lists", + MCA_BASE_VAR_TYPE_INT, NULL, 0, 0, + OPAL_INFO_LVL_9, + MCA_BASE_VAR_SCOPE_READONLY, + &ompi_part_persist_aggregated.free_list_inc); + + + return OPAL_SUCCESS; +} + +static void mca_part_persist_aggregated_init_lists(void) +{ + opal_free_list_init (&mca_part_base_precv_requests, + sizeof(mca_part_persist_aggregated_precv_request_t), + opal_cache_line_size, + OBJ_CLASS(mca_part_persist_aggregated_precv_request_t), + 0,opal_cache_line_size, + ompi_part_persist_aggregated.free_list_num, + ompi_part_persist_aggregated.free_list_max, + ompi_part_persist_aggregated.free_list_inc, + NULL, 0, 
NULL, NULL, NULL); + opal_free_list_init (&mca_part_base_psend_requests, + sizeof(mca_part_persist_aggregated_psend_request_t), + opal_cache_line_size, + OBJ_CLASS(mca_part_persist_aggregated_psend_request_t), + 0,opal_cache_line_size, + ompi_part_persist_aggregated.free_list_num, + ompi_part_persist_aggregated.free_list_max, + ompi_part_persist_aggregated.free_list_inc, + NULL, 0, NULL, NULL, NULL); + ompi_part_persist_aggregated.progress_list = OBJ_NEW(opal_list_t); +} + +static int +mca_part_persist_aggregated_component_open(void) +{ + OBJ_CONSTRUCT(&ompi_part_persist_aggregated.lock, opal_mutex_t); + + ompi_part_persist_aggregated.next_send_tag = 0; /**< This is a counter for send tags for the actual data transfer. */ + ompi_part_persist_aggregated.next_recv_tag = 0; + + mca_part_persist_aggregated_init_lists(); + + ompi_part_persist_aggregated.init_comms = 0; + ompi_part_persist_aggregated.init_world = -1; + + ompi_part_persist_aggregated.part_comm_ready = 0; + ompi_part_persist_aggregated.part_comm_ready = 0; + + ompi_part_persist_aggregated.block_entry = 0; + return OMPI_SUCCESS; +} + + +static int +mca_part_persist_aggregated_component_close(void) +{ + OBJ_DESTRUCT(&ompi_part_persist_aggregated.lock); + return OMPI_SUCCESS; +} + + +static mca_part_base_module_t* +mca_part_persist_aggregated_component_init(int* priority, + bool enable_progress_threads, + bool enable_mpi_threads) +{ + *priority = 1; + + opal_output_verbose( 10, 0, + "in persist part priority is %d\n", *priority); + + return &ompi_part_persist_aggregated.super; +} + + +static int +mca_part_persist_aggregated_component_fini(void) +{ + return OMPI_SUCCESS; +} + diff --git a/ompi/mca/part/persist_aggregated/part_persist_aggregated_component.h b/ompi/mca/part/persist_aggregated/part_persist_aggregated_component.h new file mode 100644 index 00000000000..1688b08bfff --- /dev/null +++ b/ompi/mca/part/persist_aggregated/part_persist_aggregated_component.h @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana + * University Research and Technology + * Corporation. All rights reserved. + * Copyright (c) 2004-2006 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2020-2021 Sandia National Laboratories. All rights reserved. + * Copyright (c) 2024 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ +/** + * @file + */ + +#ifndef MCA_PART_RMA_COMPONENT_H +#define MCA_PART_RMA_COMPONENT_H + +BEGIN_C_DECLS + +/* + * PART module functions. + */ +OMPI_DECLSPEC extern mca_part_base_component_4_0_0_t mca_part_persist_aggregated_component; + +END_C_DECLS + +#endif diff --git a/ompi/mca/part/persist_aggregated/part_persist_aggregated_recvreq.c b/ompi/mca/part/persist_aggregated/part_persist_aggregated_recvreq.c new file mode 100644 index 00000000000..641a2afd4bd --- /dev/null +++ b/ompi/mca/part/persist_aggregated/part_persist_aggregated_recvreq.c @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana + * University Research and Technology + * Corporation. All rights reserved. + * Copyright (c) 2004-2007 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. 
+ * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2006 The Regents of the University of California. + * All rights reserved. + * Copyright (c) 2020-2021 Sandia National Laboratories. All rights reserved. + * Copyright (c) 2024 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "ompi_config.h" + +#include "ompi/mca/part/persist_aggregated/part_persist_aggregated.h" +#include "ompi/mca/part/persist_aggregated/part_persist_aggregated_recvreq.h" + + +static void +mca_part_persist_aggregated_precv_request_construct(mca_part_persist_aggregated_precv_request_t* recvreq) +{ + recvreq->req_base.req_ompi.req_start = mca_part_persist_aggregated_start; + recvreq->req_base.req_ompi.req_free = mca_part_persist_aggregated_free; + recvreq->req_base.req_ompi.req_cancel = NULL; + recvreq->req_base.req_ompi.req_persistent = true; + OBJ_CONSTRUCT( &(recvreq->req_base.req_convertor), opal_convertor_t ); +} + + +OBJ_CLASS_INSTANCE(mca_part_persist_aggregated_precv_request_t, + mca_part_persist_aggregated_request_t, + mca_part_persist_aggregated_precv_request_construct, + NULL); + diff --git a/ompi/mca/part/persist_aggregated/part_persist_aggregated_recvreq.h b/ompi/mca/part/persist_aggregated/part_persist_aggregated_recvreq.h new file mode 100644 index 00000000000..c252cd629fc --- /dev/null +++ b/ompi/mca/part/persist_aggregated/part_persist_aggregated_recvreq.h @@ -0,0 +1,105 @@ +/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana + * University Research and Technology + * Corporation. All rights reserved. + * Copyright (c) 2004-2013 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2006 The Regents of the University of California. + * All rights reserved. + * Copyright (c) 2012-2021 Sandia National Laboratories. All rights reserved. + * Copyright (c) 2015 Los Alamos National Security, LLC. All rights + * reserved. + * Copyright (c) 2017 Intel, Inc. All rights reserved + * Copyright (c) 2024 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#ifndef PART_PERSIST_AGGREGATED_RECVREQ_H +#define PART_PERSIST_AGGREGATED_RECVREQ_H + +#include "ompi/mca/part/persist_aggregated/part_persist_aggregated_request.h" +#include "ompi/mca/part/base/part_base_precvreq.h" + +struct mca_part_persist_aggregated_precv_request_t { + mca_part_persist_aggregated_request_t req_base; +}; +typedef struct mca_part_persist_aggregated_precv_request_t mca_part_persist_aggregated_precv_request_t; +OBJ_CLASS_DECLARATION(mca_part_persist_aggregated_precv_request_t); + +/** + * Allocate a recv request from the modules free list. + * + * @param rc (OUT) OMPI_SUCCESS or error status on failure. + * @return Receive request. 
+ */ +#define MCA_PART_PERSIST_AGGREGATED_PRECV_REQUEST_ALLOC(precvreq) \ +do { \ + precvreq = (mca_part_persist_aggregated_precv_request_t*) \ + opal_free_list_get (&mca_part_base_precv_requests); \ + precvreq->req_base.req_type = MCA_PART_PERSIST_AGGREGATED_REQUEST_PRECV; \ + } while (0) + +/** + * Initialize a receive request with call parameters. + * + * @param request (IN) Receive request. + * @param addr (IN) User buffer. + * @param count (IN) Number of elements of indicated datatype. + * @param datatype (IN) User defined datatype. + * @param src (IN) Source rank w/in the communicator. + * @param comm (IN) Communicator. + * @param persistent (IN) Is this a ersistent request. + */ +#define MCA_PART_PERSIST_AGGREGATED_PRECV_REQUEST_INIT( request, \ + ompi_proc, \ + comm, \ + tag, \ + src, \ + datatype, \ + addr, \ + parts, \ + count, \ + flags ) \ +do { \ + OBJ_RETAIN(comm); \ + OMPI_DATATYPE_RETAIN(datatype); \ + (request)->req_base.req_comm = comm; \ + (request)->req_base.req_datatype = datatype; \ + (request)->req_base.req_ompi.req_mpi_object.comm = comm; \ + (request)->req_base.req_ompi.req_status.MPI_SOURCE = src; \ + (request)->req_base.req_ompi.req_status.MPI_TAG = tag; \ + (request)->req_base.req_part_complete = true; \ + (request)->req_base.req_ompi.req_status._ucount = count; \ + (request)->req_base.req_free_called = false; \ + (request)->req_base.req_addr = addr; /**< pointer to application buffer */\ + (request)->req_base.req_parts = parts; /**< number of partitions */\ + (request)->req_base.req_count = count; /**< count of user datatype elements */\ + (request)->req_base.req_peer = src; /**< peer process - rank w/in this communicator */\ + (request)->req_base.req_tag = tag; \ +} while(0) + +/** + * Free the PART receive request + */ +#define MCA_PART_PERSIST_AGGREGATED_PRECV_REQUEST_RETURN(recvreq) \ +{ \ + OBJ_RELEASE((recvreq)->req_comm); \ + OMPI_DATATYPE_RELEASE((recvreq)->req_datatype); \ + OMPI_REQUEST_FINI(&(recvreq)->req_ompi); \ + opal_convertor_cleanup( &((recvreq)->req_convertor) ); \ + opal_free_list_return ( &mca_part_base_precv_requests, \ + (opal_free_list_item_t*)(recvreq)); \ +} + +#endif + + diff --git a/ompi/mca/part/persist_aggregated/part_persist_aggregated_request.c b/ompi/mca/part/persist_aggregated/part_persist_aggregated_request.c new file mode 100644 index 00000000000..3e5b08cb01b --- /dev/null +++ b/ompi/mca/part/persist_aggregated/part_persist_aggregated_request.c @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana + * University Research and Technology + * Corporation. All rights reserved. + * Copyright (c) 2004-2006 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2006 The Regents of the University of California. + * All rights reserved. + * Copyright (c) 2020-2021 Sandia National Laboratories. All rights reserved. + * Copyright (c) 2024 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. 
+ * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "ompi_config.h" + +#include "ompi/mca/part/persist_aggregated/part_persist_aggregated.h" +#include "ompi/mca/part/persist_aggregated/part_persist_aggregated_request.h" + + +static void mca_part_persist_aggregated_request_construct( mca_part_persist_aggregated_request_t* req) { + OBJ_CONSTRUCT(&req->req_convertor, opal_convertor_t); + req->req_ompi.req_type = OMPI_REQUEST_PART; +} + +static void mca_part_persist_aggregated_request_destruct( mca_part_persist_aggregated_request_t* req) { + OBJ_DESTRUCT(&req->req_convertor); +} + +OBJ_CLASS_INSTANCE(mca_part_persist_aggregated_request_t, + ompi_request_t, + mca_part_persist_aggregated_request_construct, + mca_part_persist_aggregated_request_destruct); diff --git a/ompi/mca/part/persist_aggregated/part_persist_aggregated_request.h b/ompi/mca/part/persist_aggregated/part_persist_aggregated_request.h new file mode 100644 index 00000000000..1c94eb27e93 --- /dev/null +++ b/ompi/mca/part/persist_aggregated/part_persist_aggregated_request.h @@ -0,0 +1,107 @@ +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana + * University Research and Technology + * Corporation. All rights reserved. + * Copyright (c) 2004-2016 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2006 The Regents of the University of California. + * All rights reserved. + * Copyright (c) 2017 Intel, Inc. All rights reserved + * Copyright (c) 2020-2021 Sandia National Laboratories. All rights reserved. + * Copyright (c) 2024 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#ifndef PART_PERSIST_AGGREGATED_REQUEST_H +#define PART_PERSIST_AGGREGATED_REQUEST_H + +#include "ompi/mca/part/base/part_base_psendreq.h" +#include "ompi/mca/part/part.h" +#include "opal/sys/atomic.h" +/** + * Type of request. 
+ */ +typedef enum { + MCA_PART_PERSIST_AGGREGATED_REQUEST_PSEND, + MCA_PART_PERSIST_AGGREGATED_REQUEST_PRECV, + MCA_PART_PERSIST_AGGREGATED_REQUEST_NULL +} mca_part_persist_aggregated_request_type_t; + +struct mca_part_persist_aggregated_list_t; + +struct ompi_mca_persist_setup_t { + int world_rank; + int start_tag; + int setup_tag; + size_t num_parts; + size_t dt_size; + size_t count; +}; + + +/** + * Base type for PART PERSIST requests + */ +struct mca_part_persist_aggregated_request_t { + +/* START: These fields have to match the definition of the mca_part_persist_aggregated_request_t */ + ompi_request_t req_ompi; /**< base request */ + volatile int32_t req_part_complete; /**< flag indicating if the pt-2-pt layer is done with this request */ + volatile int32_t req_free_called; /**< flag indicating if the user has freed this request */ + mca_part_persist_aggregated_request_type_t req_type; /**< MPI request type - used for test */ + struct ompi_communicator_t *req_comm; /**< communicator pointer */ + struct ompi_datatype_t *req_datatype; /**< pointer to data type */ + opal_convertor_t req_convertor; /**< always need the convertor */ + + const void *req_addr; /**< pointer to application buffer */ + size_t req_parts; /**< number of partitions */ + size_t req_count; /**< count of user datatype elements */ + int32_t req_peer; /**< peer process - rank w/in this communicator */ + int32_t req_tag; /**< user defined tag */ + struct ompi_proc_t* req_proc; /**< peer process */ + +/* END: These fields have to match the definition of the mca_part_persist_aggregated_request_t */ + + size_t req_bytes; /**< bytes for completion status */ + + size_t real_parts; /**< internal number of partitions */ + size_t real_count; + size_t real_dt_size; /**< receiver needs to know how large the sender's datatype is. */ + size_t part_size; + + ompi_request_t** persist_reqs; /**< requests for persistent sends/recvs */ + ompi_request_t* setup_req [2]; /**< Request structure for setup messages */ + + + int32_t req_partitions_send; /**< Send side number of partitions */ + int32_t req_partitions_recv; /**< Recv side number of partitions */ + + int32_t my_send_tag; /**< This is a counter for send tags for the actual data transfer. */ + int32_t my_recv_tag; /**< This is a counter for receive tags, for incoming setup messages. */ + + int32_t world_peer; /**< peer's rank in MPI_COMM_WORLD */ + + int32_t initialized; /**< flag for initialized state */ + int32_t first_send; /**< flag for whether the first send has happened */ + int32_t flag_post_setup_recv; + size_t done_count; /**< counter for the number of partitions marked ready */ + + int32_t *flags; /**< array of flags to determine whether a partition has arrived */ + + struct ompi_mca_persist_setup_t setup_info[2]; /**< Setup info to send during initialization. */ + + struct mca_part_persist_aggregated_list_t* progress_elem; /**< pointer to progress list element for removal during free. 
*/ + +}; +typedef struct mca_part_persist_aggregated_request_t mca_part_persist_aggregated_request_t; +OBJ_CLASS_DECLARATION(mca_part_persist_aggregated_request_t); + +#endif diff --git a/ompi/mca/part/persist_aggregated/part_persist_aggregated_sendreq.c b/ompi/mca/part/persist_aggregated/part_persist_aggregated_sendreq.c new file mode 100644 index 00000000000..3c0b9e1a132 --- /dev/null +++ b/ompi/mca/part/persist_aggregated/part_persist_aggregated_sendreq.c @@ -0,0 +1,41 @@ +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana + * University Research and Technology + * Corporation. All rights reserved. + * Copyright (c) 2004-2007 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2006 The Regents of the University of California. + * All rights reserved. + * Copyright (c) 2020-2021 Sandia National Laboratories. All rights reserved. + * Copyright (c) 2024 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "ompi_config.h" + +#include "ompi/mca/part/persist_aggregated/part_persist_aggregated.h" +#include "ompi/mca/part/persist_aggregated/part_persist_aggregated_sendreq.h" + + +static void mca_part_persist_aggregated_psend_request_construct(mca_part_persist_aggregated_psend_request_t* sendreq) +{ + /* no need to reinit for every send -- never changes */ + sendreq->req_base.req_ompi.req_start = mca_part_persist_aggregated_start; + sendreq->req_base.req_ompi.req_free = mca_part_persist_aggregated_free; + sendreq->req_base.req_ompi.req_persistent = true; + sendreq->req_base.req_ompi.req_cancel = NULL; +} + +OBJ_CLASS_INSTANCE(mca_part_persist_aggregated_psend_request_t, + mca_part_persist_aggregated_request_t, + mca_part_persist_aggregated_psend_request_construct, + NULL); + diff --git a/ompi/mca/part/persist_aggregated/part_persist_aggregated_sendreq.h b/ompi/mca/part/persist_aggregated/part_persist_aggregated_sendreq.h new file mode 100644 index 00000000000..c5a4fbbe232 --- /dev/null +++ b/ompi/mca/part/persist_aggregated/part_persist_aggregated_sendreq.h @@ -0,0 +1,96 @@ +/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana + * University Research and Technology + * Corporation. All rights reserved. + * Copyright (c) 2004-2013 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2006 The Regents of the University of California. + * All rights reserved. + * Copyright (c) 2015-2017 Los Alamos National Security, LLC. All rights + * reserved. + * Copyright (c) 2015 Research Organization for Information Science + * and Technology (RIST). All rights reserved. + * Copyright (c) 2017 Intel, Inc. All rights reserved + * Copyright (c) 2020-2021 Sandia National Laboratories. All rights reserved. + * Copyright (c) 2024 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. 
+ * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#ifndef PART_PERSIST_AGGREGATED_PSENDREQ_H +#define PART_PERSIST_AGGREGATED_PSENDREQ_H + +#include "ompi/mca/part/persist_aggregated/part_persist_aggregated_request.h" +#include "ompi/mca/part/base/part_base_psendreq.h" +#include "ompi/mca/part/part.h" +#include "opal/prefetch.h" + +struct mca_part_persist_aggregated_psend_request_t { + mca_part_persist_aggregated_request_t req_base; +}; +typedef struct mca_part_persist_aggregated_psend_request_t mca_part_persist_aggregated_psend_request_t; +OBJ_CLASS_DECLARATION(mca_part_persist_aggregated_psend_request_t); + + +#define MCA_PART_PERSIST_AGGREGATED_PSEND_REQUEST_ALLOC(sendreq, comm, dst, \ + ompi_proc) \ +do { \ + sendreq = (mca_part_persist_aggregated_psend_request_t*) \ + opal_free_list_wait (&mca_part_base_psend_requests); \ + sendreq->req_base.req_type = MCA_PART_PERSIST_AGGREGATED_REQUEST_PSEND; \ +} while(0) + +#define MCA_PART_PERSIST_AGGREGATED_PSEND_REQUEST_INIT( req_send, \ + ompi_proc, \ + comm, \ + tag, \ + dst, \ + datatype, \ + buf, \ + parts, \ + count, \ + flags ) \ + do { \ + OMPI_REQUEST_INIT(&(sendreq->req_base.req_ompi), \ + false); \ + OBJ_RETAIN(comm); \ + OMPI_DATATYPE_RETAIN(datatype); \ + (req_send)->req_base.req_comm = comm; \ + (req_send)->req_base.req_datatype = datatype; \ + (req_send)->req_base.req_ompi.req_mpi_object.comm = comm; \ + (req_send)->req_base.req_ompi.req_status.MPI_SOURCE = \ + comm->c_my_rank; \ + (req_send)->req_base.req_ompi.req_status.MPI_TAG = tag; \ + (req_send)->req_base.req_part_complete = true; \ + (req_send)->req_base.req_ompi.req_status._ucount = count; \ + (req_send)->req_base.req_free_called = false; \ + (req_send)->req_base.req_addr = buf; /**< pointer to application buffer */\ + (req_send)->req_base.req_parts = parts; /**< number of partitions */\ + (req_send)->req_base.req_count = count; /**< count of user datatype elements */\ + (req_send)->req_base.req_peer = dst; /**< peer process - rank w/in this communicator */\ + (req_send)->req_base.req_tag = tag; /**< user defined tag */\ + } while(0) + +/* + * Release resources associated with a request + */ +#define MCA_PART_PERSIST_AGGREGATED_PSEND_REQUEST_RETURN(sendreq) \ + { \ + /* Let the base handle the reference counts */ \ + OMPI_DATATYPE_RELEASE(sendreq->req_datatype); \ + OBJ_RELEASE(sendreq->req_comm); \ + OMPI_REQUEST_FINI(&sendreq->req_ompi); \ + opal_convertor_cleanup( &(sendreq->req_convertor) ); \ + opal_free_list_return ( &mca_part_base_psend_requests, \ + (opal_free_list_item_t*)sendreq); \ + } + +#endif diff --git a/ompi/mca/part/persist_aggregated/post_configure.sh b/ompi/mca/part/persist_aggregated/post_configure.sh new file mode 100644 index 00000000000..d06ec572027 --- /dev/null +++ b/ompi/mca/part/persist_aggregated/post_configure.sh @@ -0,0 +1 @@ +DIRECT_CALL_HEADER="ompi/mca/part/rma/part_rma.h" From 4f76b101e6fd0e80019f1944d44c3acc73ffe2fe Mon Sep 17 00:00:00 2001 From: Axel Schneewind Date: Fri, 10 Jan 2025 10:09:27 +0100 Subject: [PATCH 2/4] add mca parameters to limit partition size and count Signed-off-by: Axel Schneewind --- .../part_persist_aggregated.h | 4 ++++ .../part_persist_aggregated_component.c | 18 ++++++++++++++++++ 2 files changed, 22 insertions(+) diff --git a/ompi/mca/part/persist_aggregated/part_persist_aggregated.h b/ompi/mca/part/persist_aggregated/part_persist_aggregated.h index 7e731392fdf..812812c9038 100644 --- a/ompi/mca/part/persist_aggregated/part_persist_aggregated.h +++ 
b/ompi/mca/part/persist_aggregated/part_persist_aggregated.h @@ -80,6 +80,10 @@ struct ompi_part_persist_aggregated_t { int32_t init_world; int32_t my_world_rank; /* Because the back end communicators use a world rank, we need to communicate ours to set up the requests. */ + + uint32_t min_message_size; /* parameters to control internal partitioning */ + uint32_t max_message_count; + opal_atomic_int32_t block_entry; opal_mutex_t lock; }; diff --git a/ompi/mca/part/persist_aggregated/part_persist_aggregated_component.c b/ompi/mca/part/persist_aggregated/part_persist_aggregated_component.c index c374642c33b..11aa049a72c 100644 --- a/ompi/mca/part/persist_aggregated/part_persist_aggregated_component.c +++ b/ompi/mca/part/persist_aggregated/part_persist_aggregated_component.c @@ -88,6 +88,24 @@ mca_part_persist_aggregated_component_register(void) MCA_BASE_VAR_SCOPE_READONLY, &ompi_part_persist_aggregated.free_list_inc); + // variable for minimal internal partition size + ompi_part_persist_aggregated.min_message_size = 4096; + (void) mca_base_component_var_register(&mca_part_persist_aggregated_component.partm_version, "min_message_size", + "Minimal size of transferred messages (internal partitions)", + MCA_BASE_VAR_TYPE_UNSIGNED_INT, NULL, 0, 0, + OPAL_INFO_LVL_9, + MCA_BASE_VAR_SCOPE_READONLY, + &ompi_part_persist_aggregated.min_message_size); + + // variable for maximal internal partition count + ompi_part_persist_aggregated.max_message_count = 4096; + (void) mca_base_component_var_register(&mca_part_persist_aggregated_component.partm_version, "max_message_count", + "Maximal number of transferred messages (internal partitions)", + MCA_BASE_VAR_TYPE_UNSIGNED_INT, NULL, 0, 0, + OPAL_INFO_LVL_9, + MCA_BASE_VAR_SCOPE_READONLY, + &ompi_part_persist_aggregated.max_message_count); + return OPAL_SUCCESS; } From 3a9fb9ca93f76d374fe2d7b61c78b1290601fdb3 Mon Sep 17 00:00:00 2001 From: Axel Schneewind Date: Fri, 10 Jan 2025 11:54:11 +0100 Subject: [PATCH 3/4] implement a simple aggregation scheme This aggregation scheme is intended to allow OpenMPI to transfer larger messages if the user-reported partitions are too small or too high in number. This is achieved by using an internal partitioning where each internal (transfer) partition corresponds to one or multiple user-reported partitions. The scheme provides a method to mark a user partition as ready, that optionally returns a transfer partition that is ready. This is implemented by associating each transfer partition with an atomic counter, tracking the number of corresponding pready-calls already made. If a counter reaches the number of corresponding user-partitions, the corresponding transfer partition is returned. This implementation is thread-safe. 
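For illustration, a minimal sketch of how a send path can drive this scheme. The
part_persist_aggregation_state struct and the part_persist_aggregate_regular_*
functions are the ones added below in
schemes/part_persist_aggregated_scheme_regular.{h,c}; start_transfer_partition()
is a hypothetical placeholder for starting the persistent request that backs an
internal (transfer) partition:

    #include "part_persist_aggregated_scheme_regular.h"

    /* hypothetical helper: starts the persistent send behind one internal partition */
    extern void start_transfer_partition(int internal_partition);

    /* Example: 8 user partitions aggregated 3-at-a-time give 3 internal
     * partitions ((3 - 1) * 3 + 2 = 8); the last one covers only 2 user
     * partitions. */
    void example_setup(struct part_persist_aggregation_state *state)
    {
        part_persist_aggregate_regular_init(state, /* internal_partition_count */ 3,
                                            /* factor */ 3,
                                            /* last_internal_partition_size */ 2);
    }

    /* Called once per user-level pready; thread-safe because each internal
     * partition has an atomic counter of completed user partitions. */
    void example_pready(struct part_persist_aggregation_state *state, int user_part)
    {
        int ready_internal = -1;
        part_persist_aggregate_regular_pready(state, user_part, &ready_internal);
        if (ready_internal >= 0) {
            /* all user partitions of this internal partition are now ready */
            start_transfer_partition(ready_internal);
        }
    }
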
Signed-off-by: Axel Schneewind --- ompi/mca/part/persist_aggregated/Makefile.am | 4 +- .../part_persist_aggregated_scheme_regular.c | 82 ++++++++++++++++ .../part_persist_aggregated_scheme_regular.h | 95 +++++++++++++++++++ 3 files changed, 180 insertions(+), 1 deletion(-) create mode 100644 ompi/mca/part/persist_aggregated/schemes/part_persist_aggregated_scheme_regular.c create mode 100644 ompi/mca/part/persist_aggregated/schemes/part_persist_aggregated_scheme_regular.h diff --git a/ompi/mca/part/persist_aggregated/Makefile.am b/ompi/mca/part/persist_aggregated/Makefile.am index a4eef681f82..4146e0c6f96 100644 --- a/ompi/mca/part/persist_aggregated/Makefile.am +++ b/ompi/mca/part/persist_aggregated/Makefile.am @@ -37,7 +37,9 @@ local_sources = \ part_persist_aggregated_request.h \ part_persist_aggregated_request.c \ part_persist_aggregated_sendreq.h \ - part_persist_aggregated_sendreq.c + part_persist_aggregated_sendreq.c \ + schemes/part_persist_aggregated_scheme_regular.h \ + schemes/part_persist_aggregated_scheme_regular.c mcacomponentdir = $(ompilibdir) mcacomponent_LTLIBRARIES = $(component_install) diff --git a/ompi/mca/part/persist_aggregated/schemes/part_persist_aggregated_scheme_regular.c b/ompi/mca/part/persist_aggregated/schemes/part_persist_aggregated_scheme_regular.c new file mode 100644 index 00000000000..0a1f084e116 --- /dev/null +++ b/ompi/mca/part/persist_aggregated/schemes/part_persist_aggregated_scheme_regular.c @@ -0,0 +1,82 @@ +/* + * Copyright (c) 2024 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + * + */ + +#include "part_persist_aggregated_scheme_regular.h" + +#include +#include + +// converts the index of a public partition to the index of its corresponding internal partition +static int internal_partition(struct part_persist_aggregation_state *state, int public_part) +{ + return public_part / state->aggregation_count; +} + +void part_persist_aggregate_regular_init(struct part_persist_aggregation_state *state, + int internal_partition_count, int factor, + int last_internal_partition_size) +{ + state->public_partition_count = (internal_partition_count - 1) * factor + + last_internal_partition_size; + state->internal_partition_count = internal_partition_count; + + // number of user-partitions per internal partition (except for the last one) + state->aggregation_count = factor; + // number of user-partitions corresponding to the last internal partition + state->last_internal_partition_size = last_internal_partition_size; + + // initialize counters + state->public_parts_ready = (opal_atomic_int32_t *) calloc(state->internal_partition_count, + sizeof(opal_atomic_uint32_t)); +} + +void part_persist_aggregate_regular_reset(struct part_persist_aggregation_state *state) +{ + // reset flags + if (NULL != state->public_parts_ready) { + memset((void *) state->public_parts_ready, 0, + state->internal_partition_count * sizeof(opal_atomic_uint32_t)); + } +} + +static inline int is_last_partition(struct part_persist_aggregation_state *state, int partition) +{ + return (partition == state->internal_partition_count - 1); +} + +static inline int num_public_parts(struct part_persist_aggregation_state *state, int partition) +{ + return is_last_partition(state, partition) ? 
state->last_internal_partition_size + : state->aggregation_count; +} + +void part_persist_aggregate_regular_pready(struct part_persist_aggregation_state *state, + int partition, int *available_partition) +{ + int internal_part = internal_partition(state, partition); + + // this is the new value (after adding) + int count = opal_atomic_add_fetch_32(&state->public_parts_ready[internal_part], 1); + + // push to buffer if internal partition is ready + if (count == num_public_parts(state, internal_part)) { + *available_partition = internal_part; + } else { + *available_partition = -1; + } +} + +void part_persist_aggregate_regular_free(struct part_persist_aggregation_state *state) +{ + if (state->public_parts_ready != NULL) + free((void *) state->public_parts_ready); + state->public_parts_ready = NULL; +} \ No newline at end of file diff --git a/ompi/mca/part/persist_aggregated/schemes/part_persist_aggregated_scheme_regular.h b/ompi/mca/part/persist_aggregated/schemes/part_persist_aggregated_scheme_regular.h new file mode 100644 index 00000000000..d3e8279a1ed --- /dev/null +++ b/ompi/mca/part/persist_aggregated/schemes/part_persist_aggregated_scheme_regular.h @@ -0,0 +1,95 @@ +/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ +/* + * Copyright (c) 2024 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + * + */ + +/** + * @file + * This file defines a simple message aggregation scheme: + * A user-provided partitioning into n partitions is mapped + * to an internal partitioning of ceil(n/k) partitions where + * each internal partition corresponds to k public ones + * (with the last partition potentially having a lower size). + * The factor k can be defined to optimize the internal + * number/size of internal partitions. + */ + +#ifndef PART_PERSIST_AGGREGATED_SCHEME_REGULAR_H +#define PART_PERSIST_AGGREGATED_SCHEME_REGULAR_H + +#include "ompi_config.h" + +#include "opal/include/opal/sys/atomic.h" + +/** + * @brief tracks the number of pready calls corresponding to internal partitions + * + */ +struct part_persist_aggregation_state { + // counters for each internal partition + opal_atomic_int32_t *public_parts_ready; + + // number of public partitions + int public_partition_count; + + // number of internal partitions + int internal_partition_count; + + // how many public partitions are aggregated into an internal one + int aggregation_count; + // number of public partitions corresponding to last internal partition + int last_internal_partition_size; +}; + +/** + * @brief initializes the aggregation state + * + * @param[out] state pointer to aggregation state object + * @param[in] internal_partition_count number of internal partitions (i.e. 
number of messages + * per partitioned transfer) + * @param[in] factor number of public partitions corresponding to each + * internal one other than the last + * @param[in] last_internal_partition_size number of public partitions corresponding to the last + * internal partition internal partition + */ +OMPI_DECLSPEC void part_persist_aggregate_regular_init(struct part_persist_aggregation_state *state, + int internal_partition_count, int factor, + int last_internal_partition_size); + +/** + * @brief resets the aggregation state + * + * @param[out] state pointer to aggregation state object + */ +OMPI_DECLSPEC void +part_persist_aggregate_regular_reset(struct part_persist_aggregation_state *state); + +/** + * @brief marks a public partition as ready and optionally outputs an internal partition that can be + * sent. + * + * @param[in,out] state pointer to aggregation state object + * @param[in] partition index of the public partition to mark ready + * @param[out] available_partition index of the corresponding internal partition if it is ready, + * otherwise -1 + */ +OMPI_DECLSPEC void +part_persist_aggregate_regular_pready(struct part_persist_aggregation_state *state, int partition, + int *available_partition); + +/** + * @brief destroys the aggregation scheme + * + * @param[in,out] state pointer to aggregation state object + */ +OMPI_DECLSPEC void +part_persist_aggregate_regular_free(struct part_persist_aggregation_state *state); + +#endif \ No newline at end of file From 84882d13f7cfe042d080a8ab165c913615753286 Mon Sep 17 00:00:00 2001 From: Axel Schneewind Date: Fri, 10 Jan 2025 13:33:29 +0100 Subject: [PATCH 4/4] use aggregation scheme in part_persist_aggregated component A few changes are made to the component to allow for using a different partitioning internally than reported by the user: During initialization, an internal partitioning is computed from the user-provided one, taking into account the limits imposed by the mca-parameters on partition size and count. This internal partitioning can be equal to or less fine-grained than the one provided by the user. The sender side request objects now need to hold state for message aggregation, therefore an aggregation_state field is added. This is not required for receive side requests, as the receiver side code already supports differing send-side partitionings. As transfer partition sizes might not divide user-partition sizes, the last transfer partition can correspond to fewer user-partitions than the other ones. For this reason, a field (real_remainder) is introduced into request objects that holds the number of elements corresponding to the last transfer partition. This field is also added to the setup information exchanged between sender and receiver during initialization. To account for this potentially smaller last transfer partition, the initialization of the internal persistent requests is adjusted. 
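A worked example with illustrative numbers (not taken from the patch): suppose the user declares 10 partitions of 256 elements each and the MCA limits require at least 768 elements per transfer partition. The component then aggregates 3 user partitions per transfer partition, giving 4 transfer partitions: the first 3 carry 3 * 256 = 768 elements (real_count) and the last one carries the remaining 1 * 256 = 256 elements (real_remainder), so 10 * 256 = 3 * 768 + 256 as required. The setup information sent to the receiver advertises this internal partitioning (4 partitions, count 768, remainder 256).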
Signed-off-by: Axel Schneewind --- .../part_persist_aggregated.c | 145 +++++++++++++++--- .../part_persist_aggregated_request.h | 6 +- .../part_persist_aggregated_sendreq.h | 5 + 3 files changed, 136 insertions(+), 20 deletions(-) diff --git a/ompi/mca/part/persist_aggregated/part_persist_aggregated.c b/ompi/mca/part/persist_aggregated/part_persist_aggregated.c index 9c86907fae5..a99bd0f219b 100644 --- a/ompi/mca/part/persist_aggregated/part_persist_aggregated.c +++ b/ompi/mca/part/persist_aggregated/part_persist_aggregated.c @@ -32,6 +32,8 @@ #include "ompi/mca/part/persist_aggregated/part_persist_aggregated_sendreq.h" #include "ompi/mca/part/persist_aggregated/part_persist_aggregated_recvreq.h" +#include "ompi/mca/part/persist_aggregated/schemes/part_persist_aggregated_scheme_regular.h" + static int mca_part_persist_aggregated_progress(void); static int mca_part_persist_aggregated_precv_init(void *, size_t, size_t, ompi_datatype_t *, int, int, struct ompi_communicator_t *, struct ompi_info_t *, struct ompi_request_t **); static int mca_part_persist_aggregated_psend_init(const void*, size_t, size_t, ompi_datatype_t*, int, int, ompi_communicator_t*, struct ompi_info_t *, ompi_request_t**); @@ -49,6 +51,73 @@ ompi_part_persist_aggregated_t ompi_part_persist_aggregated = { } }; +/** + * @brief selects an internal partitioning based on the user-provided partitioning + * and the mca parameters for minimal partition size and maximal partition count. + * + * More precisely, given a partitioning into p partitions of size s, computes + * an internal partitioning into p' partitions of size s' (apart from the last one, + * which has potentially different size r * s): + * p * s = (p' - 1) * s' + r * s + * where + * s' >= s + * p' <= p + * 0 < r * s <= s' + * and + * s' >= min_message_size + * p' <= max_message_count + * (given by mca parameters). 
+ * + * @param[in] partitions number of user-provided partitions + * @param[in] part_size size of user-provided partitions in elements + * @param[out] internal_partitions number of internal partitions + * @param[out] factor number of public partitions corresponding to each internal + * partition other than the last one + * @param[out] remainder number of public partitions corresponding to the last internal + * partition + */ +static inline void +part_persist_aggregated_select_internal_partitioning(size_t partitions, + size_t part_size, + size_t *internal_partitions, + size_t *factor, + size_t *remainder) +{ + size_t buffer_size = partitions * part_size; + size_t min_part_size = ompi_part_persist_aggregated.min_message_size; + size_t max_part_count = ompi_part_persist_aggregated.max_message_count; + + // check if max_part_count imposes a higher lower bound on partition size + if (max_part_count > 0 && (buffer_size / max_part_count) > min_part_size) { + min_part_size = buffer_size / max_part_count; + } + + // cannot have partitions larger than buffer size + if (min_part_size > buffer_size) { + min_part_size = buffer_size; + } + + if (part_size < min_part_size) { + // have to use larger partitions + // solve p = (p' - 1) * a + r for a (factor) and r (remainder) + *factor = min_part_size / part_size; + *internal_partitions = partitions / *factor; + *remainder = partitions % (*internal_partitions); + + if (*remainder == 0) { // size of last partition must be set + *remainder = *factor; + } else { + // number of partitions was floored, so add 1 for last (smaller) partition + *internal_partitions += 1; + } + } else { + // can keep original partitioning + *internal_partitions = partitions; + *factor = 1; + *remainder = 1; + } +} + +/** + * This is a helper function that frees a request. This requires ompi_part_persist_aggregated.lock be held before calling. 
*/ @@ -59,6 +128,12 @@ mca_part_persist_aggregated_free_req(struct mca_part_persist_aggregated_request_ size_t i; opal_list_remove_item(ompi_part_persist_aggregated.progress_list, (opal_list_item_t*)req->progress_elem); OBJ_RELEASE(req->progress_elem); + + // if on sender side, free aggregation state + if (MCA_PART_PERSIST_AGGREGATED_REQUEST_PSEND == req->req_type) { + mca_part_persist_aggregated_psend_request_t *sendreq = (mca_part_persist_aggregated_psend_request_t *) req; + part_persist_aggregate_regular_free(&sendreq->aggregation_state); + } for(i = 0; i < req->real_parts; i++) { ompi_request_free(&(req->persist_reqs[i])); @@ -187,17 +262,21 @@ mca_part_persist_aggregated_progress(void) /* Set up persistent sends */ req->persist_reqs = (ompi_request_t**) malloc(sizeof(ompi_request_t*)*(req->real_parts)); - for(i = 0; i < req->real_parts; i++) { + for(i = 0; i < req->real_parts - 1; i++) { void *buf = ((void*) (((char*)req->req_addr) + (bytes * i))); err = MCA_PML_CALL(isend_init(buf, req->real_count, req->req_datatype, req->world_peer, req->my_send_tag+i, MCA_PML_BASE_SEND_STANDARD, ompi_part_persist_aggregated.part_comm, &(req->persist_reqs[i]))); } + // last transfer partition can have different size + void *buf = ((void*) (((char*)req->req_addr) + (bytes * i))); + err = MCA_PML_CALL(isend_init(buf, req->real_remainder, req->req_datatype, req->world_peer, req->my_send_tag+i, MCA_PML_BASE_SEND_STANDARD, ompi_part_persist_aggregated.part_comm, &(req->persist_reqs[i]))); } else { /* parse message */ - req->world_peer = req->setup_info[1].world_rank; - req->my_send_tag = req->setup_info[1].start_tag; - req->my_recv_tag = req->setup_info[1].setup_tag; - req->real_parts = req->setup_info[1].num_parts; - req->real_count = req->setup_info[1].count; + req->world_peer = req->setup_info[1].world_rank; + req->my_send_tag = req->setup_info[1].start_tag; + req->my_recv_tag = req->setup_info[1].setup_tag; + req->real_parts = req->setup_info[1].num_parts; + req->real_count = req->setup_info[1].count; + req->real_remainder = req->setup_info[1].remainder; err = opal_datatype_type_size(&(req->req_datatype->super), &dt_size_); if(OMPI_SUCCESS != err) return OMPI_ERROR; @@ -207,10 +286,14 @@ mca_part_persist_aggregated_progress(void) /* Set up persistent sends */ req->persist_reqs = (ompi_request_t**) malloc(sizeof(ompi_request_t*)*(req->real_parts)); req->flags = (int*) calloc(req->real_parts,sizeof(int)); - for(i = 0; i < req->real_parts; i++) { + for(i = 0; i < req->real_parts - 1; i++) { void *buf = ((void*) (((char*)req->req_addr) + (bytes * i))); err = MCA_PML_CALL(irecv_init(buf, req->real_count, req->req_datatype, req->world_peer, req->my_send_tag+i, ompi_part_persist_aggregated.part_comm, &(req->persist_reqs[i]))); } + // last transfer partition can have different size + void *buf = ((void*) (((char*)req->req_addr) + (bytes * i))); + err = MCA_PML_CALL(irecv_init(buf, req->real_remainder, req->req_datatype, req->world_peer, req->my_send_tag+i, ompi_part_persist_aggregated.part_comm, &(req->persist_reqs[i]))); + err = req->persist_reqs[0]->req_start(req->real_parts, (&(req->persist_reqs[0]))); /* Send back a message */ @@ -373,7 +456,19 @@ mca_part_persist_aggregated_psend_init(const void* buf, dt_size = (dt_size_ > (size_t) INT_MAX) ? MPI_UNDEFINED : (int) dt_size_; req->req_bytes = parts * count * dt_size; + // select internal partitioning (i.e. 
real_parts) here + size_t factor, remaining_partitions; + part_persist_aggregated_select_internal_partitioning(parts, count, &req->real_parts, &factor, &remaining_partitions); + + req->real_remainder = remaining_partitions * count; // convert to number of elements + req->real_count = factor * count; + req->setup_info[0].num_parts = req->real_parts; // setup info has to contain internal partitioning + req->setup_info[0].count = req->real_count; + req->setup_info[0].remainder = req->real_remainder; + opal_output_verbose(5, ompi_part_base_framework.framework_output, "mapped given %lu*%lu partitioning to internal partitioning of %lu*%lu + %lu\n", parts, count, req->real_parts - 1, req->real_count, req->real_remainder); + // init aggregation state + part_persist_aggregate_regular_init(&sendreq->aggregation_state, req->real_parts, factor, remaining_partitions); /* non-blocking send set-up data */ req->setup_info[0].world_rank = ompi_comm_rank(&ompi_mpi_comm_world.comm); @@ -381,11 +476,6 @@ mca_part_persist_aggregated_psend_init(const void* buf, req->my_send_tag = req->setup_info[0].start_tag; req->setup_info[0].setup_tag = ompi_part_persist_aggregated.next_recv_tag; ompi_part_persist_aggregated.next_recv_tag++; req->my_recv_tag = req->setup_info[0].setup_tag; - req->setup_info[0].num_parts = parts; - req->real_parts = parts; - req->setup_info[0].count = count; - req->real_count = count; - req->flags = (int*) calloc(req->real_parts, sizeof(int)); @@ -428,6 +518,13 @@ mca_part_persist_aggregated_start(size_t count, ompi_request_t** requests) for(size_t i = 0; i < _count && OMPI_SUCCESS == err; i++) { mca_part_persist_aggregated_request_t *req = (mca_part_persist_aggregated_request_t *)(requests[i]); + + // reset aggregation state here + if (MCA_PART_PERSIST_AGGREGATED_REQUEST_PSEND == req->req_type) { + mca_part_persist_aggregated_psend_request_t *sendreq = (mca_part_persist_aggregated_psend_request_t *)(req); + part_persist_aggregate_regular_reset(&sendreq->aggregation_state); + } + /* First use is a special case, to support lazy initialization */ if(false == req->first_send) { @@ -470,19 +567,31 @@ mca_part_persist_aggregated_pready(size_t min_part, size_t i; mca_part_persist_aggregated_request_t *req = (mca_part_persist_aggregated_request_t *)(request); + int flag_value; if(true == req->initialized) { - err = req->persist_reqs[min_part]->req_start(max_part-min_part+1, (&(req->persist_reqs[min_part]))); - for(i = min_part; i <= max_part && OMPI_SUCCESS == err; i++) { - req->flags[i] = 0; /* Mark partition as ready for testing */ - } + flag_value = 0; /* Mark partition as ready for testing */ } else { - for(i = min_part; i <= max_part && OMPI_SUCCESS == err; i++) { - req->flags[i] = -2; /* Mark partition as queued */ + flag_value = -2; /* Mark partition as queued */ + } + + mca_part_persist_aggregated_psend_request_t *sendreq = (mca_part_persist_aggregated_psend_request_t *)(request); + int internal_part_ready; + for(i = min_part; i <= max_part && OMPI_SUCCESS == err; i++) { + part_persist_aggregate_regular_pready(&sendreq->aggregation_state, i, &internal_part_ready); + + if (-1 != internal_part_ready) { + // transfer partition is ready + if(true == req->initialized) { + err = req->persist_reqs[internal_part_ready]->req_start(1, (&(req->persist_reqs[internal_part_ready]))); + } + + req->flags[internal_part_ready] = flag_value; } } + return err; } diff --git a/ompi/mca/part/persist_aggregated/part_persist_aggregated_request.h b/ompi/mca/part/persist_aggregated/part_persist_aggregated_request.h 
index 1c94eb27e93..7a59c6ef4f7 100644 --- a/ompi/mca/part/persist_aggregated/part_persist_aggregated_request.h +++ b/ompi/mca/part/persist_aggregated/part_persist_aggregated_request.h @@ -44,6 +44,7 @@ struct ompi_mca_persist_setup_t { size_t num_parts; size_t dt_size; size_t count; + size_t remainder; }; @@ -52,7 +53,7 @@ struct ompi_mca_persist_setup_t { */ struct mca_part_persist_aggregated_request_t { -/* START: These fields have to match the definition of the mca_part_persist_aggregated_request_t */ +/* START: These fields have to match the definition of the mca_part_base_request_t */ ompi_request_t req_ompi; /**< base request */ volatile int32_t req_part_complete; /**< flag indicating if the pt-2-pt layer is done with this request */ volatile int32_t req_free_called; /**< flag indicating if the user has freed this request */ @@ -68,12 +69,13 @@ struct mca_part_persist_aggregated_request_t { int32_t req_tag; /**< user defined tag */ struct ompi_proc_t* req_proc; /**< peer process */ -/* END: These fields have to match the definition of the mca_part_persist_aggregated_request_t */ +/* END: These fields have to match the definition of the mca_part_base_request_t */ size_t req_bytes; /**< bytes for completion status */ size_t real_parts; /**< internal number of partitions */ size_t real_count; + size_t real_remainder; /**< size of last internal partition (in elements) */ size_t real_dt_size; /**< receiver needs to know how large the sender's datatype is. */ size_t part_size; diff --git a/ompi/mca/part/persist_aggregated/part_persist_aggregated_sendreq.h b/ompi/mca/part/persist_aggregated/part_persist_aggregated_sendreq.h index c5a4fbbe232..40f72a25d67 100644 --- a/ompi/mca/part/persist_aggregated/part_persist_aggregated_sendreq.h +++ b/ompi/mca/part/persist_aggregated/part_persist_aggregated_sendreq.h @@ -33,8 +33,13 @@ #include "ompi/mca/part/part.h" #include "opal/prefetch.h" +#include "ompi/mca/part/persist_aggregated/schemes/part_persist_aggregated_scheme_regular.h" + + struct mca_part_persist_aggregated_psend_request_t { mca_part_persist_aggregated_request_t req_base; + + struct part_persist_aggregation_state aggregation_state; }; typedef struct mca_part_persist_aggregated_psend_request_t mca_part_persist_aggregated_psend_request_t; OBJ_CLASS_DECLARATION(mca_part_persist_aggregated_psend_request_t);