14
14
15
15
#include <inttypes.h>
16
16
17
+ #include "aws/io/event_loop.h"
18
+
17
19
#ifdef _MSC_VER
18
20
/* allow declared initializer using address of automatic variable */
19
21
# pragma warning(disable : 4221)
@@ -28,6 +30,7 @@ struct aws_event_stream_rpc_client_connection {
28
30
struct aws_allocator * allocator ;
29
31
struct aws_hash_table continuation_table ;
30
32
struct aws_client_bootstrap * bootstrap_ref ;
33
+ struct aws_event_loop * event_loop ;
31
34
struct aws_atomic_var ref_count ;
32
35
struct aws_channel * channel ;
33
36
struct aws_channel_handler * event_stream_handler ;
@@ -39,6 +42,7 @@ struct aws_event_stream_rpc_client_connection {
39
42
aws_event_stream_rpc_client_on_connection_setup_fn * on_connection_setup ;
40
43
aws_event_stream_rpc_client_connection_protocol_message_fn * on_connection_protocol_message ;
41
44
aws_event_stream_rpc_client_on_connection_shutdown_fn * on_connection_shutdown ;
45
+ aws_event_stream_rpc_client_on_connection_terminated_fn * on_connection_terminated ;
42
46
void * user_data ;
43
47
bool bootstrap_owned ;
44
48
bool enable_read_back_pressure ;
@@ -49,6 +53,7 @@ struct aws_event_stream_rpc_client_continuation_token {
49
53
struct aws_event_stream_rpc_client_connection * connection ;
50
54
aws_event_stream_rpc_client_stream_continuation_fn * continuation_fn ;
51
55
aws_event_stream_rpc_client_stream_continuation_closed_fn * closed_fn ;
56
+ aws_event_stream_rpc_client_stream_continuation_terminated_fn * terminated_fn ;
52
57
void * user_data ;
53
58
struct aws_atomic_var ref_count ;
54
59
struct aws_atomic_var is_closed ;
@@ -187,9 +192,7 @@ static void s_on_channel_shutdown_fn(
187
192
aws_event_stream_rpc_client_connection_release (connection );
188
193
}
189
194
190
- /* Set each continuation's is_closed=true.
191
- * A lock MUST be held while calling this.
192
- * For use with aws_hash_table_foreach(). */
195
+ /* Set each continuation's is_closed=true. */
193
196
static int s_mark_each_continuation_closed (void * context , struct aws_hash_element * p_element ) {
194
197
(void )context ;
195
198
struct aws_event_stream_rpc_client_continuation_token * continuation = p_element -> value ;
@@ -233,18 +236,31 @@ static int s_complete_and_clear_each_continuation(void *context, struct aws_hash
233
236
static void s_clear_continuation_table (struct aws_event_stream_rpc_client_connection * connection ) {
234
237
AWS_ASSERT (!aws_event_stream_rpc_client_connection_is_open (connection ));
235
238
239
+ struct aws_hash_table temp_table ;
240
+ aws_hash_table_init (
241
+ & temp_table ,
242
+ connection -> allocator ,
243
+ 64 ,
244
+ aws_event_stream_rpc_hash_streamid ,
245
+ aws_event_stream_rpc_streamid_eq ,
246
+ NULL ,
247
+ NULL );
248
+
236
249
/* Use lock to ensure synchronization with code that adds entries to table.
237
250
* Since connection was just marked closed, no further entries will be
238
- * added to table once we acquire the lock. */
251
+ * added to table once we acquire the lock.
252
+ *
253
+ * While no further entries can be added, there are concurrent execution paths where things can be
254
+ * removed. So rather than iterating the connection's table, swap it out for an empty one and iterate
255
+ * the temporary table instead. Removing from an empty table will be harmless.
256
+ */
239
257
aws_mutex_lock (& connection -> stream_lock );
240
- aws_hash_table_foreach ( & connection -> continuation_table , s_mark_each_continuation_closed , NULL );
258
+ aws_hash_table_swap ( & temp_table , & connection -> continuation_table );
241
259
aws_mutex_unlock (& connection -> stream_lock );
242
260
243
- /* Now release lock before invoking callbacks.
244
- * It's safe to alter the table now without a lock, since no further
245
- * entries can be added, and we've gone through the critical section
246
- * above to ensure synchronization */
247
- aws_hash_table_foreach (& connection -> continuation_table , s_complete_and_clear_each_continuation , NULL );
261
+ aws_hash_table_foreach (& temp_table , s_mark_each_continuation_closed , NULL );
262
+ aws_hash_table_foreach (& temp_table , s_complete_and_clear_each_continuation , NULL );
263
+ aws_hash_table_clean_up (& temp_table );
248
264
}
249
265
250
266
int aws_event_stream_rpc_client_connection_connect (
@@ -268,6 +284,8 @@ int aws_event_stream_rpc_client_connection_connect(
268
284
connection -> allocator = allocator ;
269
285
aws_atomic_init_int (& connection -> ref_count , 1 );
270
286
connection -> bootstrap_ref = conn_options -> bootstrap ;
287
+ connection -> event_loop = aws_event_loop_group_get_next_loop (connection -> bootstrap_ref -> event_loop_group );
288
+
271
289
/* this is released in the connection release which gets called regardless of if this function is successful or
272
290
* not*/
273
291
aws_client_bootstrap_acquire (connection -> bootstrap_ref );
@@ -276,6 +294,7 @@ int aws_event_stream_rpc_client_connection_connect(
276
294
aws_mutex_init (& connection -> stream_lock );
277
295
278
296
connection -> on_connection_shutdown = conn_options -> on_connection_shutdown ;
297
+ connection -> on_connection_terminated = conn_options -> on_connection_terminated ;
279
298
connection -> on_connection_protocol_message = conn_options -> on_connection_protocol_message ;
280
299
connection -> on_connection_setup = conn_options -> on_connection_setup ;
281
300
connection -> user_data = conn_options -> user_data ;
@@ -307,6 +326,7 @@ int aws_event_stream_rpc_client_connection_connect(
307
326
.enable_read_back_pressure = false,
308
327
.setup_callback = s_on_channel_setup_fn ,
309
328
.shutdown_callback = s_on_channel_shutdown_fn ,
329
+ .requested_event_loop = connection -> event_loop ,
310
330
};
311
331
312
332
if (aws_client_bootstrap_new_socket_channel (& bootstrap_options )) {
@@ -340,7 +360,15 @@ static void s_destroy_connection(struct aws_event_stream_rpc_client_connection *
340
360
AWS_LOGF_DEBUG (AWS_LS_EVENT_STREAM_RPC_CLIENT , "id=%p: destroying connection." , (void * )connection );
341
361
aws_hash_table_clean_up (& connection -> continuation_table );
342
362
aws_client_bootstrap_release (connection -> bootstrap_ref );
363
+
364
+ aws_event_stream_rpc_client_on_connection_terminated_fn * terminated_fn = connection -> on_connection_terminated ;
365
+ void * terminated_user_data = connection -> user_data ;
366
+
343
367
aws_mem_release (connection -> allocator , connection );
368
+
369
+ if (terminated_fn ) {
370
+ terminated_fn (terminated_user_data );
371
+ }
344
372
}
345
373
346
374
void aws_event_stream_rpc_client_connection_release (const struct aws_event_stream_rpc_client_connection * connection ) {
@@ -434,13 +462,19 @@ static void s_on_protocol_message_written_fn(
434
462
AWS_FATAL_ASSERT (message_args -> continuation && "end stream flag was set but it wasn't on a continuation" );
435
463
aws_atomic_store_int (& message_args -> continuation -> is_closed , 1U );
436
464
465
+ int was_present = 0 ;
437
466
aws_mutex_lock (& message_args -> connection -> stream_lock );
438
467
aws_hash_table_remove (
439
- & message_args -> connection -> continuation_table , & message_args -> continuation -> stream_id , NULL , NULL );
468
+ & message_args -> connection -> continuation_table , & message_args -> continuation -> stream_id , NULL , & was_present );
440
469
aws_mutex_unlock (& message_args -> connection -> stream_lock );
441
470
442
- /* Lock must NOT be held while invoking callback */
443
- s_complete_continuation (message_args -> continuation );
471
+ /*
472
+ * Whoever successfully removes the continuation from the table gets to complete it.
473
+ * Lock must NOT be held while invoking callback
474
+ */
475
+ if (was_present ) {
476
+ s_complete_continuation (message_args -> continuation );
477
+ }
444
478
}
445
479
446
480
message_args -> flush_fn (error_code , message_args -> user_data );
@@ -770,7 +804,6 @@ static void s_route_message_by_type(
770
804
aws_mutex_unlock (& connection -> stream_lock );
771
805
772
806
continuation -> continuation_fn (continuation , & message_args , continuation -> user_data );
773
- aws_event_stream_rpc_client_continuation_release (continuation );
774
807
775
808
/* if it was a terminal stream message purge it from the hash table. The delete will decref the continuation. */
776
809
if (message_flags & AWS_EVENT_STREAM_RPC_MESSAGE_FLAG_TERMINATE_STREAM ) {
@@ -780,13 +813,21 @@ static void s_route_message_by_type(
780
813
(void * )connection ,
781
814
(void * )continuation );
782
815
aws_atomic_store_int (& continuation -> is_closed , 1U );
816
+ int was_present = 0 ;
783
817
aws_mutex_lock (& connection -> stream_lock );
784
- aws_hash_table_remove (& connection -> continuation_table , & stream_id , NULL , NULL );
818
+ aws_hash_table_remove (& connection -> continuation_table , & stream_id , NULL , & was_present );
785
819
aws_mutex_unlock (& connection -> stream_lock );
786
820
787
- /* Note that we do not invoke callback while holding lock */
788
- s_complete_continuation (continuation );
821
+ /*
822
+ * Whoever successfully removes the continuation from the table gets to complete it.
823
+ * Lock must NOT be held while invoking callback
824
+ */
825
+ if (was_present ) {
826
+ s_complete_continuation (continuation );
827
+ }
789
828
}
829
+
830
+ aws_event_stream_rpc_client_continuation_release (continuation );
790
831
} else {
791
832
if (message_type <= AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_APPLICATION_ERROR ||
792
833
message_type >= AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_COUNT ) {
@@ -915,6 +956,7 @@ struct aws_event_stream_rpc_client_continuation_token *aws_event_stream_rpc_clie
915
956
aws_atomic_init_int (& continuation -> is_complete , 0 );
916
957
continuation -> continuation_fn = continuation_options -> on_continuation ;
917
958
continuation -> closed_fn = continuation_options -> on_continuation_closed ;
959
+ continuation -> terminated_fn = continuation_options -> on_continuation_terminated ;
918
960
continuation -> user_data = continuation_options -> user_data ;
919
961
920
962
return continuation ;
@@ -959,7 +1001,15 @@ void aws_event_stream_rpc_client_continuation_release(
959
1001
if (ref_count == 1 ) {
960
1002
struct aws_allocator * allocator = continuation_mut -> connection -> allocator ;
961
1003
aws_event_stream_rpc_client_connection_release (continuation_mut -> connection );
1004
+
1005
+ aws_event_stream_rpc_client_stream_continuation_terminated_fn * terminated_fn = continuation_mut -> terminated_fn ;
1006
+ void * terminated_user_data = continuation_mut -> user_data ;
1007
+
962
1008
aws_mem_release (allocator , continuation_mut );
1009
+
1010
+ if (terminated_fn ) {
1011
+ terminated_fn (terminated_user_data );
1012
+ }
963
1013
}
964
1014
}
965
1015
@@ -1062,3 +1112,13 @@ int aws_event_stream_rpc_client_continuation_send_message(
1062
1112
return s_send_protocol_message (
1063
1113
continuation -> connection , continuation , NULL , message_args , continuation -> stream_id , flush_fn , user_data );
1064
1114
}
1115
+
1116
+ struct aws_event_loop * aws_event_stream_rpc_client_connection_get_event_loop (
1117
+ const struct aws_event_stream_rpc_client_connection * connection ) {
1118
+
1119
+ if (!connection ) {
1120
+ return NULL ;
1121
+ }
1122
+
1123
+ return connection -> event_loop ;
1124
+ }
0 commit comments