Skip to content

Commit 6839555

Browse files
authored
Merge pull request #11327 from lrbison/mrecv_ack
MTL/OFI: Refactor SSend ACK generation and allow MRecv to process SSend's.
2 parents 861a798 + edb9b60 commit 6839555

File tree

1 file changed

+94
-52
lines changed

1 file changed

+94
-52
lines changed

ompi/mca/mtl/ofi/mtl_ofi.h

Lines changed: 94 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -1000,6 +1000,76 @@ ompi_mtl_ofi_send_generic(struct mca_mtl_base_module_t *mtl,
10001000
return ofi_req.status.MPI_ERROR;
10011001
}
10021002

1003+
/*
1004+
* This routine is invoked in the case where a Recv finds the
1005+
* MTL_OFI_IS_SYNC_SEND flag was set, indicating the sender issued an SSend and
1006+
* is blocking while it waits on an ACK message.
1007+
*
1008+
* Issue a fire-and-forget send back to the src with a matching tag so that
1009+
* the sender may continue progress.
1010+
* Requires ofi_req->remote_addr and ofi_req->comm to be set.
1011+
*/
1012+
static int
1013+
ompi_mtl_ofi_gen_ssend_ack(struct fi_cq_tagged_entry *wc,
1014+
ompi_mtl_ofi_request_t *ofi_req)
1015+
{
1016+
/**
1017+
* If this recv is part of an MPI_Ssend operation, then we send an
1018+
* acknowledgment back to the sender.
1019+
* The ack message is sent without generating a completion event in
1020+
* the completion queue by not setting FI_COMPLETION in the flags to
1021+
* fi_tsendmsg(FI_SELECTIVE_COMPLETION).
1022+
* This is done since the 0 byte message requires no
1023+
* notification on the send side for a successful completion.
1024+
* If a failure occurs the provider will notify the error
1025+
* in the cq_readerr during OFI progress. Once the message has been
1026+
* successfully processed the request is marked as completed.
1027+
*/
1028+
int ctxt_id = 0;
1029+
ssize_t ret;
1030+
ompi_proc_t *ompi_proc = NULL;
1031+
mca_mtl_ofi_endpoint_t *endpoint = NULL;
1032+
int src = mtl_ofi_get_source(wc);
1033+
struct fi_msg_tagged tagged_msg;
1034+
1035+
if (ompi_mtl_ofi.total_ctxts_used > 0) {
1036+
ctxt_id = ofi_req->comm->c_contextid.cid_sub.u64 % ompi_mtl_ofi.total_ctxts_used;
1037+
} else {
1038+
ctxt_id = 0;
1039+
}
1040+
1041+
ret = MPI_SUCCESS;
1042+
1043+
/**
1044+
* If the recv request was posted for any source,
1045+
* we need to extract the source's actual address.
1046+
*/
1047+
ompi_proc = ompi_comm_peer_lookup(ofi_req->comm, src);
1048+
endpoint = ompi_mtl_ofi_get_endpoint(ofi_req->mtl, ompi_proc);
1049+
ofi_req->remote_addr = fi_rx_addr(endpoint->peer_fiaddr, ctxt_id, ompi_mtl_ofi.rx_ctx_bits);
1050+
1051+
tagged_msg.msg_iov = NULL;
1052+
tagged_msg.desc = NULL;
1053+
tagged_msg.iov_count = 0;
1054+
tagged_msg.addr = ofi_req->remote_addr;
1055+
/**
1056+
* We must continue to use the user's original tag but remove the
1057+
* sync_send protocol tag bit and instead apply the sync_send_ack
1058+
* tag bit to complete the initiator's sync send receive.
1059+
*/
1060+
tagged_msg.tag = (wc->tag | ompi_mtl_ofi.sync_send_ack) & ~ompi_mtl_ofi.sync_send;
1061+
tagged_msg.context = NULL;
1062+
tagged_msg.data = 0;
1063+
1064+
MTL_OFI_RETRY_UNTIL_DONE(fi_tsendmsg(ompi_mtl_ofi.ofi_ctxt[ctxt_id].tx_ep,
1065+
&tagged_msg, 0), ret);
1066+
if (OPAL_UNLIKELY(0 > ret)) {
1067+
MTL_OFI_LOG_FI_ERR(ret, "fi_tsendmsg failed during ompi_mtl_ofi_gen_ssend_ack");
1068+
ret = OMPI_ERROR;
1069+
}
1070+
return ret;
1071+
}
1072+
10031073
__opal_attribute_always_inline__ static inline int
10041074
ompi_mtl_ofi_isend_generic(struct mca_mtl_base_module_t *mtl,
10051075
struct ompi_communicator_t *comm,
@@ -1136,19 +1206,9 @@ __opal_attribute_always_inline__ static inline int
11361206
ompi_mtl_ofi_recv_callback(struct fi_cq_tagged_entry *wc,
11371207
ompi_mtl_ofi_request_t *ofi_req)
11381208
{
1139-
int ompi_ret, ctxt_id = 0;
1140-
ssize_t ret;
1141-
ompi_proc_t *ompi_proc = NULL;
1142-
mca_mtl_ofi_endpoint_t *endpoint = NULL;
1209+
int ompi_ret;
11431210
int src = mtl_ofi_get_source(wc);
11441211
ompi_status_public_t *status = NULL;
1145-
struct fi_msg_tagged tagged_msg;
1146-
1147-
if (ompi_mtl_ofi.total_ctxts_used > 0) {
1148-
ctxt_id = ofi_req->comm->c_contextid.cid_sub.u64 % ompi_mtl_ofi.total_ctxts_used;
1149-
} else {
1150-
ctxt_id = 0;
1151-
}
11521212

11531213
assert(ofi_req->super.ompi_req);
11541214
status = &ofi_req->super.ompi_req->req_status;
@@ -1159,6 +1219,7 @@ ompi_mtl_ofi_recv_callback(struct fi_cq_tagged_entry *wc,
11591219
*/
11601220
ofi_req->req_started = true;
11611221

1222+
status->MPI_ERROR = MPI_SUCCESS;
11621223
status->MPI_SOURCE = src;
11631224
status->MPI_TAG = MTL_OFI_GET_TAG(wc->tag);
11641225
status->_ucount = wc->len;
@@ -1194,53 +1255,20 @@ ompi_mtl_ofi_recv_callback(struct fi_cq_tagged_entry *wc,
11941255
*/
11951256
assert(!MTL_OFI_IS_SYNC_SEND_ACK(wc->tag));
11961257

1197-
/**
1198-
* If this recv is part of an MPI_Ssend operation, then we send an
1199-
* acknowledgment back to the sender.
1200-
* The ack message is sent without generating a completion event in
1201-
* the completion queue by not setting FI_COMPLETION in the flags to
1202-
* fi_tsendmsg(FI_SELECTIVE_COMPLETION).
1203-
* This is done since the 0 byte message requires no
1204-
* notification on the send side for a successful completion.
1205-
* If a failure occurs the provider will notify the error
1206-
* in the cq_readerr during OFI progress. Once the message has been
1207-
* successfully processed the request is marked as completed.
1208-
*/
12091258
if (OPAL_UNLIKELY(MTL_OFI_IS_SYNC_SEND(wc->tag))) {
1210-
/**
1211-
* If the recv request was posted for any source,
1212-
* we need to extract the source's actual address.
1213-
*/
1214-
if (ompi_mtl_ofi.any_addr == ofi_req->remote_addr) {
1215-
ompi_proc = ompi_comm_peer_lookup(ofi_req->comm, src);
1216-
endpoint = ompi_mtl_ofi_get_endpoint(ofi_req->mtl, ompi_proc);
1217-
ofi_req->remote_addr = fi_rx_addr(endpoint->peer_fiaddr, ctxt_id, ompi_mtl_ofi.rx_ctx_bits);
1218-
}
1259+
ompi_ret = ompi_mtl_ofi_gen_ssend_ack(wc, ofi_req);
12191260

1220-
tagged_msg.msg_iov = NULL;
1221-
tagged_msg.desc = NULL;
1222-
tagged_msg.iov_count = 0;
1223-
tagged_msg.addr = ofi_req->remote_addr;
1224-
/**
1225-
* We must continue to use the user's original tag but remove the
1226-
* sync_send protocol tag bit and instead apply the sync_send_ack
1227-
* tag bit to complete the initiator's sync send receive.
1228-
*/
1229-
tagged_msg.tag = (wc->tag | ompi_mtl_ofi.sync_send_ack) & ~ompi_mtl_ofi.sync_send;
1230-
tagged_msg.context = NULL;
1231-
tagged_msg.data = 0;
1232-
1233-
MTL_OFI_RETRY_UNTIL_DONE(fi_tsendmsg(ompi_mtl_ofi.ofi_ctxt[ctxt_id].tx_ep,
1234-
&tagged_msg, 0), ret);
1235-
if (OPAL_UNLIKELY(0 > ret)) {
1236-
MTL_OFI_LOG_FI_ERR(ret, "fi_tsendmsg failed");
1237-
status->MPI_ERROR = OMPI_ERROR;
1261+
if (OPAL_UNLIKELY(OMPI_SUCCESS != ompi_ret)) {
1262+
opal_output_verbose(1, opal_common_ofi.output,
1263+
"%s:%d: ompi_mtl_ofi_gen_ssend_ack failed: %d",
1264+
__FILE__, __LINE__, ompi_ret);
1265+
status->MPI_ERROR = ompi_ret;
12381266
}
12391267
}
12401268

12411269
ofi_req->super.completion_callback(&ofi_req->super);
12421270

1243-
return OMPI_SUCCESS;
1271+
return status->MPI_ERROR;
12441272
}
12451273

12461274
/**
@@ -1386,14 +1414,26 @@ ompi_mtl_ofi_mrecv_callback(struct fi_cq_tagged_entry *wc,
13861414
status->MPI_TAG = MTL_OFI_GET_TAG(wc->tag);
13871415
status->MPI_ERROR = MPI_SUCCESS;
13881416
status->_ucount = wc->len;
1417+
int ompi_ret;
13891418

13901419
ompi_mtl_ofi_deregister_and_free_buffer(ofi_req);
13911420

1421+
if (OPAL_UNLIKELY(MTL_OFI_IS_SYNC_SEND(wc->tag))) {
1422+
ompi_ret = ompi_mtl_ofi_gen_ssend_ack(wc, ofi_req);
1423+
1424+
if (OPAL_UNLIKELY(OMPI_SUCCESS != ompi_ret)) {
1425+
opal_output_verbose(1, opal_common_ofi.output,
1426+
"%s:%d: ompi_mtl_ofi_gen_ssend_ack failed: %d",
1427+
__FILE__, __LINE__, ompi_ret);
1428+
status->MPI_ERROR = ompi_ret;
1429+
}
1430+
}
1431+
13921432
free(ofi_req);
13931433

13941434
mrecv_req->completion_callback(mrecv_req);
13951435

1396-
return OMPI_SUCCESS;
1436+
return status->MPI_ERROR;
13971437
}
13981438

13991439
/**
@@ -1472,6 +1512,8 @@ ompi_mtl_ofi_imrecv(struct mca_mtl_base_module_t *mtl,
14721512
ofi_req->convertor = convertor;
14731513
ofi_req->status.MPI_ERROR = OMPI_SUCCESS;
14741514
ofi_req->mrecv_req = mtl_request;
1515+
ofi_req->comm = comm;
1516+
14751517

14761518
ompi_ret = ompi_mtl_ofi_register_buffer(convertor, ofi_req, start);
14771519
if (OPAL_UNLIKELY(OMPI_SUCCESS != ompi_ret)) {

0 commit comments

Comments
 (0)