Skip to content

modifying osc mt code #2

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 68 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
68 commits
Select commit Hold shift + click to select a range
ac2651a
temp enable btl uct
xinzhao3 Nov 9, 2018
4ad6e0b
add data structures
xinzhao3 Nov 12, 2018
34153d3
init/finalize functions
xinzhao3 Nov 12, 2018
817cb28
change datastruct
artpol84 Nov 12, 2018
3c0fea3
Merge pull request #4 from artpol84/osc/mt_v2
xinzhao3 Nov 12, 2018
366ab6a
data structure
xinzhao3 Nov 12, 2018
3075638
init/finalize funcs
xinzhao3 Nov 12, 2018
d3926f7
Add Operation function
artpol84 Nov 12, 2018
c7c2f68
Merge pull request #5 from artpol84/osc/mt_v2
xinzhao3 Nov 12, 2018
a6d75bc
Add flush function
artpol84 Nov 12, 2018
bc9057a
Merge pull request #6 from artpol84/osc/mt_v2
xinzhao3 Nov 12, 2018
113f2ff
modify mem_create
xinzhao3 Nov 12, 2018
2f1ac68
Intermediate fixes
artpol84 Nov 12, 2018
ccfb889
Merge pull request #7 from artpol84/osc/mt_v2
xinzhao3 Nov 12, 2018
3838e72
modify on func signature
xinzhao3 Nov 12, 2018
78b31f5
modify on wpool_init
xinzhao3 Nov 12, 2018
8eeb324
Bring everything to a "builds OK" state
artpol84 Nov 13, 2018
03cff67
Merge pull request #8 from artpol84/osc/mt_v2
xinzhao3 Nov 13, 2018
a60c39f
common code change
xinzhao3 Nov 12, 2018
1e82809
osc changes
xinzhao3 Nov 12, 2018
6446948
OSC builds as well.
artpol84 Nov 13, 2018
ca8fba7
Merge pull request #9 from artpol84/osc/mt_v2
xinzhao3 Nov 13, 2018
4c10a49
fix
artpol84 Nov 13, 2018
887bdd4
Merge pull request #10 from artpol84/osc/mt_v2
xinzhao3 Nov 13, 2018
9ad3145
fix #2
artpol84 Nov 13, 2018
777635e
Merge pull request #11 from artpol84/osc/mt_v2
xinzhao3 Nov 13, 2018
0c962a5
fix
xinzhao3 Nov 13, 2018
556875e
fix
xinzhao3 Nov 13, 2018
d3ebdc3
add printfs
xinzhao3 Nov 16, 2018
091972c
fix
xinzhao3 Nov 16, 2018
24623c9
add printf msg
xinzhao3 Nov 16, 2018
836ec01
TEST
xinzhao3 Nov 16, 2018
fbadd86
fix dbg out
xinzhao3 Nov 17, 2018
a0755fe
fix dbg out
xinzhao3 Nov 17, 2018
c910483
some fixes
xinzhao3 Nov 17, 2018
b57c86f
fix
xinzhao3 Nov 17, 2018
bacf303
output
xinzhao3 Nov 17, 2018
8c79a72
Merge branch 'topic/osc-mt' of github.com:xinzhao3/ompi into topic/os…
xinzhao3 Nov 17, 2018
cefad21
fixes
xinzhao3 Nov 17, 2018
13caecf
fixes
xinzhao3 Nov 17, 2018
32a1a26
fix on atomic fadd
xinzhao3 Nov 17, 2018
c27fb70
fixes
xinzhao3 Nov 17, 2018
1a3b559
fix
xinzhao3 Nov 17, 2018
f0c2e15
fix
xinzhao3 Nov 17, 2018
5410d82
FIX
xinzhao3 Nov 17, 2018
b57a3b7
fix
xinzhao3 Nov 17, 2018
c80b2ff
fix
xinzhao3 Nov 17, 2018
3006241
fix
xinzhao3 Nov 17, 2018
228b037
add dbg_output to osc code
xinzhao3 Nov 19, 2018
c46af4f
fixes
xinzhao3 Nov 19, 2018
f264ad3
add timestep
xinzhao3 Nov 19, 2018
d251dac
fix on timestep
xinzhao3 Nov 19, 2018
7e1d6e0
fix
xinzhao3 Nov 19, 2018
0803880
fixes
xinzhao3 Nov 19, 2018
6bc9762
modify on win_free
xinzhao3 Nov 19, 2018
66f85a3
fixes on win_free
xinzhao3 Nov 19, 2018
bf023db
fix on osc_win_free
xinzhao3 Nov 19, 2018
94e0c8a
fixes
xinzhao3 Nov 19, 2018
676839e
fixes
xinzhao3 Nov 19, 2018
c54b7a4
fixes
xinzhao3 Nov 19, 2018
53d81f0
fixes
xinzhao3 Nov 20, 2018
3d212a3
fix
xinzhao3 Nov 20, 2018
492417e
fix race
xinzhao3 Nov 20, 2018
755bf46
fixes
xinzhao3 Nov 20, 2018
6673f47
fixes on worker progress
xinzhao3 Nov 20, 2018
c5df657
fixes
xinzhao3 Nov 20, 2018
989d86f
modify on osc comm code
xinzhao3 Nov 21, 2018
e553e24
fixes on osc code
xinzhao3 Nov 21, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion contrib/platform/mellanox/optimized
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
enable_mca_no_build=coll-ml,btl-uct
enable_mca_no_build=coll-ml
enable_debug_symbols=yes
enable_orterun_prefix_by_default=yes
with_verbs=no
Expand Down
18 changes: 6 additions & 12 deletions ompi/mca/osc/ucx/osc_ucx.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,9 @@
#define OMPI_OSC_UCX_ATTACH_MAX 32
#define OMPI_OSC_UCX_RKEY_BUF_MAX 1024

typedef struct ompi_osc_ucx_win_info {
ucp_rkey_h rkey;
uint64_t addr;
bool rkey_init;
} ompi_osc_ucx_win_info_t;

typedef struct ompi_osc_ucx_component {
ompi_osc_base_component_t super;
ucp_context_h ucp_context;
ucp_worker_h ucp_worker;
opal_common_ucx_wpool_t *wpool;
bool enable_mpi_threads;
opal_free_list_t requests; /* request free list for the r* communication variants */
bool env_initialized; /* UCX environment is initialized or not */
Expand Down Expand Up @@ -97,12 +90,10 @@ typedef struct ompi_osc_ucx_state {
typedef struct ompi_osc_ucx_module {
ompi_osc_base_module_t super;
struct ompi_communicator_t *comm;
ucp_mem_h memh; /* remote accessible memory */
int flavor;
size_t size;
ucp_mem_h state_memh;
ompi_osc_ucx_win_info_t *win_info_array;
ompi_osc_ucx_win_info_t *state_info_array;
uint64_t *addrs;
uint64_t *state_addrs;
int disp_unit; /* if disp_unit >= 0, then everyone has the same
* disp unit size; if disp_unit == -1, then we
* need to look at disp_units */
Expand All @@ -122,6 +113,9 @@ typedef struct ompi_osc_ucx_module {
uint64_t req_result;
int *start_grp_ranks;
bool lock_all_is_nocheck;
opal_common_ucx_ctx_t *ctx;
opal_common_ucx_mem_t *mem;
opal_common_ucx_mem_t *state_mem;
} ompi_osc_ucx_module_t;

typedef enum locktype {
Expand Down
58 changes: 31 additions & 27 deletions ompi/mca/osc/ucx/osc_ucx_active_target.c
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ static inline void ompi_osc_ucx_handle_incoming_post(ompi_osc_ucx_module_t *modu

int ompi_osc_ucx_fence(int assert, struct ompi_win_t *win) {
ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module;
int ret;
int ret = OMPI_SUCCESS;

if (module->epoch_type.access != NONE_EPOCH &&
module->epoch_type.access != FENCE_EPOCH) {
Expand All @@ -74,7 +74,7 @@ int ompi_osc_ucx_fence(int assert, struct ompi_win_t *win) {
}

if (!(assert & MPI_MODE_NOPRECEDE)) {
ret = opal_common_ucx_worker_flush(mca_osc_ucx_component.ucp_worker);
ret = opal_common_ucx_mem_flush(module->mem, OPAL_COMMON_UCX_SCOPE_WORKER, 0/*ignore*/);
if (ret != OMPI_SUCCESS) {
return ret;
}
Expand Down Expand Up @@ -147,7 +147,7 @@ int ompi_osc_ucx_start(struct ompi_group_t *group, int assert, struct ompi_win_t

ompi_osc_ucx_handle_incoming_post(module, &(module->state.post_state[i]), ranks_in_win_grp, size);
}
ucp_worker_progress(mca_osc_ucx_component.ucp_worker);
opal_common_ucx_workers_progress(mca_osc_ucx_component.wpool);
}

module->post_count = 0;
Expand All @@ -163,7 +163,6 @@ int ompi_osc_ucx_start(struct ompi_group_t *group, int assert, struct ompi_win_t

int ompi_osc_ucx_complete(struct ompi_win_t *win) {
ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module;
ucs_status_t status;
int i, size;
int ret = OMPI_SUCCESS;

Expand All @@ -173,29 +172,30 @@ int ompi_osc_ucx_complete(struct ompi_win_t *win) {

module->epoch_type.access = NONE_EPOCH;

ret = opal_common_ucx_worker_flush(mca_osc_ucx_component.ucp_worker);
ret = opal_common_ucx_mem_flush(module->mem, OPAL_COMMON_UCX_SCOPE_WORKER, 0/*ignore*/);
if (ret != OMPI_SUCCESS) {
return ret;
}

module->global_ops_num = 0;
memset(module->per_target_ops_nums, 0,
sizeof(int) * ompi_comm_size(module->comm));

size = ompi_group_size(module->start_group);
for (i = 0; i < size; i++) {
uint64_t remote_addr = (module->state_info_array)[module->start_grp_ranks[i]].addr + OSC_UCX_STATE_COMPLETE_COUNT_OFFSET; /* write to state.complete_count on remote side */
ucp_rkey_h rkey = (module->state_info_array)[module->start_grp_ranks[i]].rkey;
ucp_ep_h ep = OSC_UCX_GET_EP(module->comm, module->start_grp_ranks[i]);

status = ucp_atomic_post(ep, UCP_ATOMIC_POST_OP_ADD, 1,
sizeof(uint64_t), remote_addr, rkey);
if (status != UCS_OK) {
OSC_UCX_VERBOSE(1, "ucp_atomic_post failed: %d", status);
uint64_t remote_addr = module->state_addrs[module->start_grp_ranks[i]] + OSC_UCX_STATE_COMPLETE_COUNT_OFFSET; // write to state.complete_count on remote side

ret = opal_common_ucx_mem_post(module->mem, UCP_ATOMIC_POST_OP_ADD,
1, module->start_grp_ranks[i], sizeof(uint64_t),
remote_addr);
if (ret != OMPI_SUCCESS) {
OSC_UCX_VERBOSE(1, "opal_common_ucx_mem_post failed: %d", ret);
}

ret = opal_common_ucx_ep_flush(ep, mca_osc_ucx_component.ucp_worker);
if (OMPI_SUCCESS != ret) {
OSC_UCX_VERBOSE(1, "opal_common_ucx_ep_flush failed: %d", ret);
ret = opal_common_ucx_mem_flush(module->mem, OPAL_COMMON_UCX_SCOPE_EP,
module->start_grp_ranks[i]);
if (ret != OMPI_SUCCESS) {
return ret;
}
}

Expand Down Expand Up @@ -243,25 +243,29 @@ int ompi_osc_ucx_post(struct ompi_group_t *group, int assert, struct ompi_win_t
}

for (i = 0; i < size; i++) {
uint64_t remote_addr = (module->state_info_array)[ranks_in_win_grp[i]].addr + OSC_UCX_STATE_POST_INDEX_OFFSET; /* write to state.post_index on remote side */
ucp_rkey_h rkey = (module->state_info_array)[ranks_in_win_grp[i]].rkey;
ucp_ep_h ep = OSC_UCX_GET_EP(module->comm, ranks_in_win_grp[i]);
uint64_t remote_addr = module->state_addrs[ranks_in_win_grp[i]] + OSC_UCX_STATE_POST_INDEX_OFFSET; // write to state.post_index on remote side
uint64_t curr_idx = 0, result = 0;

/* do fop first to get an post index */
opal_common_ucx_atomic_fetch(ep, UCP_ATOMIC_FETCH_OP_FADD, 1,
&result, sizeof(result),
remote_addr, rkey, mca_osc_ucx_component.ucp_worker);
ret = opal_common_ucx_mem_fetch(module->mem, UCP_ATOMIC_FETCH_OP_FADD,
1, ranks_in_win_grp[i], &result,
sizeof(result), remote_addr);
if (ret != OMPI_SUCCESS) {
return OMPI_ERROR;
}

curr_idx = result & (OMPI_OSC_UCX_POST_PEER_MAX - 1);

remote_addr = (module->state_info_array)[ranks_in_win_grp[i]].addr + OSC_UCX_STATE_POST_STATE_OFFSET + sizeof(uint64_t) * curr_idx;
remote_addr = module->state_addrs[ranks_in_win_grp[i]] + OSC_UCX_STATE_POST_STATE_OFFSET + sizeof(uint64_t) * curr_idx;

/* do cas to send post message */
do {
opal_common_ucx_atomic_cswap(ep, 0, (uint64_t)myrank + 1, &result,
sizeof(result), remote_addr, rkey,
mca_osc_ucx_component.ucp_worker);
ret = opal_common_ucx_mem_cmpswp(module->mem, 0, result,
myrank + 1, &result, sizeof(result),
remote_addr);
if (ret != OMPI_SUCCESS) {
return OMPI_ERROR;
}

if (result == 0)
break;
Expand Down Expand Up @@ -302,7 +306,7 @@ int ompi_osc_ucx_wait(struct ompi_win_t *win) {

while (module->state.complete_count != (uint64_t)size) {
/* not sure if this is required */
ucp_worker_progress(mca_osc_ucx_component.ucp_worker);
opal_common_ucx_workers_progress(mca_osc_ucx_component.wpool);
}

module->state.complete_count = 0;
Expand Down
Loading