Skip to content

Commit f6f23ba

Browse files
committed
optionally create segment on merge
create a new segment only if it contains data; fixes #1189
1 parent ea72cf3 commit f6f23ba

File tree

4 files changed

+140
-37
lines changed

4 files changed

+140
-37
lines changed

src/indexer/index_writer.rs

Lines changed: 87 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -513,7 +513,7 @@ impl IndexWriter {
513513
/// Merges a given list of segments
514514
///
515515
/// `segment_ids` is required to be non-empty.
516-
pub fn merge(&mut self, segment_ids: &[SegmentId]) -> FutureResult<SegmentMeta> {
516+
pub fn merge(&mut self, segment_ids: &[SegmentId]) -> FutureResult<()> {
517517
let merge_operation = self.segment_updater.make_merge_operation(segment_ids);
518518
let segment_updater = self.segment_updater.clone();
519519
segment_updater.start_merge(merge_operation)
@@ -1011,6 +1011,92 @@ mod tests {
10111011
Ok(())
10121012
}
10131013

1014+
#[test]
1015+
fn test_merge_on_empty_segments_single_segment() -> crate::Result<()> {
1016+
let mut schema_builder = schema::Schema::builder();
1017+
let text_field = schema_builder.add_text_field("text", schema::TEXT);
1018+
let index = Index::create_in_ram(schema_builder.build());
1019+
let reader = index
1020+
.reader_builder()
1021+
.reload_policy(ReloadPolicy::Manual)
1022+
.try_into()?;
1023+
let num_docs_containing = |s: &str| {
1024+
let term_a = Term::from_field_text(text_field, s);
1025+
reader.searcher().doc_freq(&term_a).unwrap()
1026+
};
1027+
// writing the segment
1028+
let mut index_writer = index.writer(12_000_000).unwrap();
1029+
index_writer.add_document(doc!(text_field=>"a"))?;
1030+
index_writer.commit()?;
1031+
// this should create 1 segment
1032+
1033+
let segments = index.searchable_segment_ids().unwrap();
1034+
assert_eq!(segments.len(), 1);
1035+
1036+
reader.reload().unwrap();
1037+
assert_eq!(num_docs_containing("a"), 1);
1038+
1039+
index_writer.delete_term(Term::from_field_text(text_field, "a"));
1040+
index_writer.commit()?;
1041+
1042+
reader.reload().unwrap();
1043+
assert_eq!(num_docs_containing("a"), 0);
1044+
1045+
index_writer.merge(&segments);
1046+
index_writer.wait_merging_threads().unwrap();
1047+
1048+
let segments = index.searchable_segment_ids().unwrap();
1049+
assert_eq!(segments.len(), 0);
1050+
1051+
Ok(())
1052+
}
1053+
1054+
#[test]
1055+
fn test_merge_on_empty_segments() -> crate::Result<()> {
1056+
let mut schema_builder = schema::Schema::builder();
1057+
let text_field = schema_builder.add_text_field("text", schema::TEXT);
1058+
let index = Index::create_in_ram(schema_builder.build());
1059+
let reader = index
1060+
.reader_builder()
1061+
.reload_policy(ReloadPolicy::Manual)
1062+
.try_into()?;
1063+
let num_docs_containing = |s: &str| {
1064+
let term_a = Term::from_field_text(text_field, s);
1065+
reader.searcher().doc_freq(&term_a).unwrap()
1066+
};
1067+
// writing the segment
1068+
let mut index_writer = index.writer(12_000_000).unwrap();
1069+
index_writer.add_document(doc!(text_field=>"a"))?;
1070+
index_writer.commit()?;
1071+
index_writer.add_document(doc!(text_field=>"a"))?;
1072+
index_writer.commit()?;
1073+
index_writer.add_document(doc!(text_field=>"a"))?;
1074+
index_writer.commit()?;
1075+
index_writer.add_document(doc!(text_field=>"a"))?;
1076+
index_writer.commit()?;
1077+
// this should create 4 segments
1078+
1079+
let segments = index.searchable_segment_ids().unwrap();
1080+
assert_eq!(segments.len(), 4);
1081+
1082+
reader.reload().unwrap();
1083+
assert_eq!(num_docs_containing("a"), 4);
1084+
1085+
index_writer.delete_term(Term::from_field_text(text_field, "a"));
1086+
index_writer.commit()?;
1087+
1088+
reader.reload().unwrap();
1089+
assert_eq!(num_docs_containing("a"), 0);
1090+
1091+
index_writer.merge(&segments);
1092+
index_writer.wait_merging_threads().unwrap();
1093+
1094+
let segments = index.searchable_segment_ids().unwrap();
1095+
assert_eq!(segments.len(), 0);
1096+
1097+
Ok(())
1098+
}
1099+
10141100
#[test]
10151101
fn test_with_merges() -> crate::Result<()> {
10161102
let mut schema_builder = schema::Schema::builder();

src/indexer/merger.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,7 @@ impl IndexMerger {
200200
readers.push(reader);
201201
}
202202
}
203+
203204
let max_doc = readers.iter().map(|reader| reader.num_docs()).sum();
204205
if let Some(sort_by_field) = index_settings.sort_by_field.as_ref() {
205206
readers = Self::sort_readers_by_min_sort_field(readers, sort_by_field)?;

src/indexer/segment_manager.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,7 @@ impl SegmentManager {
173173
.to_string();
174174
return Err(TantivyError::InvalidArgument(error_msg));
175175
}
176+
176177
Ok(segment_entries)
177178
}
178179

@@ -186,7 +187,7 @@ impl SegmentManager {
186187
pub(crate) fn end_merge(
187188
&self,
188189
before_merge_segment_ids: &[SegmentId],
189-
after_merge_segment_entry: SegmentEntry,
190+
after_merge_segment_entry: Option<SegmentEntry>,
190191
) -> crate::Result<SegmentsStatus> {
191192
let mut registers_lock = self.write();
192193
let segments_status = registers_lock
@@ -207,7 +208,9 @@ impl SegmentManager {
207208
for segment_id in before_merge_segment_ids {
208209
target_register.remove_segment(segment_id);
209210
}
210-
target_register.add_segment_entry(after_merge_segment_entry);
211+
if let Some(entry) = after_merge_segment_entry {
212+
target_register.add_segment_entry(entry);
213+
}
211214
Ok(segments_status)
212215
}
213216

src/indexer/segment_updater.rs

Lines changed: 47 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,15 @@ fn merge(
9191
index: &Index,
9292
mut segment_entries: Vec<SegmentEntry>,
9393
target_opstamp: Opstamp,
94-
) -> crate::Result<SegmentEntry> {
94+
) -> crate::Result<Option<SegmentEntry>> {
95+
let num_docs = segment_entries
96+
.iter()
97+
.map(|segment| segment.meta().num_docs() as u64)
98+
.sum::<u64>();
99+
if num_docs == 0 {
100+
return Ok(None);
101+
}
102+
95103
// first we need to apply deletes to our segment.
96104
let merged_segment = index.new_segment();
97105

@@ -120,7 +128,7 @@ fn merge(
120128
let merged_segment_id = merged_segment.id();
121129

122130
let segment_meta = index.new_segment_meta(merged_segment_id, num_docs);
123-
Ok(SegmentEntry::new(segment_meta, delete_cursor, None))
131+
Ok(Some(SegmentEntry::new(segment_meta, delete_cursor, None)))
124132
}
125133

126134
/// Advanced: Merges a list of segments from different indices in a new index.
@@ -475,7 +483,7 @@ impl SegmentUpdater {
475483
// suggested and the moment when it ended up being executed.)
476484
//
477485
// `segment_ids` is required to be non-empty.
478-
pub fn start_merge(&self, merge_operation: MergeOperation) -> FutureResult<SegmentMeta> {
486+
pub fn start_merge(&self, merge_operation: MergeOperation) -> FutureResult<()> {
479487
assert!(
480488
!merge_operation.segment_ids().is_empty(),
481489
"Segment_ids cannot be empty."
@@ -512,18 +520,19 @@ impl SegmentUpdater {
512520
merge_operation.target_opstamp(),
513521
) {
514522
Ok(after_merge_segment_entry) => {
515-
let segment_meta_res =
516-
segment_updater.end_merge(merge_operation, after_merge_segment_entry);
517-
let _send_result = merging_future_send.send(segment_meta_res);
523+
let res = segment_updater.end_merge(merge_operation, after_merge_segment_entry);
524+
let _send_result = merging_future_send.send(res);
518525
}
519526
Err(merge_error) => {
520527
warn!(
521528
"Merge of {:?} was cancelled: {:?}",
522529
merge_operation.segment_ids().to_vec(),
523530
merge_error
524531
);
532+
if cfg!(test) {
533+
panic!("{:?}", merge_error);
534+
}
525535
let _send_result = merging_future_send.send(Err(merge_error));
526-
assert!(!cfg!(test), "Merge failed.");
527536
}
528537
}
529538
});
@@ -573,35 +582,39 @@ impl SegmentUpdater {
573582
fn end_merge(
574583
&self,
575584
merge_operation: MergeOperation,
576-
mut after_merge_segment_entry: SegmentEntry,
577-
) -> crate::Result<SegmentMeta> {
585+
mut after_merge_segment_entry: Option<SegmentEntry>,
586+
) -> crate::Result<()> {
578587
let segment_updater = self.clone();
579-
let after_merge_segment_meta = after_merge_segment_entry.meta().clone();
580588
self.schedule_task(move || {
581-
info!("End merge {:?}", after_merge_segment_entry.meta());
589+
info!(
590+
"End merge {:?}",
591+
after_merge_segment_entry.as_ref().map(|entry| entry.meta())
592+
);
582593
{
583-
let mut delete_cursor = after_merge_segment_entry.delete_cursor().clone();
584-
if let Some(delete_operation) = delete_cursor.get() {
585-
let committed_opstamp = segment_updater.load_meta().opstamp;
586-
if delete_operation.opstamp < committed_opstamp {
587-
let index = &segment_updater.index;
588-
let segment = index.segment(after_merge_segment_entry.meta().clone());
589-
if let Err(advance_deletes_err) = advance_deletes(
590-
segment,
591-
&mut after_merge_segment_entry,
592-
committed_opstamp,
593-
) {
594-
error!(
595-
"Merge of {:?} was cancelled (advancing deletes failed): {:?}",
596-
merge_operation.segment_ids(),
597-
advance_deletes_err
598-
);
599-
assert!(!cfg!(test), "Merge failed.");
600-
601-
// ... cancel merge
602-
// `merge_operations` are tracked. As it is dropped, the
603-
// the segment_ids will be available again for merge.
604-
return Err(advance_deletes_err);
594+
if let Some(mut after_merge_segment_entry) = after_merge_segment_entry.as_mut() {
595+
let mut delete_cursor = after_merge_segment_entry.delete_cursor().clone();
596+
if let Some(delete_operation) = delete_cursor.get() {
597+
let committed_opstamp = segment_updater.load_meta().opstamp;
598+
if delete_operation.opstamp < committed_opstamp {
599+
let index = &segment_updater.index;
600+
let segment = index.segment(after_merge_segment_entry.meta().clone());
601+
if let Err(advance_deletes_err) = advance_deletes(
602+
segment,
603+
&mut after_merge_segment_entry,
604+
committed_opstamp,
605+
) {
606+
error!(
607+
"Merge of {:?} was cancelled (advancing deletes failed): {:?}",
608+
merge_operation.segment_ids(),
609+
advance_deletes_err
610+
);
611+
assert!(!cfg!(test), "Merge failed.");
612+
613+
// ... cancel merge
614+
// `merge_operations` are tracked. As it is dropped, the
615+
// segment_ids will be available again for merge.
616+
return Err(advance_deletes_err);
617+
}
605618
}
606619
}
607620
}
@@ -622,7 +635,7 @@ impl SegmentUpdater {
622635
Ok(())
623636
})
624637
.wait()?;
625-
Ok(after_merge_segment_meta)
638+
Ok(())
626639
}
627640

628641
/// Wait for current merging threads.

0 commit comments

Comments
 (0)