1
1
use std:: fs:: File ;
2
2
use std:: io:: { self , BufRead , BufReader , Seek , SeekFrom , Write } ;
3
+ use std:: sync:: atomic:: { AtomicBool , Ordering } ;
4
+ use std:: sync:: Arc ;
3
5
use std:: thread;
4
6
use std:: time:: Duration ;
5
7
6
8
use anyhow:: { anyhow, Result } ;
7
- use crossbeam_channel:: { select, tick, unbounded} ;
9
+ use crossbeam_channel:: { bounded , select, tick, unbounded} ;
8
10
use crossterm:: cursor:: SavePosition ;
9
11
use crossterm:: execute;
10
12
use crossterm:: terminal:: { Clear , ClearType } ;
@@ -117,7 +119,6 @@ fn tail(
117
119
queries : Option < Vec < String > > ,
118
120
) -> Result < ( ) > {
119
121
const SLEEP : u64 = 100 ;
120
- const CHUNK : usize = 5 ;
121
122
122
123
// Save our cursor position.
123
124
execute ! ( io:: stdout( ) , SavePosition ) ?;
@@ -133,42 +134,61 @@ fn tail(
133
134
let ( tx, rx) = unbounded ( ) ;
134
135
let ticker = tick ( Duration :: from_secs ( opts. interval ) ) ;
135
136
136
- thread:: spawn ( move || -> Result < ( ) > {
137
+ // The interrupt handling plumbing.
138
+ let ( stop_tx, stop_rx) = bounded ( 0 ) ;
139
+ let running = Arc :: new ( AtomicBool :: new ( true ) ) ;
140
+ let handler_r = Arc :: clone ( & running) ;
141
+
142
+ ctrlc:: set_handler ( move || {
143
+ handler_r. store ( false , Ordering :: SeqCst ) ;
144
+ } ) ?;
145
+
146
+ let reader_handle = thread:: spawn ( move || -> Result < ( ) > {
137
147
loop {
138
- let mut line = String :: new ( ) ;
139
- let n_read = tail_reader. read_line ( & mut line) ?;
140
-
141
- if n_read > 0 {
142
- len += n_read as u64 ;
143
- tail_reader. seek ( SeekFrom :: Start ( len) ) ?;
144
- line. pop ( ) ; // Remove the newline character.
145
- debug ! ( "tail read: {}" , line) ;
146
- tx. send ( line) ?;
147
- } else {
148
- debug ! ( "tail sleeping for {} milliseconds" , SLEEP ) ;
149
- thread:: sleep ( Duration :: from_millis ( SLEEP ) ) ;
148
+ select ! {
149
+ recv( stop_rx) -> _ => { return Ok ( ( ) ) ; }
150
+ default => {
151
+ let mut line = String :: new( ) ;
152
+ let n_read = tail_reader. read_line( & mut line) ?;
153
+
154
+ if n_read > 0 {
155
+ len += n_read as u64 ;
156
+ tail_reader. seek( SeekFrom :: Start ( len) ) ?;
157
+ line. pop( ) ; // Remove the newline character.
158
+ debug!( "tail read: {}" , line) ;
159
+ tx. send( line) ?;
160
+ } else {
161
+ debug!( "tail sleeping for {} milliseconds" , SLEEP ) ;
162
+ thread:: sleep( Duration :: from_millis( SLEEP ) ) ;
163
+ }
164
+ }
150
165
}
151
166
}
152
167
} ) ;
153
168
154
- let mut lines = Vec :: with_capacity ( CHUNK ) ;
155
- loop {
169
+ let mut lines = Vec :: new ( ) ;
170
+ while running . load ( Ordering :: SeqCst ) {
156
171
select ! {
157
172
recv( rx) -> line => {
158
173
lines. push( line?) ;
159
- // If we have reached our internal buffering size, write them to the SQL engine and
160
- // reset the vector.
161
- if lines. len( ) == CHUNK {
162
- parse_input( & lines, & pattern, & processor) ?;
163
- lines. clear( ) ;
164
- }
174
+ parse_input( & lines, & pattern, & processor) ?;
175
+ lines. clear( ) ;
165
176
}
166
177
recv( ticker) -> _ => {
167
178
execute!( io:: stdout( ) , Clear ( ClearType :: All ) ) ?;
168
179
processor. report( opts. follow) ?;
169
180
}
170
181
}
171
182
}
183
+
184
+ // We got an interrupt, so stop the reading thread.
185
+ stop_tx. send ( ( ) ) ?;
186
+
187
+ // The join will panic if the thread panics but otherwise it will propagate the return value up
188
+ // to the main thread.
189
+ reader_handle
190
+ . join ( )
191
+ . expect ( "the file reading thread should not have panicked" )
172
192
}
173
193
174
194
// Either read from STDIN or the file specified.
0 commit comments