@@ -25,7 +25,7 @@ mca_btl_base_descriptor_t *mca_btl_uct_alloc (mca_btl_base_module_t *btl, mca_bt
25
25
mca_btl_uct_module_t * uct_btl = (mca_btl_uct_module_t * ) btl ;
26
26
mca_btl_uct_base_frag_t * frag = NULL ;
27
27
28
- if (( size + 8 ) <= (size_t ) MCA_BTL_UCT_TL_ATTR (uct_btl -> am_tl , 0 ).cap .am .max_short ) {
28
+ if (size <= (size_t ) MCA_BTL_UCT_TL_ATTR (uct_btl -> am_tl , 0 ).cap .am .max_short ) {
29
29
frag = mca_btl_uct_frag_alloc_short (uct_btl , endpoint );
30
30
} else if (size <= uct_btl -> super .btl_eager_limit ) {
31
31
frag = mca_btl_uct_frag_alloc_eager (uct_btl , endpoint );
@@ -40,6 +40,10 @@ mca_btl_base_descriptor_t *mca_btl_uct_alloc (mca_btl_base_module_t *btl, mca_bt
40
40
frag -> base .des_flags = flags ;
41
41
frag -> base .order = order ;
42
42
frag -> uct_iov .length = size ;
43
+ if (NULL != frag -> base .super .registration ) {
44
+ /* zero-copy fragments will need callbacks */
45
+ frag -> base .des_flags |= MCA_BTL_DES_SEND_ALWAYS_CALLBACK ;
46
+ }
43
47
}
44
48
45
49
return (mca_btl_base_descriptor_t * ) frag ;
@@ -95,14 +99,18 @@ struct mca_btl_base_descriptor_t *mca_btl_uct_prepare_src (mca_btl_base_module_t
95
99
return NULL ;
96
100
}
97
101
102
+ frag -> uct_iov .length = total_size ;
98
103
frag -> base .order = order ;
99
104
frag -> base .des_flags = flags ;
100
105
if (total_size > (size_t ) MCA_BTL_UCT_TL_ATTR (uct_btl -> am_tl , 0 ).cap .am .max_short ) {
106
+ frag -> segments [0 ].seg_len = reserve ;
101
107
frag -> segments [1 ].seg_len = * size ;
102
108
frag -> segments [1 ].seg_addr .pval = data_ptr ;
103
109
frag -> base .des_segment_count = 2 ;
104
110
} else {
111
+ frag -> segments [0 ].seg_len = total_size ;
105
112
memcpy ((void * )((intptr_t ) frag -> segments [1 ].seg_addr .pval + reserve ), data_ptr , * size );
113
+ frag -> base .des_segment_count = 1 ;
106
114
}
107
115
}
108
116
@@ -130,7 +138,7 @@ static size_t mca_btl_uct_send_frag_pack (void *data, void *arg)
130
138
data = (void * )((intptr_t ) data + 8 );
131
139
132
140
/* this function should only ever get called with fragments with two segments */
133
- for (size_t i = 0 ; i < 2 ; ++ i ) {
141
+ for (size_t i = 0 ; i < frag -> base . des_segment_count ; ++ i ) {
134
142
const size_t seg_len = frag -> segments [i ].seg_len ;
135
143
memcpy (data , frag -> segments [i ].seg_addr .pval , seg_len );
136
144
data = (void * )((intptr_t ) data + seg_len );
@@ -140,57 +148,84 @@ static size_t mca_btl_uct_send_frag_pack (void *data, void *arg)
140
148
return length ;
141
149
}
142
150
143
- int mca_btl_uct_send_frag (mca_btl_uct_module_t * uct_btl , mca_btl_base_endpoint_t * endpoint , mca_btl_uct_base_frag_t * frag ,
144
- int32_t flags , mca_btl_uct_device_context_t * context , uct_ep_h ep_handle )
151
+ static void mca_btl_uct_append_pending_frag (mca_btl_uct_module_t * uct_btl , mca_btl_uct_base_frag_t * frag ,
152
+ mca_btl_uct_device_context_t * context , bool ready )
145
153
{
154
+ frag -> ready = ready ;
155
+ frag -> base .des_flags |= MCA_BTL_DES_SEND_ALWAYS_CALLBACK ;
156
+ opal_atomic_wmb ();
157
+
158
+ opal_list_append (& uct_btl -> pending_frags , (opal_list_item_t * ) frag );
159
+ }
160
+
161
+ int mca_btl_uct_send_frag (mca_btl_uct_module_t * uct_btl , mca_btl_uct_base_frag_t * frag , bool append )
162
+ {
163
+ mca_btl_uct_device_context_t * context = frag -> context ;
164
+ const ssize_t msg_size = frag -> uct_iov .length + 8 ;
165
+ ssize_t size ;
146
166
ucs_status_t ucs_status ;
167
+ uct_ep_h ep_handle = NULL ;
147
168
148
- mca_btl_uct_context_lock (context );
169
+ /* if we get here then we must have an endpoint handle for this context/endpoint pair */
170
+ (void ) mca_btl_uct_endpoint_test_am (uct_btl , frag -> endpoint , frag -> context , & ep_handle );
171
+ assert (NULL != ep_handle );
149
172
150
- do {
173
+ /* if another thread set this we really don't care too much as this flag is only meant
174
+ * to protect against deep recursion */
175
+ if (!context -> in_am_callback ) {
176
+ mca_btl_uct_context_lock (context );
177
+ /* attempt to post the fragment */
151
178
if (NULL != frag -> base .super .registration ) {
152
179
frag -> comp .dev_context = context ;
153
-
154
180
ucs_status = uct_ep_am_zcopy (ep_handle , MCA_BTL_UCT_FRAG , & frag -> header , sizeof (frag -> header ),
155
181
& frag -> uct_iov , 1 , 0 , & frag -> comp .uct_comp );
182
+
183
+ if (OPAL_LIKELY (UCS_INPROGRESS == ucs_status )) {
184
+ uct_worker_progress (context -> uct_worker );
185
+ mca_btl_uct_context_unlock (context );
186
+ return OPAL_SUCCESS ;
187
+ }
156
188
} else {
157
189
/* short message */
158
- /* restore original flags */
159
- frag -> base .des_flags = flags ;
160
-
161
- if (1 == frag -> base .des_segment_count ) {
190
+ if (1 == frag -> base .des_segment_count && (frag -> uct_iov .length + 8 ) < MCA_BTL_UCT_TL_ATTR (uct_btl -> am_tl , 0 ).cap .am .max_short ) {
162
191
ucs_status = uct_ep_am_short (ep_handle , MCA_BTL_UCT_FRAG , frag -> header .value , frag -> uct_iov .buffer ,
163
192
frag -> uct_iov .length );
164
- } else {
165
- ucs_status = uct_ep_am_bcopy (ep_handle , MCA_BTL_UCT_FRAG , mca_btl_uct_send_frag_pack , frag , 0 );
193
+
194
+ if (OPAL_LIKELY (UCS_OK == ucs_status )) {
195
+ uct_worker_progress (context -> uct_worker );
196
+ mca_btl_uct_context_unlock (context );
197
+ /* send is complete */
198
+ mca_btl_uct_frag_complete (frag , OPAL_SUCCESS );
199
+ return 1 ;
200
+ }
166
201
}
167
- }
168
202
169
- if (UCS_ERR_NO_RESOURCE != ucs_status ) {
170
- /* go ahead and progress the worker while we have the lock */
171
- (void ) uct_worker_progress (context -> uct_worker );
172
- break ;
203
+ size = uct_ep_am_bcopy (ep_handle , MCA_BTL_UCT_FRAG , mca_btl_uct_send_frag_pack , frag , 0 );
204
+ if (OPAL_LIKELY (size == msg_size )) {
205
+ uct_worker_progress (context -> uct_worker );
206
+ mca_btl_uct_context_unlock (context );
207
+ /* send is complete */
208
+ mca_btl_uct_frag_complete (frag , OPAL_SUCCESS );
209
+ return 1 ;
210
+ }
173
211
}
174
212
175
- /* wait for something to complete before trying again */
176
- while (!uct_worker_progress (context -> uct_worker ));
177
- } while (1 );
178
-
179
- mca_btl_uct_context_unlock (context );
213
+ /* wait for something to happen */
214
+ uct_worker_progress (context -> uct_worker );
215
+ mca_btl_uct_context_unlock (context );
180
216
181
- if (UCS_OK == ucs_status ) {
182
- /* restore original flags */
183
- frag -> base .des_flags = flags ;
184
- /* send is complete */
185
- mca_btl_uct_frag_complete (frag , OPAL_SUCCESS );
186
- return 1 ;
217
+ mca_btl_uct_device_handle_completions (context );
187
218
}
188
219
189
- if (OPAL_UNLIKELY ( UCS_INPROGRESS != ucs_status ) ) {
220
+ if (! append ) {
190
221
return OPAL_ERR_OUT_OF_RESOURCE ;
191
222
}
192
223
193
- return 0 ;
224
+ OPAL_THREAD_LOCK (& uct_btl -> lock );
225
+ mca_btl_uct_append_pending_frag (uct_btl , frag , context , true);
226
+ OPAL_THREAD_UNLOCK (& uct_btl -> lock );
227
+
228
+ return OPAL_SUCCESS ;
194
229
}
195
230
196
231
int mca_btl_uct_send (mca_btl_base_module_t * btl , mca_btl_base_endpoint_t * endpoint , mca_btl_base_descriptor_t * descriptor ,
@@ -199,7 +234,6 @@ int mca_btl_uct_send (mca_btl_base_module_t *btl, mca_btl_base_endpoint_t *endpo
199
234
mca_btl_uct_module_t * uct_btl = (mca_btl_uct_module_t * ) btl ;
200
235
mca_btl_uct_device_context_t * context = mca_btl_uct_module_get_am_context (uct_btl );
201
236
mca_btl_uct_base_frag_t * frag = (mca_btl_uct_base_frag_t * ) descriptor ;
202
- int flags = frag -> base .des_flags ;
203
237
uct_ep_h ep_handle ;
204
238
int rc ;
205
239
@@ -208,28 +242,21 @@ int mca_btl_uct_send (mca_btl_base_module_t *btl, mca_btl_base_endpoint_t *endpo
208
242
209
243
210
244
frag -> header .data .tag = tag ;
211
-
212
- /* add the callback flag before posting to avoid potential races with other threads */
213
- frag -> base .des_flags |= MCA_BTL_DES_SEND_ALWAYS_CALLBACK ;
245
+ frag -> context = context ;
214
246
215
247
rc = mca_btl_uct_endpoint_check_am (uct_btl , endpoint , context , & ep_handle );
216
248
if (OPAL_UNLIKELY (OPAL_SUCCESS != rc )) {
217
- OPAL_THREAD_LOCK (& endpoint -> ep_lock );
249
+ OPAL_THREAD_LOCK (& uct_btl -> lock );
218
250
/* check one more time in case another thread is completing the connection now */
219
251
if (OPAL_SUCCESS != mca_btl_uct_endpoint_test_am (uct_btl , endpoint , context , & ep_handle )) {
220
- frag -> context_id = context -> context_id ;
221
- frag -> ready = false;
222
- OPAL_THREAD_LOCK (& uct_btl -> lock );
223
- opal_list_append (& uct_btl -> pending_frags , (opal_list_item_t * ) frag );
224
- OPAL_THREAD_UNLOCK (& endpoint -> ep_lock );
252
+ mca_btl_uct_append_pending_frag (uct_btl , frag , context , false);
225
253
OPAL_THREAD_UNLOCK (& uct_btl -> lock );
226
-
227
254
return OPAL_SUCCESS ;
228
255
}
229
- OPAL_THREAD_UNLOCK (& endpoint -> ep_lock );
256
+ OPAL_THREAD_UNLOCK (& uct_btl -> lock );
230
257
}
231
258
232
- return mca_btl_uct_send_frag (uct_btl , endpoint , frag , flags , context , ep_handle );
259
+ return mca_btl_uct_send_frag (uct_btl , frag , true );
233
260
}
234
261
235
262
struct mca_btl_uct_sendi_pack_args_t {
@@ -255,9 +282,7 @@ static size_t mca_btl_uct_sendi_pack (void *data, void *arg)
255
282
256
283
static inline size_t mca_btl_uct_max_sendi (mca_btl_uct_module_t * uct_btl , int context_id )
257
284
{
258
- const mca_btl_uct_tl_t * tl = uct_btl -> am_tl ;
259
- return (MCA_BTL_UCT_TL_ATTR (tl , context_id ).cap .am .max_short > MCA_BTL_UCT_TL_ATTR (tl , context_id ).cap .am .max_bcopy ) ?
260
- MCA_BTL_UCT_TL_ATTR (tl , context_id ).cap .am .max_short : MCA_BTL_UCT_TL_ATTR (tl , context_id ).cap .am .max_bcopy ;
285
+ return MCA_BTL_UCT_TL_ATTR (uct_btl -> am_tl , context_id ).cap .am .max_bcopy ;
261
286
}
262
287
263
288
int mca_btl_uct_sendi (mca_btl_base_module_t * btl , mca_btl_base_endpoint_t * endpoint , opal_convertor_t * convertor ,
@@ -270,7 +295,7 @@ int mca_btl_uct_sendi (mca_btl_base_module_t *btl, mca_btl_base_endpoint_t *endp
270
295
/* message with header */
271
296
const size_t msg_size = total_size + 8 ;
272
297
mca_btl_uct_am_header_t am_header ;
273
- ucs_status_t ucs_status = UCS_OK ;
298
+ ucs_status_t ucs_status = UCS_ERR_NO_RESOURCE ;
274
299
uct_ep_h ep_handle ;
275
300
int rc ;
276
301
0 commit comments