Skip to content

Commit aa26a72

Browse files
xinzhao3 authored and artpol84 committed
opal/common/ucx: introduce internal UCX request in wpool.
Signed-off-by: Artem Polyakov <artpol84@gmail.com>
1 parent 07cb413 commit aa26a72

File tree

7 files changed

+68
-20
lines changed

7 files changed

+68
-20
lines changed

opal/mca/common/ucx/Makefile.am

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,15 @@ headers = \
1616
common_ucx.h \
1717
common_ucx_int.h \
1818
common_ucx_wpool.h \
19-
common_ucx_wpool_int.h
19+
common_ucx_wpool_int.h \
20+
common_ucx_request.h
2021

2122
# Source files
2223

2324
sources = \
2425
common_ucx.c \
25-
common_ucx_wpool.c
26+
common_ucx_wpool.c \
27+
common_ucx_request.c
2628

2729
# Help file
2830

opal/mca/common/ucx/common_ucx.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,5 +15,6 @@
1515

1616
#include "common_ucx_int.h"
1717
#include "common_ucx_wpool.h"
18+
#include "common_ucx_request.h"
1819

1920
#endif

opal/mca/common/ucx/common_ucx_int.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
#define COMMON_UCX_INT_H
33

44
#include "opal_config.h"
5+
#include "common_ucx_request.h"
56

67
#include <stdint.h>
78

@@ -170,10 +171,11 @@ static inline
170171
ucs_status_ptr_t opal_common_ucx_atomic_fetch_nb(ucp_ep_h ep, ucp_atomic_fetch_op_t opcode,
171172
uint64_t value, void *result, size_t op_size,
172173
uint64_t remote_addr, ucp_rkey_h rkey,
174+
ucp_send_callback_t req_handler,
173175
ucp_worker_h worker)
174176
{
175177
return ucp_atomic_fetch_nb(ep, opcode, value, result, op_size,
176-
remote_addr, rkey, opal_common_ucx_empty_complete_cb);
178+
remote_addr, rkey, req_handler);
177179
}
178180

179181
static inline
opal/mca/common/ucx/common_ucx_request.c

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
#include "common_ucx_request.h"
2+
3+
OPAL_DECLSPEC void
4+
opal_common_ucx_req_init(void *request) {
5+
opal_common_ucx_request_t *req = (opal_common_ucx_request_t *)request;
6+
req->ext_req = NULL;
7+
req->ext_cb = NULL;
8+
}
9+
10+
OPAL_DECLSPEC void
11+
opal_common_ucx_req_completion(void *request, ucs_status_t status) {
12+
opal_common_ucx_request_t *req = (opal_common_ucx_request_t *)request;
13+
if (req->ext_cb != NULL) {
14+
(*req->ext_cb)(req->ext_req);
15+
}
16+
ucp_request_release(req);
17+
}
opal/mca/common/ucx/common_ucx_request.h

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
#ifndef COMMON_UCX_REQUEST_H
2+
#define COMMON_UCX_REQUEST_H
3+
4+
#include "opal_config.h"
5+
#include <ucp/api/ucp.h>
6+
7+
/* Signature of the user-supplied handler; it receives the user's own
 * request pointer as its only argument. */
typedef void (*opal_common_ucx_user_req_handler_t)(void *request);

/* Per-request bookkeeping: an optional user handler plus the argument
 * that is passed to it. */
typedef struct opal_common_ucx_request {
    void *ext_req;                              /* argument handed to ext_cb */
    opal_common_ucx_user_req_handler_t ext_cb;  /* user handler, or NULL for none */
} opal_common_ucx_request_t;
13+
14+
OPAL_DECLSPEC void opal_common_ucx_req_init(void *request);
15+
OPAL_DECLSPEC void opal_common_ucx_req_completion(void *request, ucs_status_t status);
16+
17+
#endif // COMMON_UCX_REQUEST_H

opal/mca/common/ucx/common_ucx_wpool.c

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -126,9 +126,7 @@ opal_common_ucx_wpool_free(opal_common_ucx_wpool_t *wpool)
126126

127127
OPAL_DECLSPEC int
128128
opal_common_ucx_wpool_init(opal_common_ucx_wpool_t *wpool,
129-
int proc_world_size,
130-
ucp_request_init_callback_t req_init_ptr,
131-
size_t req_size, bool enable_mt)
129+
int proc_world_size, bool enable_mt)
132130
{
133131
ucp_config_t *config = NULL;
134132
ucp_params_t context_params;
@@ -164,8 +162,8 @@ opal_common_ucx_wpool_init(opal_common_ucx_wpool_t *wpool,
164162
UCP_FEATURE_AMO64;
165163
context_params.mt_workers_shared = (enable_mt ? 1 : 0);
166164
context_params.estimated_num_eps = proc_world_size;
167-
context_params.request_init = req_init_ptr;
168-
context_params.request_size = req_size;
165+
context_params.request_init = opal_common_ucx_req_init;
166+
context_params.request_size = sizeof(opal_common_ucx_request_t);
169167

170168
status = ucp_init(&context_params, config, &wpool->ucp_ctx);
171169
ucp_config_release(config);
@@ -1272,4 +1270,3 @@ opal_common_ucx_wpmem_fence(opal_common_ucx_wpmem_t *mem) {
12721270
/* TODO */
12731271
return OPAL_SUCCESS;
12741272
}
1275-

opal/mca/common/ucx/common_ucx_wpool.h

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#include "opal_config.h"
66

77
#include "common_ucx_int.h"
8+
#include "common_ucx_request.h"
89
#include <stdint.h>
910

1011
#include <ucp/api/ucp.h>
@@ -176,9 +177,7 @@ static inline void opal_common_ucx_wpool_dbg_init(void)
176177
OPAL_DECLSPEC opal_common_ucx_wpool_t * opal_common_ucx_wpool_allocate(void);
177178
OPAL_DECLSPEC void opal_common_ucx_wpool_free(opal_common_ucx_wpool_t *wpool);
178179
OPAL_DECLSPEC int opal_common_ucx_wpool_init(opal_common_ucx_wpool_t *wpool,
179-
int proc_world_size,
180-
ucp_request_init_callback_t req_init_ptr,
181-
size_t req_size, bool enable_mt);
180+
int proc_world_size, bool enable_mt);
182181
OPAL_DECLSPEC void opal_common_ucx_wpool_finalize(opal_common_ucx_wpool_t *wpool);
183182
OPAL_DECLSPEC void opal_common_ucx_wpool_progress(opal_common_ucx_wpool_t *wpool);
184183

@@ -394,27 +393,40 @@ opal_common_ucx_wpmem_fetch(opal_common_ucx_wpmem_t *mem,
394393

395394
static inline int
396395
opal_common_ucx_wpmem_fetch_nb(opal_common_ucx_wpmem_t *mem,
397-
ucp_atomic_fetch_op_t opcode,
398-
uint64_t value,
399-
int target, void *buffer, size_t len,
400-
uint64_t rem_addr, ucs_status_ptr_t *ptr)
396+
ucp_atomic_fetch_op_t opcode,
397+
uint64_t value,
398+
int target, void *buffer, size_t len,
399+
uint64_t rem_addr,
400+
opal_common_ucx_user_req_handler_t user_req_cb,
401+
void *user_req_ptr)
401402
{
402403
ucp_ep_h ep = NULL;
403404
ucp_rkey_h rkey = NULL;
404405
opal_common_ucx_winfo_t *winfo = NULL;
405406
int rc = OPAL_SUCCESS;
407+
opal_common_ucx_request_t *req;
408+
406409
rc = opal_common_ucx_tlocal_fetch(mem, target, &ep, &rkey, &winfo);
407410
if(OPAL_UNLIKELY(OPAL_SUCCESS != rc)){
408411
MCA_COMMON_UCX_ERROR("tlocal_fetch failed: %d", rc);
409412
return rc;
410413
}
411414
/* Perform the operation */
412415
opal_mutex_lock(&winfo->mutex);
413-
(*ptr) = opal_common_ucx_atomic_fetch_nb(ep, opcode, value,
414-
buffer, len,
415-
rem_addr, rkey,
416-
winfo->worker);
416+
req = opal_common_ucx_atomic_fetch_nb(ep, opcode, value, buffer, len,
417+
rem_addr, rkey, opal_common_ucx_req_completion,
418+
winfo->worker);
417419
opal_mutex_unlock(&winfo->mutex);
420+
421+
if (UCS_PTR_IS_PTR(req)) {
422+
req->ext_req = user_req_ptr;
423+
req->ext_cb = user_req_cb;
424+
} else {
425+
if (user_req_cb != NULL) {
426+
(*user_req_cb)(user_req_ptr);
427+
}
428+
}
429+
418430
return rc;
419431
}
420432

0 commit comments

Comments
 (0)