63
63
64
64
/* Number of messages to send before rescheduling */
65
65
#define MAX_SEND_MSG_COUNT 25
66
/* Timeout, in jiffies (10000 ms), that shutdown_connection() waits for CF_SHUTDOWN to clear */
+ #define DLM_SHUTDOWN_WAIT_TIMEOUT msecs_to_jiffies(10000)
66
67
67
68
struct cbuf {
68
69
unsigned int base ;
@@ -110,10 +111,12 @@ struct connection {
110
111
#define CF_CLOSE 6
111
112
#define CF_APP_LIMITED 7
112
113
#define CF_CLOSING 8
114
+ #define CF_SHUTDOWN 9
113
115
struct list_head writequeue ; /* List of outgoing writequeue_entries */
114
116
spinlock_t writequeue_lock ;
115
117
int (* rx_action ) (struct connection * ); /* What to do when active */
116
118
void (* connect_action ) (struct connection * ); /* What to do to connect */
119
+ void (* shutdown_action )(struct connection * con ); /* What to do to shutdown */
117
120
struct page * rx_page ;
118
121
struct cbuf cb ;
119
122
int retries ;
@@ -122,6 +125,7 @@ struct connection {
122
125
struct connection * othercon ;
123
126
struct work_struct rwork ; /* Receive workqueue */
124
127
struct work_struct swork ; /* Send workqueue */
128
+ wait_queue_head_t shutdown_wait ; /* wait for graceful shutdown */
125
129
};
126
130
#define sock2con (x ) ((struct connection *)(x)->sk_user_data)
127
131
@@ -218,6 +222,7 @@ static struct connection *__nodeid2con(int nodeid, gfp_t alloc)
218
222
spin_lock_init (& con -> writequeue_lock );
219
223
INIT_WORK (& con -> swork , process_send_sockets );
220
224
INIT_WORK (& con -> rwork , process_recv_sockets );
225
+ init_waitqueue_head (& con -> shutdown_wait );
221
226
222
227
/* Setup action pointers for child sockets */
223
228
if (con -> nodeid ) {
@@ -619,6 +624,54 @@ static void close_connection(struct connection *con, bool and_other,
619
624
clear_bit (CF_CLOSING , & con -> flags );
620
625
}
621
626
627
+ static void shutdown_connection (struct connection * con )
628
+ {
629
+ int ret ;
630
+
631
+ if (cancel_work_sync (& con -> swork )) {
632
+ log_print ("canceled swork for node %d" , con -> nodeid );
633
+ clear_bit (CF_WRITE_PENDING , & con -> flags );
634
+ }
635
+
636
+ mutex_lock (& con -> sock_mutex );
637
+ /* nothing to shutdown */
638
+ if (!con -> sock ) {
639
+ mutex_unlock (& con -> sock_mutex );
640
+ return ;
641
+ }
642
+
643
+ set_bit (CF_SHUTDOWN , & con -> flags );
644
+ ret = kernel_sock_shutdown (con -> sock , SHUT_WR );
645
+ mutex_unlock (& con -> sock_mutex );
646
+ if (ret ) {
647
+ log_print ("Connection %p failed to shutdown: %d will force close" ,
648
+ con , ret );
649
+ goto force_close ;
650
+ } else {
651
+ ret = wait_event_timeout (con -> shutdown_wait ,
652
+ !test_bit (CF_SHUTDOWN , & con -> flags ),
653
+ DLM_SHUTDOWN_WAIT_TIMEOUT );
654
+ if (ret == 0 ) {
655
+ log_print ("Connection %p shutdown timed out, will force close" ,
656
+ con );
657
+ goto force_close ;
658
+ }
659
+ }
660
+
661
+ return ;
662
+
663
+ force_close :
664
+ clear_bit (CF_SHUTDOWN , & con -> flags );
665
+ close_connection (con , false, true, true);
666
+ }
667
+
668
+ static void dlm_tcp_shutdown (struct connection * con )
669
+ {
670
+ if (con -> othercon )
671
+ shutdown_connection (con -> othercon );
672
+ shutdown_connection (con );
673
+ }
674
+
622
675
/* Data received from remote end */
623
676
static int receive_from_sock (struct connection * con )
624
677
{
@@ -713,13 +766,18 @@ static int receive_from_sock(struct connection *con)
713
766
out_close :
714
767
mutex_unlock (& con -> sock_mutex );
715
768
if (ret != - EAGAIN ) {
716
- close_connection (con , false, true, false);
717
769
/* Reconnect when there is something to send */
770
+ close_connection (con , false, true, false);
771
+ if (ret == 0 ) {
772
+ log_print ("connection %p got EOF from %d" ,
773
+ con , con -> nodeid );
774
+ /* handling for tcp shutdown */
775
+ clear_bit (CF_SHUTDOWN , & con -> flags );
776
+ wake_up (& con -> shutdown_wait );
777
+ /* signal to breaking receive worker */
778
+ ret = -1 ;
779
+ }
718
780
}
719
- /* Don't return success if we really got EOF */
720
- if (ret == 0 )
721
- ret = - EAGAIN ;
722
-
723
781
return ret ;
724
782
}
725
783
@@ -803,6 +861,7 @@ static int accept_from_sock(struct connection *con)
803
861
spin_lock_init (& othercon -> writequeue_lock );
804
862
INIT_WORK (& othercon -> swork , process_send_sockets );
805
863
INIT_WORK (& othercon -> rwork , process_recv_sockets );
864
+ init_waitqueue_head (& othercon -> shutdown_wait );
806
865
set_bit (CF_IS_OTHERCON , & othercon -> flags );
807
866
} else {
808
867
/* close other sock con if we have something new */
@@ -1047,6 +1106,7 @@ static void tcp_connect_to_sock(struct connection *con)
1047
1106
1048
1107
con -> rx_action = receive_from_sock ;
1049
1108
con -> connect_action = tcp_connect_to_sock ;
1109
+ con -> shutdown_action = dlm_tcp_shutdown ;
1050
1110
add_sock (sock , con );
1051
1111
1052
1112
/* Bind to our cluster-known address connecting to avoid
@@ -1542,6 +1602,12 @@ static void stop_conn(struct connection *con)
1542
1602
_stop_conn (con , true);
1543
1603
}
1544
1604
1605
+ static void shutdown_conn (struct connection * con )
1606
+ {
1607
+ if (con -> shutdown_action )
1608
+ con -> shutdown_action (con );
1609
+ }
1610
+
1545
1611
static void free_conn (struct connection * con )
1546
1612
{
1547
1613
close_connection (con , true, true, true);
@@ -1593,6 +1659,7 @@ void dlm_lowcomms_stop(void)
1593
1659
mutex_lock (& connections_lock );
1594
1660
dlm_allow_conn = 0 ;
1595
1661
mutex_unlock (& connections_lock );
1662
+ foreach_conn (shutdown_conn );
1596
1663
work_flush ();
1597
1664
clean_writequeues ();
1598
1665
foreach_conn (free_conn );
0 commit comments