@@ -5,7 +5,7 @@ use std::{
5
5
ops:: Range ,
6
6
} ;
7
7
8
- use log:: { debug, info } ;
8
+ use log:: debug;
9
9
10
10
use crate :: {
11
11
commit:: { self , Commit , StoredCommit } ,
@@ -126,7 +126,7 @@ impl<W: io::Write> Writer<W> {
126
126
index
127
127
. append_after_commit ( self . commit . min_tx_offset , self . bytes_written , commit_len)
128
128
. map_err ( |e| {
129
- info ! ( "failed to append to offset index: {:?}" , e) ;
129
+ debug ! ( "failed to append to offset index: {:?}" , e) ;
130
130
} )
131
131
} ) ;
132
132
@@ -242,34 +242,42 @@ impl OffsetIndexWriter {
242
242
) -> Result < ( ) , IndexError > {
243
243
self . bytes_since_last_index += commit_len;
244
244
245
- if !self . require_segment_fsync && self . bytes_since_last_index >= self . min_write_interval . get ( ) {
246
- self . head . append ( min_tx_offset, byte_offset) ?;
247
- self . head . async_flush ( ) ?;
248
- self . reset ( ) ;
249
- } else {
245
+ if self . candidate_min_tx_offset == 0 {
250
246
self . candidate_byte_offset = byte_offset;
251
247
self . candidate_min_tx_offset = min_tx_offset;
252
248
}
249
+
250
+ if !self . require_segment_fsync {
251
+ self . append_internal ( ) ?;
252
+ }
253
+
253
254
Ok ( ( ) )
254
255
}
255
256
256
- pub fn append_after_fsync ( & mut self ) -> Result < ( ) , IndexError > {
257
- if self . bytes_since_last_index >= self . min_write_interval . get ( ) {
258
- self . head
259
- . append ( self . candidate_min_tx_offset , self . candidate_byte_offset ) ?;
257
+ fn append_internal ( & mut self ) -> Result < ( ) , IndexError > {
258
+ // If the candidate offset is zero, there has not been a commit since the last offset entry
259
+ if self . candidate_min_tx_offset == 0 {
260
+ return Ok ( ( ) ) ;
261
+ }
260
262
261
- self . head . async_flush ( ) ? ;
262
- self . reset ( ) ;
263
+ if self . bytes_since_last_index < self . min_write_interval . get ( ) {
264
+ return Ok ( ( ) ) ;
263
265
}
266
+
267
+ self . head
268
+ . append ( self . candidate_min_tx_offset , self . candidate_byte_offset ) ?;
269
+ self . head . async_flush ( ) ?;
270
+ self . reset ( ) ;
271
+
264
272
Ok ( ( ) )
265
273
}
266
274
}
267
275
268
276
impl FileLike for OffsetIndexWriter {
269
277
/// Must be called via SegmentWriter::fsync
270
278
fn fsync ( & mut self ) -> io:: Result < ( ) > {
271
- let _ = self . append_after_fsync ( ) . map_err ( |e| {
272
- info ! ( "failed to append to offset index: {:?}" , e) ;
279
+ let _ = self . append_internal ( ) . map_err ( |e| {
280
+ debug ! ( "failed to append to offset index: {:?}" , e) ;
273
281
} ) ;
274
282
Ok ( ( ) )
275
283
}
0 commit comments