Skip to content

Commit 2a6479b

Browse files
authored
Merge pull request #1427 from quickwit-oss/empty_segments_crash
handle empty segments for merge
2 parents ea72cf3 + 9c2ef81 commit 2a6479b

File tree

4 files changed

+148
-37
lines changed

4 files changed

+148
-37
lines changed

src/indexer/index_writer.rs

Lines changed: 90 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -510,10 +510,12 @@ impl IndexWriter {
510510
Ok(self.committed_opstamp)
511511
}
512512

513-
/// Merges a given list of segments
513+
/// Merges a given list of segments.
514+
///
515+
/// If all segments are empty no new segment will be created.
514516
///
515517
/// `segment_ids` is required to be non-empty.
516-
pub fn merge(&mut self, segment_ids: &[SegmentId]) -> FutureResult<SegmentMeta> {
518+
pub fn merge(&mut self, segment_ids: &[SegmentId]) -> FutureResult<Option<SegmentMeta>> {
517519
let merge_operation = self.segment_updater.make_merge_operation(segment_ids);
518520
let segment_updater = self.segment_updater.clone();
519521
segment_updater.start_merge(merge_operation)
@@ -1011,6 +1013,92 @@ mod tests {
10111013
Ok(())
10121014
}
10131015

1016+
#[test]
1017+
fn test_merge_on_empty_segments_single_segment() -> crate::Result<()> {
1018+
let mut schema_builder = schema::Schema::builder();
1019+
let text_field = schema_builder.add_text_field("text", schema::TEXT);
1020+
let index = Index::create_in_ram(schema_builder.build());
1021+
let reader = index
1022+
.reader_builder()
1023+
.reload_policy(ReloadPolicy::Manual)
1024+
.try_into()?;
1025+
let num_docs_containing = |s: &str| {
1026+
let term_a = Term::from_field_text(text_field, s);
1027+
reader.searcher().doc_freq(&term_a).unwrap()
1028+
};
1029+
// writing the segment
1030+
let mut index_writer = index.writer(12_000_000).unwrap();
1031+
index_writer.add_document(doc!(text_field=>"a"))?;
1032+
index_writer.commit()?;
1033+
// this should create 1 segment
1034+
1035+
let segments = index.searchable_segment_ids().unwrap();
1036+
assert_eq!(segments.len(), 1);
1037+
1038+
reader.reload().unwrap();
1039+
assert_eq!(num_docs_containing("a"), 1);
1040+
1041+
index_writer.delete_term(Term::from_field_text(text_field, "a"));
1042+
index_writer.commit()?;
1043+
1044+
reader.reload().unwrap();
1045+
assert_eq!(num_docs_containing("a"), 0);
1046+
1047+
index_writer.merge(&segments);
1048+
index_writer.wait_merging_threads().unwrap();
1049+
1050+
let segments = index.searchable_segment_ids().unwrap();
1051+
assert_eq!(segments.len(), 0);
1052+
1053+
Ok(())
1054+
}
1055+
1056+
#[test]
1057+
fn test_merge_on_empty_segments() -> crate::Result<()> {
1058+
let mut schema_builder = schema::Schema::builder();
1059+
let text_field = schema_builder.add_text_field("text", schema::TEXT);
1060+
let index = Index::create_in_ram(schema_builder.build());
1061+
let reader = index
1062+
.reader_builder()
1063+
.reload_policy(ReloadPolicy::Manual)
1064+
.try_into()?;
1065+
let num_docs_containing = |s: &str| {
1066+
let term_a = Term::from_field_text(text_field, s);
1067+
reader.searcher().doc_freq(&term_a).unwrap()
1068+
};
1069+
// writing the segment
1070+
let mut index_writer = index.writer(12_000_000).unwrap();
1071+
index_writer.add_document(doc!(text_field=>"a"))?;
1072+
index_writer.commit()?;
1073+
index_writer.add_document(doc!(text_field=>"a"))?;
1074+
index_writer.commit()?;
1075+
index_writer.add_document(doc!(text_field=>"a"))?;
1076+
index_writer.commit()?;
1077+
index_writer.add_document(doc!(text_field=>"a"))?;
1078+
index_writer.commit()?;
1079+
// this should create 4 segments
1080+
1081+
let segments = index.searchable_segment_ids().unwrap();
1082+
assert_eq!(segments.len(), 4);
1083+
1084+
reader.reload().unwrap();
1085+
assert_eq!(num_docs_containing("a"), 4);
1086+
1087+
index_writer.delete_term(Term::from_field_text(text_field, "a"));
1088+
index_writer.commit()?;
1089+
1090+
reader.reload().unwrap();
1091+
assert_eq!(num_docs_containing("a"), 0);
1092+
1093+
index_writer.merge(&segments);
1094+
index_writer.wait_merging_threads().unwrap();
1095+
1096+
let segments = index.searchable_segment_ids().unwrap();
1097+
assert_eq!(segments.len(), 0);
1098+
1099+
Ok(())
1100+
}
1101+
10141102
#[test]
10151103
fn test_with_merges() -> crate::Result<()> {
10161104
let mut schema_builder = schema::Schema::builder();

src/indexer/merger.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,7 @@ impl IndexMerger {
200200
readers.push(reader);
201201
}
202202
}
203+
203204
let max_doc = readers.iter().map(|reader| reader.num_docs()).sum();
204205
if let Some(sort_by_field) = index_settings.sort_by_field.as_ref() {
205206
readers = Self::sort_readers_by_min_sort_field(readers, sort_by_field)?;

src/indexer/segment_manager.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,7 @@ impl SegmentManager {
173173
.to_string();
174174
return Err(TantivyError::InvalidArgument(error_msg));
175175
}
176+
176177
Ok(segment_entries)
177178
}
178179

@@ -186,7 +187,7 @@ impl SegmentManager {
186187
pub(crate) fn end_merge(
187188
&self,
188189
before_merge_segment_ids: &[SegmentId],
189-
after_merge_segment_entry: SegmentEntry,
190+
after_merge_segment_entry: Option<SegmentEntry>,
190191
) -> crate::Result<SegmentsStatus> {
191192
let mut registers_lock = self.write();
192193
let segments_status = registers_lock
@@ -207,7 +208,9 @@ impl SegmentManager {
207208
for segment_id in before_merge_segment_ids {
208209
target_register.remove_segment(segment_id);
209210
}
210-
target_register.add_segment_entry(after_merge_segment_entry);
211+
if let Some(entry) = after_merge_segment_entry {
212+
target_register.add_segment_entry(entry);
213+
}
211214
Ok(segments_status)
212215
}
213216

src/indexer/segment_updater.rs

Lines changed: 52 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,15 @@ fn merge(
9191
index: &Index,
9292
mut segment_entries: Vec<SegmentEntry>,
9393
target_opstamp: Opstamp,
94-
) -> crate::Result<SegmentEntry> {
94+
) -> crate::Result<Option<SegmentEntry>> {
95+
let num_docs = segment_entries
96+
.iter()
97+
.map(|segment| segment.meta().num_docs() as u64)
98+
.sum::<u64>();
99+
if num_docs == 0 {
100+
return Ok(None);
101+
}
102+
95103
// first we need to apply deletes to our segment.
96104
let merged_segment = index.new_segment();
97105

@@ -120,7 +128,7 @@ fn merge(
120128
let merged_segment_id = merged_segment.id();
121129

122130
let segment_meta = index.new_segment_meta(merged_segment_id, num_docs);
123-
Ok(SegmentEntry::new(segment_meta, delete_cursor, None))
131+
Ok(Some(SegmentEntry::new(segment_meta, delete_cursor, None)))
124132
}
125133

126134
/// Advanced: Merges a list of segments from different indices in a new index.
@@ -475,7 +483,10 @@ impl SegmentUpdater {
475483
// suggested and the moment when it ended up being executed.)
476484
//
477485
// `segment_ids` is required to be non-empty.
478-
pub fn start_merge(&self, merge_operation: MergeOperation) -> FutureResult<SegmentMeta> {
486+
pub fn start_merge(
487+
&self,
488+
merge_operation: MergeOperation,
489+
) -> FutureResult<Option<SegmentMeta>> {
479490
assert!(
480491
!merge_operation.segment_ids().is_empty(),
481492
"Segment_ids cannot be empty."
@@ -512,18 +523,19 @@ impl SegmentUpdater {
512523
merge_operation.target_opstamp(),
513524
) {
514525
Ok(after_merge_segment_entry) => {
515-
let segment_meta_res =
516-
segment_updater.end_merge(merge_operation, after_merge_segment_entry);
517-
let _send_result = merging_future_send.send(segment_meta_res);
526+
let res = segment_updater.end_merge(merge_operation, after_merge_segment_entry);
527+
let _send_result = merging_future_send.send(res);
518528
}
519529
Err(merge_error) => {
520530
warn!(
521531
"Merge of {:?} was cancelled: {:?}",
522532
merge_operation.segment_ids().to_vec(),
523533
merge_error
524534
);
535+
if cfg!(test) {
536+
panic!("{:?}", merge_error);
537+
}
525538
let _send_result = merging_future_send.send(Err(merge_error));
526-
assert!(!cfg!(test), "Merge failed.");
527539
}
528540
}
529541
});
@@ -573,35 +585,42 @@ impl SegmentUpdater {
573585
fn end_merge(
574586
&self,
575587
merge_operation: MergeOperation,
576-
mut after_merge_segment_entry: SegmentEntry,
577-
) -> crate::Result<SegmentMeta> {
588+
mut after_merge_segment_entry: Option<SegmentEntry>,
589+
) -> crate::Result<Option<SegmentMeta>> {
578590
let segment_updater = self.clone();
579-
let after_merge_segment_meta = after_merge_segment_entry.meta().clone();
591+
let after_merge_segment_meta = after_merge_segment_entry
592+
.as_ref()
593+
.map(|after_merge_segment_entry| after_merge_segment_entry.meta().clone());
580594
self.schedule_task(move || {
581-
info!("End merge {:?}", after_merge_segment_entry.meta());
595+
info!(
596+
"End merge {:?}",
597+
after_merge_segment_entry.as_ref().map(|entry| entry.meta())
598+
);
582599
{
583-
let mut delete_cursor = after_merge_segment_entry.delete_cursor().clone();
584-
if let Some(delete_operation) = delete_cursor.get() {
585-
let committed_opstamp = segment_updater.load_meta().opstamp;
586-
if delete_operation.opstamp < committed_opstamp {
587-
let index = &segment_updater.index;
588-
let segment = index.segment(after_merge_segment_entry.meta().clone());
589-
if let Err(advance_deletes_err) = advance_deletes(
590-
segment,
591-
&mut after_merge_segment_entry,
592-
committed_opstamp,
593-
) {
594-
error!(
595-
"Merge of {:?} was cancelled (advancing deletes failed): {:?}",
596-
merge_operation.segment_ids(),
597-
advance_deletes_err
598-
);
599-
assert!(!cfg!(test), "Merge failed.");
600-
601-
// ... cancel merge
602-
// `merge_operations` are tracked. As it is dropped, the
603-
// the segment_ids will be available again for merge.
604-
return Err(advance_deletes_err);
600+
if let Some(after_merge_segment_entry) = after_merge_segment_entry.as_mut() {
601+
let mut delete_cursor = after_merge_segment_entry.delete_cursor().clone();
602+
if let Some(delete_operation) = delete_cursor.get() {
603+
let committed_opstamp = segment_updater.load_meta().opstamp;
604+
if delete_operation.opstamp < committed_opstamp {
605+
let index = &segment_updater.index;
606+
let segment = index.segment(after_merge_segment_entry.meta().clone());
607+
if let Err(advance_deletes_err) = advance_deletes(
608+
segment,
609+
after_merge_segment_entry,
610+
committed_opstamp,
611+
) {
612+
error!(
613+
"Merge of {:?} was cancelled (advancing deletes failed): {:?}",
614+
merge_operation.segment_ids(),
615+
advance_deletes_err
616+
);
617+
assert!(!cfg!(test), "Merge failed.");
618+
619+
// ... cancel merge
620+
// `merge_operations` are tracked. As it is dropped, the
621+
// the segment_ids will be available again for merge.
622+
return Err(advance_deletes_err);
623+
}
605624
}
606625
}
607626
}

0 commit comments

Comments
 (0)