File tree Expand file tree Collapse file tree 5 files changed +16
-14
lines changed Expand file tree Collapse file tree 5 files changed +16
-14
lines changed Original file line number Diff line number Diff line change @@ -76,7 +76,7 @@ impl StatisticsReceiver {
76
76
recv = Box :: pin ( flight_exchange. recv ( ) ) ;
77
77
}
78
78
Err ( cause) => {
79
- ctx. get_current_session ( ) . force_kill_query ( ) ;
79
+ ctx. get_current_session ( ) . force_kill_query ( cause . clone ( ) ) ;
80
80
return Err ( cause) ;
81
81
}
82
82
} ;
@@ -90,14 +90,13 @@ impl StatisticsReceiver {
90
90
notified = middle;
91
91
match Self :: recv_data ( & ctx, res) {
92
92
Ok ( true ) => {
93
- ctx. get_current_session ( ) . force_kill_query ( ) ;
94
93
return Ok ( ( ) ) ;
95
94
}
96
95
Ok ( false ) => {
97
96
recv = Box :: pin ( flight_exchange. recv ( ) ) ;
98
97
}
99
98
Err ( cause) => {
100
- ctx. get_current_session ( ) . force_kill_query ( ) ;
99
+ ctx. get_current_session ( ) . force_kill_query ( cause . clone ( ) ) ;
101
100
return Err ( cause) ;
102
101
}
103
102
} ;
@@ -106,7 +105,7 @@ impl StatisticsReceiver {
106
105
}
107
106
108
107
if let Err ( cause) = Self :: fetch ( & ctx, & flight_exchange, recv) . await {
109
- ctx. get_current_session ( ) . force_kill_query ( ) ;
108
+ ctx. get_current_session ( ) . force_kill_query ( cause . clone ( ) ) ;
110
109
return Err ( cause) ;
111
110
}
112
111
Original file line number Diff line number Diff line change @@ -48,7 +48,9 @@ impl KillInterpreter {
48
48
Ok ( Box :: pin ( DataBlockStream :: create ( schema, None , vec ! [ ] ) ) )
49
49
}
50
50
Some ( kill_session) => {
51
- kill_session. force_kill_query ( ) ;
51
+ kill_session. force_kill_query ( ErrorCode :: AbortedQuery (
52
+ "Aborted query, because the server is shutting down or the query was killed" ,
53
+ ) ) ;
52
54
let schema = Arc :: new ( DataSchema :: empty ( ) ) ;
53
55
Ok ( Box :: pin ( DataBlockStream :: create ( schema, None , vec ! [ ] ) ) )
54
56
}
Original file line number Diff line number Diff line change @@ -197,7 +197,9 @@ impl Executor {
197
197
Running ( r) => {
198
198
// release session
199
199
if kill {
200
- r. session . force_kill_query ( ) ;
200
+ r. session . force_kill_query ( ErrorCode :: AbortedQuery (
201
+ "Aborted query, because the server is shutting down or the query was killed" ,
202
+ ) ) ;
201
203
}
202
204
if let Err ( e) = & reason {
203
205
r. ctx . set_error ( e. clone ( ) ) ;
Original file line number Diff line number Diff line change @@ -124,11 +124,8 @@ impl QueryContextShared {
124
124
self . query_need_abort . clone ( )
125
125
}
126
126
127
- pub fn kill ( & self ) {
128
- self . set_error ( ErrorCode :: AbortedQuery (
129
- "Aborted query, because the server is shutting down or the query was killed" ,
130
- ) ) ;
131
-
127
+ pub fn kill ( & self , cause : ErrorCode ) {
128
+ self . set_error ( cause) ;
132
129
self . query_need_abort . store ( true , Ordering :: Release ) ;
133
130
let mut sources_abort_handle = self . sources_abort_handle . write ( ) ;
134
131
Original file line number Diff line number Diff line change @@ -106,15 +106,17 @@ impl Session {
106
106
}
107
107
108
108
pub fn force_kill_session ( self : & Arc < Self > ) {
109
- self . force_kill_query ( ) ;
109
+ self . force_kill_query ( ErrorCode :: AbortedQuery (
110
+ "Aborted query, because the server is shutting down or the query was killed" ,
111
+ ) ) ;
110
112
self . kill ( /* shutdown io stream */ ) ;
111
113
}
112
114
113
- pub fn force_kill_query ( self : & Arc < Self > ) {
115
+ pub fn force_kill_query ( self : & Arc < Self > , cause : ErrorCode ) {
114
116
let session_ctx = self . session_ctx . clone ( ) ;
115
117
116
118
if let Some ( context_shared) = session_ctx. get_query_context_shared ( ) {
117
- context_shared. kill ( /* shutdown executing query */ ) ;
119
+ context_shared. kill ( cause ) ;
118
120
}
119
121
}
120
122
You can’t perform that action at this time.
0 commit comments