@@ -77,7 +77,8 @@ mca_spml_ucx_t mca_spml_ucx = {
77
77
.num_disconnect = 1 ,
78
78
.heap_reg_nb = 0 ,
79
79
.enabled = 0 ,
80
- .get_mkey_slow = NULL
80
+ .get_mkey_slow = NULL ,
81
+ .synchronized_quiet = false
81
82
};
82
83
83
84
mca_spml_ucx_ctx_t mca_spml_ucx_ctx_default = {
@@ -213,6 +214,40 @@ static void dump_address(int pe, char *addr, size_t len)
213
214
214
215
static char spml_ucx_transport_ids [1 ] = { 0 };
215
216
217
+ int mca_spml_ucx_init_put_op_mask (mca_spml_ucx_ctx_t * ctx , size_t nprocs )
218
+ {
219
+ int res ;
220
+
221
+ if (mca_spml_ucx .synchronized_quiet ) {
222
+ ctx -> put_proc_indexes = malloc (nprocs * sizeof (* ctx -> put_proc_indexes ));
223
+ if (NULL == ctx -> put_proc_indexes ) {
224
+ return OSHMEM_ERR_OUT_OF_RESOURCE ;
225
+ }
226
+
227
+ OBJ_CONSTRUCT (& ctx -> put_op_bitmap , opal_bitmap_t );
228
+ res = opal_bitmap_init (& ctx -> put_op_bitmap , nprocs );
229
+ if (OPAL_SUCCESS != res ) {
230
+ free (ctx -> put_proc_indexes );
231
+ ctx -> put_proc_indexes = NULL ;
232
+ return res ;
233
+ }
234
+
235
+ ctx -> put_proc_count = 0 ;
236
+ }
237
+
238
+ return OSHMEM_SUCCESS ;
239
+ }
240
+
241
+ int mca_spml_ucx_clear_put_op_mask (mca_spml_ucx_ctx_t * ctx )
242
+ {
243
+ if (mca_spml_ucx .synchronized_quiet && ctx -> put_proc_indexes ) {
244
+ OBJ_DESTRUCT (& ctx -> put_op_bitmap );
245
+ free (ctx -> put_proc_indexes );
246
+ }
247
+
248
+ return OSHMEM_SUCCESS ;
249
+ }
250
+
216
251
int mca_spml_ucx_add_procs (ompi_proc_t * * procs , size_t nprocs )
217
252
{
218
253
size_t i , j , n ;
@@ -232,6 +267,11 @@ int mca_spml_ucx_add_procs(ompi_proc_t** procs, size_t nprocs)
232
267
goto error ;
233
268
}
234
269
270
+ rc = mca_spml_ucx_init_put_op_mask (& mca_spml_ucx_ctx_default , nprocs );
271
+ if (OSHMEM_SUCCESS != rc ) {
272
+ goto error ;
273
+ }
274
+
235
275
err = ucp_worker_get_address (mca_spml_ucx_ctx_default .ucp_worker , & wk_local_addr , & wk_addr_len );
236
276
if (err != UCS_OK ) {
237
277
goto error ;
@@ -294,6 +334,8 @@ int mca_spml_ucx_add_procs(ompi_proc_t** procs, size_t nprocs)
294
334
free (mca_spml_ucx .remote_addrs_tbl [i ]);
295
335
}
296
336
}
337
+
338
+ mca_spml_ucx_clear_put_op_mask (& mca_spml_ucx_ctx_default );
297
339
if (mca_spml_ucx_ctx_default .ucp_peers )
298
340
free (mca_spml_ucx_ctx_default .ucp_peers );
299
341
if (mca_spml_ucx .remote_addrs_tbl )
@@ -581,6 +623,11 @@ static int mca_spml_ucx_ctx_create_common(long options, mca_spml_ucx_ctx_t **ucx
581
623
goto error ;
582
624
}
583
625
626
+ rc = mca_spml_ucx_init_put_op_mask (ucx_ctx , nprocs );
627
+ if (OSHMEM_SUCCESS != rc ) {
628
+ goto error2 ;
629
+ }
630
+
584
631
for (i = 0 ; i < nprocs ; i ++ ) {
585
632
ep_params .field_mask = UCP_EP_PARAM_FIELD_REMOTE_ADDRESS ;
586
633
ep_params .address = (ucp_address_t * )(mca_spml_ucx .remote_addrs_tbl [i ]);
@@ -619,6 +666,8 @@ static int mca_spml_ucx_ctx_create_common(long options, mca_spml_ucx_ctx_t **ucx
619
666
}
620
667
}
621
668
669
+ mca_spml_ucx_clear_put_op_mask (ucx_ctx );
670
+
622
671
if (ucx_ctx -> ucp_peers )
623
672
free (ucx_ctx -> ucp_peers );
624
673
@@ -713,6 +762,7 @@ int mca_spml_ucx_put(shmem_ctx_t ctx, void* dst_addr, size_t size, void* src_add
713
762
void * rva ;
714
763
spml_ucx_mkey_t * ucx_mkey ;
715
764
mca_spml_ucx_ctx_t * ucx_ctx = (mca_spml_ucx_ctx_t * )ctx ;
765
+ int res ;
716
766
#if HAVE_DECL_UCP_PUT_NB
717
767
ucs_status_ptr_t request ;
718
768
#else
@@ -723,12 +773,18 @@ int mca_spml_ucx_put(shmem_ctx_t ctx, void* dst_addr, size_t size, void* src_add
723
773
#if HAVE_DECL_UCP_PUT_NB
724
774
request = ucp_put_nb (ucx_ctx -> ucp_peers [dst ].ucp_conn , src_addr , size ,
725
775
(uint64_t )rva , ucx_mkey -> rkey , opal_common_ucx_empty_complete_cb );
726
- return opal_common_ucx_wait_request (request , ucx_ctx -> ucp_worker , "ucp_put_nb" );
776
+ res = opal_common_ucx_wait_request (request , ucx_ctx -> ucp_worker , "ucp_put_nb" );
727
777
#else
728
778
status = ucp_put (ucx_ctx -> ucp_peers [dst ].ucp_conn , src_addr , size ,
729
779
(uint64_t )rva , ucx_mkey -> rkey );
730
- return ucx_status_to_oshmem (status );
780
+ res = ucx_status_to_oshmem (status );
731
781
#endif
782
+
783
+ if (OPAL_LIKELY (OSHMEM_SUCCESS == res )) {
784
+ mca_spml_ucx_remote_op_posted (ucx_ctx , dst );
785
+ }
786
+
787
+ return res ;
732
788
}
733
789
734
790
int mca_spml_ucx_put_nb (shmem_ctx_t ctx , void * dst_addr , size_t size , void * src_addr , int dst , void * * handle )
@@ -742,6 +798,10 @@ int mca_spml_ucx_put_nb(shmem_ctx_t ctx, void* dst_addr, size_t size, void* src_
742
798
status = ucp_put_nbi (ucx_ctx -> ucp_peers [dst ].ucp_conn , src_addr , size ,
743
799
(uint64_t )rva , ucx_mkey -> rkey );
744
800
801
+ if (OPAL_LIKELY (status >= 0 )) {
802
+ mca_spml_ucx_remote_op_posted (ucx_ctx , dst );
803
+ }
804
+
745
805
return ucx_status_to_oshmem_nb (status );
746
806
}
747
807
@@ -765,9 +825,28 @@ int mca_spml_ucx_fence(shmem_ctx_t ctx)
765
825
766
826
int mca_spml_ucx_quiet (shmem_ctx_t ctx )
767
827
{
828
+ int flush_get_data ;
768
829
int ret ;
830
+ unsigned i ;
831
+ int idx ;
769
832
mca_spml_ucx_ctx_t * ucx_ctx = (mca_spml_ucx_ctx_t * )ctx ;
770
833
834
+ if (mca_spml_ucx .synchronized_quiet ) {
835
+ for (i = 0 ; i < ucx_ctx -> put_proc_count ; i ++ ) {
836
+ idx = ucx_ctx -> put_proc_indexes [i ];
837
+ ret = mca_spml_ucx_get_nb (ctx ,
838
+ ucx_ctx -> ucp_peers [idx ].mkeys -> super .super .va_base ,
839
+ sizeof (flush_get_data ), & flush_get_data , idx , NULL );
840
+ if (OMPI_SUCCESS != ret ) {
841
+ oshmem_shmem_abort (-1 );
842
+ return ret ;
843
+ }
844
+
845
+ opal_bitmap_clear_bit (& ucx_ctx -> put_op_bitmap , idx );
846
+ }
847
+ ucx_ctx -> put_proc_count = 0 ;
848
+ }
849
+
771
850
opal_atomic_wmb ();
772
851
773
852
ret = opal_common_ucx_worker_flush (ucx_ctx -> ucp_worker );
0 commit comments