Skip to content

Commit 3351742

Browse files
xinzhao3 authored and artpol84 committed
opal/common/ucx: add periodical flush and counter to opal directory.
Signed-off-by: Xin Zhao <xinz@mellanox.com>
1 parent 1fa7054 commit 3351742

File tree

3 files changed

+182
-29
lines changed

3 files changed

+182
-29
lines changed

opal/mca/common/ucx/common_ucx_int.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@ BEGIN_C_DECLS
2525
# define MCA_COMMON_UCX_ASSERT(_x)
2626
#endif
2727

28+
#define MCA_COMMON_UCX_PER_TARGET_OPS_THRESHOLD 100000
29+
#define MCA_COMMON_UCX_GLOBAL_OPS_THRESHOLD 1000000
30+
2831
#define _MCA_COMMON_UCX_QUOTE(_x) \
2932
# _x
3033
#define MCA_COMMON_UCX_QUOTE(_x) \

opal/mca/common/ucx/common_ucx_wpool.c

Lines changed: 79 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,9 @@ _winfo_create(opal_common_ucx_wpool_t *wpool)
6262
winfo->endpoints = NULL;
6363
winfo->comm_size = 0;
6464
winfo->released = 0;
65+
winfo->inflight_ops = NULL;
66+
winfo->global_inflight_ops = 0;
67+
winfo->inflight_req = UCS_OK;
6568

6669
WPOOL_DBG_OUT(_dbg_winfo, "winfo = %p, worker = %p\n",
6770
(void*)winfo, (void *)winfo->worker);
@@ -76,14 +79,24 @@ _winfo_create(opal_common_ucx_wpool_t *wpool)
7679
static void
7780
_winfo_reset(opal_common_ucx_winfo_t *winfo)
7881
{
82+
if (winfo->inflight_req != UCS_OK) {
83+
opal_common_ucx_wait_request(winfo->inflight_req, winfo->worker,
84+
"opal_common_ucx_flush");
85+
winfo->inflight_req = UCS_OK;
86+
}
87+
88+
assert(winfo->global_inflight_ops == 0);
89+
7990
if(winfo->comm_size != 0) {
8091
size_t i;
8192
for (i = 0; i < winfo->comm_size; i++) {
8293
if (NULL != winfo->endpoints[i]){
8394
ucp_ep_destroy(winfo->endpoints[i]);
8495
}
96+
assert(winfo->inflight_ops[i] == 0);
8597
}
8698
free(winfo->endpoints);
99+
free(winfo->inflight_ops);
87100
}
88101
winfo->endpoints = NULL;
89102
winfo->comm_size = 0;
@@ -372,6 +385,7 @@ _wpool_get_idle(opal_common_ucx_wpool_t *wpool, size_t comm_size)
372385
(void *)wpool, (void *)winfo);
373386

374387
winfo->endpoints = calloc(comm_size, sizeof(ucp_ep_h));
388+
winfo->inflight_ops = calloc(comm_size, sizeof(short));
375389
winfo->comm_size = comm_size;
376390
return winfo;
377391
}
@@ -1213,6 +1227,46 @@ opal_common_ucx_tlocal_fetch_spath(opal_common_ucx_wpmem_t *mem, int target)
12131227
return OPAL_SUCCESS;
12141228
}
12151229

1230+
OPAL_DECLSPEC int
1231+
opal_common_ucx_flush(ucp_ep_h ep, ucp_worker_h worker,
1232+
opal_common_ucx_flush_type_t type,
1233+
opal_common_ucx_flush_scope_t scope,
1234+
ucs_status_ptr_t *req_ptr)
1235+
{
1236+
ucs_status_ptr_t req;
1237+
ucs_status_t status = UCS_OK;
1238+
int rc = OPAL_SUCCESS;
1239+
1240+
#if HAVE_DECL_UCP_EP_FLUSH_NB
1241+
if (scope == OPAL_COMMON_UCX_SCOPE_EP) {
1242+
req = ucp_ep_flush_nb(ep, 0, opal_common_ucx_empty_complete_cb);
1243+
} else {
1244+
req = ucp_worker_flush_nb(worker, 0, opal_common_ucx_empty_complete_cb);
1245+
}
1246+
if(OPAL_COMMON_UCX_FLUSH_B) {
1247+
rc = opal_common_ucx_wait_request(req, worker, "ucp_ep_flush_nb");
1248+
} else {
1249+
*req_ptr = req;
1250+
}
1251+
return rc;
1252+
#endif
1253+
switch (type) {
1254+
case OPAL_COMMON_UCX_FLUSH_NB_PREFERRED:
1255+
case OPAL_COMMON_UCX_FLUSH_B:
1256+
if (scope == OPAL_COMMON_UCX_SCOPE_EP) {
1257+
status = ucp_ep_flush(ep);
1258+
} else {
1259+
status = ucp_worker_flush(worker);
1260+
}
1261+
rc = (status == UCS_OK) ? OPAL_SUCCESS : OPAL_ERROR;
1262+
case OPAL_COMMON_UCX_FLUSH_NB:
1263+
default:
1264+
rc = OPAL_ERROR;
1265+
}
1266+
return rc;
1267+
}
1268+
1269+
12161270
OPAL_DECLSPEC int
12171271
opal_common_ucx_wpmem_flush(opal_common_ucx_wpmem_t *mem,
12181272
opal_common_ucx_flush_scope_t scope,
@@ -1228,37 +1282,36 @@ opal_common_ucx_wpmem_flush(opal_common_ucx_wpmem_t *mem,
12281282
opal_mutex_lock(&ctx->mutex);
12291283

12301284
OPAL_LIST_FOREACH(item, &ctx->tls_workers, _ctx_record_list_item_t) {
1285+
if ((scope == OPAL_COMMON_UCX_SCOPE_EP) &&
1286+
(NULL == item->ptr->endpoints[target])) {
1287+
continue;
1288+
}
1289+
opal_mutex_lock(&item->ptr->mutex);
1290+
rc = opal_common_ucx_flush(item->ptr->endpoints[target],
1291+
item->ptr->worker, OPAL_COMMON_UCX_FLUSH_B,
1292+
scope, NULL);
12311293
switch (scope) {
12321294
case OPAL_COMMON_UCX_SCOPE_WORKER:
1233-
opal_mutex_lock(&item->ptr->mutex);
1234-
rc = opal_common_ucx_worker_flush(item->ptr->worker);
1235-
if (rc != OPAL_SUCCESS) {
1236-
MCA_COMMON_UCX_ERROR("opal_common_ucx_worker_flush failed: %d",
1237-
rc);
1238-
rc = OPAL_ERROR;
1239-
}
1240-
WPOOL_DBG_OUT(_dbg_tls || _dbg_mem, "worker = %p\n",
1241-
(void *)item->ptr->worker);
1242-
opal_mutex_unlock(&item->ptr->mutex);
1295+
item->ptr->global_inflight_ops = 0;
1296+
memset(item->ptr->inflight_ops, 0, item->ptr->comm_size * sizeof(short));
12431297
break;
12441298
case OPAL_COMMON_UCX_SCOPE_EP:
1245-
if (NULL != item->ptr->endpoints[target] ) {
1246-
opal_mutex_lock(&item->ptr->mutex);
1247-
rc = opal_common_ucx_ep_flush(item->ptr->endpoints[target],
1248-
item->ptr->worker);
1249-
if (rc != OPAL_SUCCESS) {
1250-
MCA_COMMON_UCX_ERROR("opal_common_ucx_ep_flush failed: %d",
1251-
rc);
1252-
rc = OPAL_ERROR;
1253-
}
1254-
WPOOL_DBG_OUT(_dbg_tls || _dbg_mem,
1255-
"target = %d, ep = %p worker = %p\n",
1256-
(int)target,
1257-
(void *)item->ptr->endpoints[target],
1258-
(void *)item->ptr->worker);
1259-
opal_mutex_unlock(&item->ptr->mutex);
1260-
}
1299+
item->ptr->global_inflight_ops -= item->ptr->inflight_ops[target];
1300+
item->ptr->inflight_ops[target] = 0;
1301+
break;
1302+
}
1303+
opal_mutex_unlock(&item->ptr->mutex);
1304+
1305+
if (rc != OPAL_SUCCESS) {
1306+
MCA_COMMON_UCX_ERROR("opal_common_ucx_flush failed: %d",
1307+
rc);
1308+
rc = OPAL_ERROR;
12611309
}
1310+
WPOOL_DBG_OUT(_dbg_tls || _dbg_mem,
1311+
"target = %d, ep = %p worker = %p\n",
1312+
(int)target,
1313+
(void *)item->ptr->endpoints[target],
1314+
(void *)item->ptr->worker);
12621315
}
12631316
opal_mutex_unlock(&ctx->mutex);
12641317

opal/mca/common/ucx/common_ucx_wpool.h

Lines changed: 100 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
#include "common_ucx_int.h"
88
#include "common_ucx_request.h"
99
#include <stdint.h>
10+
#include <string.h>
1011

1112
#include <ucp/api/ucp.h>
1213
#include <pthread.h>
@@ -84,6 +85,9 @@ typedef struct {
8485
ucp_worker_h worker;
8586
ucp_ep_h *endpoints;
8687
size_t comm_size;
88+
short *inflight_ops;
89+
short global_inflight_ops;
90+
ucs_status_ptr_t inflight_req;
8791
} opal_common_ucx_winfo_t;
8892

8993
typedef struct {
@@ -101,6 +105,12 @@ typedef enum {
101105
OPAL_COMMON_UCX_SCOPE_WORKER
102106
} opal_common_ucx_flush_scope_t;
103107

108+
/*
 * Flush mode passed as the `type` argument of opal_common_ucx_flush().
 *
 * In that function's switch over `type`, OPAL_COMMON_UCX_FLUSH_B and
 * OPAL_COMMON_UCX_FLUSH_NB_PREFERRED share the branch that calls
 * ucp_ep_flush()/ucp_worker_flush(), while OPAL_COMMON_UCX_FLUSH_NB shares
 * the default branch that sets OPAL_ERROR.
 *
 * NOTE(review): the names suggest NB = non-blocking, B = blocking and
 * NB_PREFERRED = non-blocking when available -- confirm with the author.
 */
typedef enum {
109+
OPAL_COMMON_UCX_FLUSH_NB,
110+
OPAL_COMMON_UCX_FLUSH_B,
111+
OPAL_COMMON_UCX_FLUSH_NB_PREFERRED
112+
} opal_common_ucx_flush_type_t;
113+
104114
typedef enum {
105115
OPAL_COMMON_UCX_MEM_ALLOCATE_MAP,
106116
OPAL_COMMON_UCX_MEM_MAP
@@ -236,6 +246,58 @@ OPAL_DECLSPEC int opal_common_ucx_wpmem_flush(opal_common_ucx_wpmem_t *mem,
236246
int target);
237247
OPAL_DECLSPEC int opal_common_ucx_wpmem_fence(opal_common_ucx_wpmem_t *mem);
238248

249+
OPAL_DECLSPEC int opal_common_ucx_flush(ucp_ep_h ep, ucp_worker_h worker,
250+
opal_common_ucx_flush_type_t type,
251+
opal_common_ucx_flush_scope_t scope,
252+
ucs_status_ptr_t *req_ptr);
253+
254+
static inline int _periodical_flush_nb(opal_common_ucx_wpmem_t *mem,
255+
opal_common_ucx_winfo_t *winfo,
256+
int target) {
257+
int rc = OPAL_SUCCESS;
258+
259+
winfo->inflight_ops[target]++;
260+
winfo->global_inflight_ops++;
261+
262+
if (OPAL_UNLIKELY(winfo->inflight_ops[target] >= MCA_COMMON_UCX_PER_TARGET_OPS_THRESHOLD) ||
263+
OPAL_UNLIKELY(winfo->global_inflight_ops >= MCA_COMMON_UCX_GLOBAL_OPS_THRESHOLD)) {
264+
opal_common_ucx_flush_scope_t scope;
265+
266+
if (winfo->inflight_req != UCS_OK) {
267+
rc = opal_common_ucx_wait_request(winfo->inflight_req, winfo->worker,
268+
"opal_common_ucx_flush_nb");
269+
if(OPAL_UNLIKELY(OPAL_SUCCESS != rc)){
270+
MCA_COMMON_UCX_VERBOSE(1, "opal_common_ucx_wait_request failed: %d", rc);
271+
return rc;
272+
}
273+
winfo->inflight_req = UCS_OK;
274+
}
275+
276+
if (winfo->global_inflight_ops >= MCA_COMMON_UCX_GLOBAL_OPS_THRESHOLD) {
277+
scope = OPAL_COMMON_UCX_SCOPE_WORKER;
278+
winfo->global_inflight_ops = 0;
279+
memset(winfo->inflight_ops, 0, winfo->comm_size * sizeof(short));
280+
} else {
281+
scope = OPAL_COMMON_UCX_SCOPE_EP;
282+
winfo->global_inflight_ops -= winfo->inflight_ops[target];
283+
winfo->inflight_ops[target] = 0;
284+
}
285+
286+
rc = opal_common_ucx_flush(winfo->endpoints[target], winfo->worker,
287+
OPAL_COMMON_UCX_FLUSH_NB_PREFERRED, scope,
288+
&winfo->inflight_req);
289+
if(OPAL_UNLIKELY(OPAL_SUCCESS != rc)){
290+
MCA_COMMON_UCX_VERBOSE(1, "opal_common_ucx_flush failed: %d", rc);
291+
return rc;
292+
}
293+
} else if (OPAL_UNLIKELY(winfo->inflight_req != UCS_OK)) {
294+
int ret;
295+
do {
296+
ret = ucp_worker_progress(winfo->worker);
297+
} while (ret);
298+
}
299+
return rc;
300+
}
239301

240302
static inline int
241303
opal_common_ucx_wpmem_putget(opal_common_ucx_wpmem_t *mem, opal_common_ucx_op_t op,
@@ -269,7 +331,6 @@ opal_common_ucx_wpmem_putget(opal_common_ucx_wpmem_t *mem, opal_common_ucx_op_t
269331
called_func = "ucp_get_nbi";
270332
break;
271333
}
272-
opal_mutex_unlock(&winfo->mutex);
273334

274335
if (OPAL_UNLIKELY(status != UCS_OK && status != UCS_INPROGRESS)) {
275336
MCA_COMMON_UCX_ERROR("%s failed: %d", called_func, status);
@@ -278,6 +339,15 @@ opal_common_ucx_wpmem_putget(opal_common_ucx_wpmem_t *mem, opal_common_ucx_op_t
278339
WPOOL_DBG_OUT(_dbg_mem,"ep = %p, rkey = %p\n",
279340
(void *)ep, (void *)rkey);
280341
}
342+
343+
rc = _periodical_flush_nb(mem, winfo, target);
344+
if(OPAL_UNLIKELY(OPAL_SUCCESS != rc)){
345+
MCA_COMMON_UCX_VERBOSE(1, "_incr_and_check_inflight_ops failed: %d", rc);
346+
return rc;
347+
}
348+
349+
opal_mutex_unlock(&winfo->mutex);
350+
281351
return rc;
282352
}
283353

@@ -314,6 +384,13 @@ opal_common_ucx_wpmem_cmpswp(opal_common_ucx_wpmem_t *mem, uint64_t compare,
314384
WPOOL_DBG_OUT(_dbg_mem, "ep = %p, rkey = %p\n",
315385
(void *)ep, (void *)rkey);
316386
}
387+
388+
rc = _periodical_flush_nb(mem, winfo, target);
389+
if(OPAL_UNLIKELY(OPAL_SUCCESS != rc)){
390+
MCA_COMMON_UCX_VERBOSE(1, "_incr_and_check_inflight_ops failed: %d", rc);
391+
return rc;
392+
}
393+
317394
opal_mutex_unlock(&winfo->mutex);
318395

319396
return rc;
@@ -349,6 +426,13 @@ opal_common_ucx_wpmem_post(opal_common_ucx_wpmem_t *mem, ucp_atomic_post_op_t op
349426
WPOOL_DBG_OUT(_dbg_mem, "ep = %p, rkey = %p\n",
350427
(void *)ep, (void *)rkey);
351428
}
429+
430+
rc = _periodical_flush_nb(mem, winfo, target);
431+
if(OPAL_UNLIKELY(OPAL_SUCCESS != rc)){
432+
MCA_COMMON_UCX_VERBOSE(1, "_incr_and_check_inflight_ops failed: %d", rc);
433+
return rc;
434+
}
435+
352436
opal_mutex_unlock(&winfo->mutex);
353437
return rc;
354438
}
@@ -386,6 +470,13 @@ opal_common_ucx_wpmem_fetch(opal_common_ucx_wpmem_t *mem,
386470
WPOOL_DBG_OUT(_dbg_mem, "ep = %p, rkey = %p\n",
387471
(void *)ep, (void *)rkey);
388472
}
473+
474+
rc = _periodical_flush_nb(mem, winfo, target);
475+
if(OPAL_UNLIKELY(OPAL_SUCCESS != rc)){
476+
MCA_COMMON_UCX_VERBOSE(1, "_incr_and_check_inflight_ops failed: %d", rc);
477+
return rc;
478+
}
479+
389480
opal_mutex_unlock(&winfo->mutex);
390481

391482
return rc;
@@ -416,8 +507,6 @@ opal_common_ucx_wpmem_fetch_nb(opal_common_ucx_wpmem_t *mem,
416507
req = opal_common_ucx_atomic_fetch_nb(ep, opcode, value, buffer, len,
417508
rem_addr, rkey, opal_common_ucx_req_completion,
418509
winfo->worker);
419-
opal_mutex_unlock(&winfo->mutex);
420-
421510
if (UCS_PTR_IS_PTR(req)) {
422511
req->ext_req = user_req_ptr;
423512
req->ext_cb = user_req_cb;
@@ -427,6 +516,14 @@ opal_common_ucx_wpmem_fetch_nb(opal_common_ucx_wpmem_t *mem,
427516
}
428517
}
429518

519+
rc = _periodical_flush_nb(mem, winfo, target);
520+
if(OPAL_UNLIKELY(OPAL_SUCCESS != rc)){
521+
MCA_COMMON_UCX_VERBOSE(1, "_incr_and_check_inflight_ops failed: %d", rc);
522+
return rc;
523+
}
524+
525+
opal_mutex_unlock(&winfo->mutex);
526+
430527
return rc;
431528
}
432529

0 commit comments

Comments
 (0)