Skip to content

Commit edb9b60

Browse files
author
Luke Robison
committed
MTL/OFI: Refactor SSend ACK generation and allow MRecv to process SSend's.
This commit refactors the generation of 0-byte ACKs from the body of ompi_mtl_ofi_recv_callback so that it can be used by the ompi_mtl_ofi_mrecv_callback function as well. Previously the MRecv() callback did not generate the required ACKs, and the sender would deadlock if MRecv'ing an SSend message. Signed-off-by: Luke Robison <lrbison@amazon.com>
1 parent 6c1e9f7 commit edb9b60

File tree

1 file changed

+94
-52
lines changed

1 file changed

+94
-52
lines changed

ompi/mca/mtl/ofi/mtl_ofi.h

Lines changed: 94 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -999,6 +999,76 @@ ompi_mtl_ofi_send_generic(struct mca_mtl_base_module_t *mtl,
999999
return ofi_req.status.MPI_ERROR;
10001000
}
10011001

1002+
/*
1003+
* This routine is invoked in the case where a Recv finds the
1004+
* MTL_OFI_IS_SYNC_SEND flag was set, indicating the sender issued an SSend and
1005+
* is blocking while it waits on an ACK message.
1006+
*
1007+
* Issue a fire-and-forget send back to the src with a matching tag so that
1008+
* the sender may continue progress.
1009+
* Requires ofi_req->remote_addr and ofi_req->comm to be set.
1010+
*/
1011+
static int
1012+
ompi_mtl_ofi_gen_ssend_ack(struct fi_cq_tagged_entry *wc,
1013+
ompi_mtl_ofi_request_t *ofi_req)
1014+
{
1015+
/**
1016+
* If this recv is part of an MPI_Ssend operation, then we send an
1017+
* acknowledgment back to the sender.
1018+
* The ack message is sent without generating a completion event in
1019+
* the completion queue by not setting FI_COMPLETION in the flags to
1020+
* fi_tsendmsg(FI_SELECTIVE_COMPLETION).
1021+
* This is done since the 0 byte message requires no
1022+
* notification on the send side for a successful completion.
1023+
* If a failure occurs the provider will notify the error
1024+
* in the cq_readerr during OFI progress. Once the message has been
1025+
* successfully processed the request is marked as completed.
1026+
*/
1027+
int ctxt_id = 0;
1028+
ssize_t ret;
1029+
ompi_proc_t *ompi_proc = NULL;
1030+
mca_mtl_ofi_endpoint_t *endpoint = NULL;
1031+
int src = mtl_ofi_get_source(wc);
1032+
struct fi_msg_tagged tagged_msg;
1033+
1034+
if (ompi_mtl_ofi.total_ctxts_used > 0) {
1035+
ctxt_id = ofi_req->comm->c_contextid.cid_sub.u64 % ompi_mtl_ofi.total_ctxts_used;
1036+
} else {
1037+
ctxt_id = 0;
1038+
}
1039+
1040+
ret = MPI_SUCCESS;
1041+
1042+
/**
1043+
* If the recv request was posted for any source,
1044+
* we need to extract the source's actual address.
1045+
*/
1046+
ompi_proc = ompi_comm_peer_lookup(ofi_req->comm, src);
1047+
endpoint = ompi_mtl_ofi_get_endpoint(ofi_req->mtl, ompi_proc);
1048+
ofi_req->remote_addr = fi_rx_addr(endpoint->peer_fiaddr, ctxt_id, ompi_mtl_ofi.rx_ctx_bits);
1049+
1050+
tagged_msg.msg_iov = NULL;
1051+
tagged_msg.desc = NULL;
1052+
tagged_msg.iov_count = 0;
1053+
tagged_msg.addr = ofi_req->remote_addr;
1054+
/**
1055+
* We must continue to use the user's original tag but remove the
1056+
* sync_send protocol tag bit and instead apply the sync_send_ack
1057+
* tag bit to complete the initiator's sync send receive.
1058+
*/
1059+
tagged_msg.tag = (wc->tag | ompi_mtl_ofi.sync_send_ack) & ~ompi_mtl_ofi.sync_send;
1060+
tagged_msg.context = NULL;
1061+
tagged_msg.data = 0;
1062+
1063+
MTL_OFI_RETRY_UNTIL_DONE(fi_tsendmsg(ompi_mtl_ofi.ofi_ctxt[ctxt_id].tx_ep,
1064+
&tagged_msg, 0), ret);
1065+
if (OPAL_UNLIKELY(0 > ret)) {
1066+
MTL_OFI_LOG_FI_ERR(ret, "fi_tsendmsg failed during ompi_mtl_ofi_gen_ssend_ack");
1067+
ret = OMPI_ERROR;
1068+
}
1069+
return ret;
1070+
}
1071+
10021072
__opal_attribute_always_inline__ static inline int
10031073
ompi_mtl_ofi_isend_generic(struct mca_mtl_base_module_t *mtl,
10041074
struct ompi_communicator_t *comm,
@@ -1134,19 +1204,9 @@ __opal_attribute_always_inline__ static inline int
11341204
ompi_mtl_ofi_recv_callback(struct fi_cq_tagged_entry *wc,
11351205
ompi_mtl_ofi_request_t *ofi_req)
11361206
{
1137-
int ompi_ret, ctxt_id = 0;
1138-
ssize_t ret;
1139-
ompi_proc_t *ompi_proc = NULL;
1140-
mca_mtl_ofi_endpoint_t *endpoint = NULL;
1207+
int ompi_ret;
11411208
int src = mtl_ofi_get_source(wc);
11421209
ompi_status_public_t *status = NULL;
1143-
struct fi_msg_tagged tagged_msg;
1144-
1145-
if (ompi_mtl_ofi.total_ctxts_used > 0) {
1146-
ctxt_id = ofi_req->comm->c_contextid.cid_sub.u64 % ompi_mtl_ofi.total_ctxts_used;
1147-
} else {
1148-
ctxt_id = 0;
1149-
}
11501210

11511211
assert(ofi_req->super.ompi_req);
11521212
status = &ofi_req->super.ompi_req->req_status;
@@ -1157,6 +1217,7 @@ ompi_mtl_ofi_recv_callback(struct fi_cq_tagged_entry *wc,
11571217
*/
11581218
ofi_req->req_started = true;
11591219

1220+
status->MPI_ERROR = MPI_SUCCESS;
11601221
status->MPI_SOURCE = src;
11611222
status->MPI_TAG = MTL_OFI_GET_TAG(wc->tag);
11621223
status->_ucount = wc->len;
@@ -1192,53 +1253,20 @@ ompi_mtl_ofi_recv_callback(struct fi_cq_tagged_entry *wc,
11921253
*/
11931254
assert(!MTL_OFI_IS_SYNC_SEND_ACK(wc->tag));
11941255

1195-
/**
1196-
* If this recv is part of an MPI_Ssend operation, then we send an
1197-
* acknowledgment back to the sender.
1198-
* The ack message is sent without generating a completion event in
1199-
* the completion queue by not setting FI_COMPLETION in the flags to
1200-
* fi_tsendmsg(FI_SELECTIVE_COMPLETION).
1201-
* This is done since the 0 byte message requires no
1202-
* notification on the send side for a successful completion.
1203-
* If a failure occurs the provider will notify the error
1204-
* in the cq_readerr during OFI progress. Once the message has been
1205-
* successfully processed the request is marked as completed.
1206-
*/
12071256
if (OPAL_UNLIKELY(MTL_OFI_IS_SYNC_SEND(wc->tag))) {
1208-
/**
1209-
* If the recv request was posted for any source,
1210-
* we need to extract the source's actual address.
1211-
*/
1212-
if (ompi_mtl_ofi.any_addr == ofi_req->remote_addr) {
1213-
ompi_proc = ompi_comm_peer_lookup(ofi_req->comm, src);
1214-
endpoint = ompi_mtl_ofi_get_endpoint(ofi_req->mtl, ompi_proc);
1215-
ofi_req->remote_addr = fi_rx_addr(endpoint->peer_fiaddr, ctxt_id, ompi_mtl_ofi.rx_ctx_bits);
1216-
}
1257+
ompi_ret = ompi_mtl_ofi_gen_ssend_ack(wc, ofi_req);
12171258

1218-
tagged_msg.msg_iov = NULL;
1219-
tagged_msg.desc = NULL;
1220-
tagged_msg.iov_count = 0;
1221-
tagged_msg.addr = ofi_req->remote_addr;
1222-
/**
1223-
* We must continue to use the user's original tag but remove the
1224-
* sync_send protocol tag bit and instead apply the sync_send_ack
1225-
* tag bit to complete the initiator's sync send receive.
1226-
*/
1227-
tagged_msg.tag = (wc->tag | ompi_mtl_ofi.sync_send_ack) & ~ompi_mtl_ofi.sync_send;
1228-
tagged_msg.context = NULL;
1229-
tagged_msg.data = 0;
1230-
1231-
MTL_OFI_RETRY_UNTIL_DONE(fi_tsendmsg(ompi_mtl_ofi.ofi_ctxt[ctxt_id].tx_ep,
1232-
&tagged_msg, 0), ret);
1233-
if (OPAL_UNLIKELY(0 > ret)) {
1234-
MTL_OFI_LOG_FI_ERR(ret, "fi_tsendmsg failed");
1235-
status->MPI_ERROR = OMPI_ERROR;
1259+
if (OPAL_UNLIKELY(OMPI_SUCCESS != ompi_ret)) {
1260+
opal_output_verbose(1, opal_common_ofi.output,
1261+
"%s:%d: ompi_mtl_ofi_gen_ssend_ack failed: %d",
1262+
__FILE__, __LINE__, ompi_ret);
1263+
status->MPI_ERROR = ompi_ret;
12361264
}
12371265
}
12381266

12391267
ofi_req->super.completion_callback(&ofi_req->super);
12401268

1241-
return OMPI_SUCCESS;
1269+
return status->MPI_ERROR;
12421270
}
12431271

12441272
/**
@@ -1384,14 +1412,26 @@ ompi_mtl_ofi_mrecv_callback(struct fi_cq_tagged_entry *wc,
13841412
status->MPI_TAG = MTL_OFI_GET_TAG(wc->tag);
13851413
status->MPI_ERROR = MPI_SUCCESS;
13861414
status->_ucount = wc->len;
1415+
int ompi_ret;
13871416

13881417
ompi_mtl_ofi_deregister_and_free_buffer(ofi_req);
13891418

1419+
if (OPAL_UNLIKELY(MTL_OFI_IS_SYNC_SEND(wc->tag))) {
1420+
ompi_ret = ompi_mtl_ofi_gen_ssend_ack(wc, ofi_req);
1421+
1422+
if (OPAL_UNLIKELY(OMPI_SUCCESS != ompi_ret)) {
1423+
opal_output_verbose(1, opal_common_ofi.output,
1424+
"%s:%d: ompi_mtl_ofi_gen_ssend_ack failed: %d",
1425+
__FILE__, __LINE__, ompi_ret);
1426+
status->MPI_ERROR = ompi_ret;
1427+
}
1428+
}
1429+
13901430
free(ofi_req);
13911431

13921432
mrecv_req->completion_callback(mrecv_req);
13931433

1394-
return OMPI_SUCCESS;
1434+
return status->MPI_ERROR;
13951435
}
13961436

13971437
/**
@@ -1470,6 +1510,8 @@ ompi_mtl_ofi_imrecv(struct mca_mtl_base_module_t *mtl,
14701510
ofi_req->convertor = convertor;
14711511
ofi_req->status.MPI_ERROR = OMPI_SUCCESS;
14721512
ofi_req->mrecv_req = mtl_request;
1513+
ofi_req->comm = comm;
1514+
14731515

14741516
ompi_ret = ompi_mtl_ofi_register_buffer(convertor, ofi_req, start);
14751517
if (OPAL_UNLIKELY(OMPI_SUCCESS != ompi_ret)) {

0 commit comments

Comments
 (0)