Skip to content

Commit b76b924

Browse files
authored
Add min_shards parameter to index config (#5781)
1 parent 289a12e commit b76b924

File tree

52 files changed

+656
-309
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

52 files changed

+656
-309
lines changed

quickwit/quickwit-cli/src/source.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -823,7 +823,7 @@ mod tests {
823823
.collect();
824824
let sources = vec![SourceConfig {
825825
source_id: "foo-source".to_string(),
826-
num_pipelines: NonZeroUsize::new(1).unwrap(),
826+
num_pipelines: NonZeroUsize::MIN,
827827
enabled: true,
828828
source_params: SourceParams::file_from_str("path/to/file").unwrap(),
829829
transform_config: None,
@@ -884,15 +884,15 @@ mod tests {
884884
let sources = [
885885
SourceConfig {
886886
source_id: "foo-source".to_string(),
887-
num_pipelines: NonZeroUsize::new(1).unwrap(),
887+
num_pipelines: NonZeroUsize::MIN,
888888
enabled: true,
889889
source_params: SourceParams::stdin(),
890890
transform_config: None,
891891
input_format: SourceInputFormat::Json,
892892
},
893893
SourceConfig {
894894
source_id: "bar-source".to_string(),
895-
num_pipelines: NonZeroUsize::new(1).unwrap(),
895+
num_pipelines: NonZeroUsize::MIN,
896896
enabled: true,
897897
source_params: SourceParams::stdin(),
898898
transform_config: None,

quickwit/quickwit-cli/src/tool.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -415,7 +415,7 @@ pub async fn local_ingest_docs_cli(args: LocalIngestDocsArgs) -> anyhow::Result<
415415
.map(|vrl_script| TransformConfig::new(vrl_script, None));
416416
let source_config = SourceConfig {
417417
source_id: CLI_SOURCE_ID.to_string(),
418-
num_pipelines: NonZeroUsize::new(1).expect("1 is always non-zero."),
418+
num_pipelines: NonZeroUsize::MIN,
419419
enabled: true,
420420
source_params,
421421
transform_config,
@@ -605,7 +605,7 @@ pub async fn merge_cli(args: MergeArgs) -> anyhow::Result<()> {
605605
index_id: args.index_id,
606606
source_config: SourceConfig {
607607
source_id: args.source_id,
608-
num_pipelines: NonZeroUsize::new(1).unwrap(),
608+
num_pipelines: NonZeroUsize::MIN,
609609
enabled: true,
610610
source_params: SourceParams::Vec(VecSourceParams::default()),
611611
transform_config: None,

quickwit/quickwit-config/resources/tests/index_config/hdfs-logs.json

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,9 @@
6666
"heap_size": "3G"
6767
}
6868
},
69+
"ingest_settings": {
70+
"min_shards": 12
71+
},
6972
"search_settings": {
7073
"default_search_fields": ["severity_text", "body"]
7174
}

quickwit/quickwit-config/resources/tests/index_config/hdfs-logs.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,5 +34,8 @@ maturation_period = "48 hours"
3434
[indexing_settings.resources]
3535
heap_size = "3G"
3636

37+
[ingest_settings]
38+
min_shards = 12
39+
3740
[search_settings]
3841
default_search_fields = [ "severity_text", "body" ]

quickwit/quickwit-config/resources/tests/index_config/hdfs-logs.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,5 +46,8 @@ indexing_settings:
4646
resources:
4747
heap_size: 3G
4848

49+
ingest_settings:
50+
min_shards: 12
51+
4952
search_settings:
5053
default_search_fields: [severity_text, body]

quickwit/quickwit-config/src/index_config/mod.rs

Lines changed: 66 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
pub(crate) mod serialize;
1616

1717
use std::hash::{Hash, Hasher};
18+
use std::num::NonZeroUsize;
1819
use std::str::FromStr;
1920
use std::sync::Arc;
2021
use std::time::Duration;
@@ -161,6 +162,30 @@ impl Default for IndexingSettings {
161162
}
162163
}
163164

165+
/// Settings for ingestion.
166+
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, utoipa::ToSchema)]
167+
#[serde(deny_unknown_fields)]
168+
pub struct IngestSettings {
169+
/// Configures the minimum number of shards to use for ingestion.
170+
#[schema(default = 1, value_type = usize)]
171+
#[serde(default = "IngestSettings::default_min_shards")]
172+
pub min_shards: NonZeroUsize,
173+
}
174+
175+
impl IngestSettings {
176+
pub fn default_min_shards() -> NonZeroUsize {
177+
NonZeroUsize::MIN
178+
}
179+
}
180+
181+
impl Default for IngestSettings {
182+
fn default() -> Self {
183+
Self {
184+
min_shards: Self::default_min_shards(),
185+
}
186+
}
187+
}
188+
164189
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize, utoipa::ToSchema)]
165190
#[serde(deny_unknown_fields)]
166191
pub struct SearchSettings {
@@ -251,6 +276,7 @@ pub struct IndexConfig {
251276
pub index_uri: Uri,
252277
pub doc_mapping: DocMapping,
253278
pub indexing_settings: IndexingSettings,
279+
pub ingest_settings: IngestSettings,
254280
pub search_settings: SearchSettings,
255281
pub retention_policy_opt: Option<RetentionPolicy>,
256282
}
@@ -359,8 +385,9 @@ impl IndexConfig {
359385
index_uri,
360386
doc_mapping,
361387
indexing_settings,
388+
ingest_settings: IngestSettings::default(),
362389
search_settings,
363-
retention_policy_opt: Default::default(),
390+
retention_policy_opt: None,
364391
}
365392
}
366393
}
@@ -435,10 +462,6 @@ impl crate::TestableForRegression for IndexConfig {
435462
store_source: true,
436463
tokenizers: vec![tokenizer],
437464
};
438-
let retention_policy = Some(RetentionPolicy {
439-
retention_period: "90 days".to_string(),
440-
evaluation_schedule: "daily".to_string(),
441-
});
442465
let stable_log_config = StableLogMergePolicyConfig {
443466
merge_factor: 9,
444467
max_merge_factor: 11,
@@ -456,16 +479,24 @@ impl crate::TestableForRegression for IndexConfig {
456479
resources: indexing_resources,
457480
..Default::default()
458481
};
482+
let ingest_settings = IngestSettings {
483+
min_shards: NonZeroUsize::new(12).unwrap(),
484+
};
459485
let search_settings = SearchSettings {
460486
default_search_fields: vec!["message".to_string()],
461487
};
488+
let retention_policy_opt = Some(RetentionPolicy {
489+
retention_period: "90 days".to_string(),
490+
evaluation_schedule: "daily".to_string(),
491+
});
462492
IndexConfig {
463493
index_id: "my-index".to_string(),
464494
index_uri: Uri::for_test("s3://quickwit-indexes/my-index"),
465495
doc_mapping,
466496
indexing_settings,
467-
retention_policy_opt: retention_policy,
497+
ingest_settings,
468498
search_settings,
499+
retention_policy_opt,
469500
}
470501
}
471502

@@ -474,7 +505,9 @@ impl crate::TestableForRegression for IndexConfig {
474505
assert_eq!(self.index_uri, other.index_uri);
475506
assert_eq!(self.doc_mapping, other.doc_mapping);
476507
assert_eq!(self.indexing_settings, other.indexing_settings);
508+
assert_eq!(self.ingest_settings, other.ingest_settings);
477509
assert_eq!(self.search_settings, other.search_settings);
510+
assert_eq!(self.retention_policy_opt, other.retention_policy_opt);
478511
}
479512
}
480513

@@ -488,7 +521,8 @@ pub fn build_doc_mapper(
488521
default_search_fields: search_settings.default_search_fields.clone(),
489522
legacy_type_tag: None,
490523
};
491-
Ok(Arc::new(builder.try_build()?))
524+
let doc_mapper = builder.try_build()?;
525+
Ok(Arc::new(doc_mapper))
492526
}
493527

494528
/// Validates the objects that make up an index configuration. This is a "free" function as opposed
@@ -598,6 +632,7 @@ mod tests {
598632
..Default::default()
599633
}
600634
);
635+
assert_eq!(index_config.ingest_settings.min_shards.get(), 12);
601636
assert_eq!(
602637
index_config.search_settings,
603638
SearchSettings {
@@ -640,12 +675,13 @@ mod tests {
640675
assert_eq!(index_config.doc_mapping.field_mappings[0].name, "body");
641676
assert!(!index_config.doc_mapping.store_source);
642677
assert_eq!(index_config.indexing_settings, IndexingSettings::default());
643-
assert_eq!(
644-
index_config.search_settings,
645-
SearchSettings {
646-
default_search_fields: vec!["body".to_string()],
647-
}
648-
);
678+
assert_eq!(index_config.ingest_settings, IngestSettings::default());
679+
680+
let expected_search_settings = SearchSettings {
681+
default_search_fields: vec!["body".to_string()],
682+
};
683+
assert_eq!(index_config.search_settings, expected_search_settings);
684+
assert!(index_config.retention_policy_opt.is_none());
649685
}
650686
{
651687
let index_config_filepath = get_index_config_filepath("partial-hdfs-logs.yaml");
@@ -903,4 +939,21 @@ mod tests {
903939
schedule_test_helper_fn("monthly");
904940
schedule_test_helper_fn("* * * ? * ?");
905941
}
942+
943+
#[test]
944+
fn test_ingest_settings_serde() {
945+
let ingest_settings = IngestSettings {
946+
min_shards: NonZeroUsize::MIN,
947+
};
948+
let ingest_settings_yaml = serde_yaml::to_string(&ingest_settings).unwrap();
949+
let ingest_settings_roundtrip: IngestSettings =
950+
serde_yaml::from_str(&ingest_settings_yaml).unwrap();
951+
assert_eq!(ingest_settings, ingest_settings_roundtrip);
952+
953+
let ingest_settings_yaml = r#"
954+
min_shards: 0
955+
"#;
956+
let error = serde_yaml::from_str::<IngestSettings>(ingest_settings_yaml).unwrap_err();
957+
assert!(error.to_string().contains("expected a nonzero"));
958+
}
906959
}

quickwit/quickwit-config/src/index_config/serialize.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use quickwit_proto::types::{DocMappingUid, IndexId};
2121
use serde::{Deserialize, Serialize};
2222
use tracing::info;
2323

24-
use super::validate_index_config;
24+
use super::{IngestSettings, validate_index_config};
2525
use crate::{
2626
ConfigFormat, DocMapping, IndexConfig, IndexingSettings, RetentionPolicy, SearchSettings,
2727
validate_identifier,
@@ -185,6 +185,7 @@ impl IndexConfigForSerialization {
185185
index_uri,
186186
doc_mapping: self.doc_mapping,
187187
indexing_settings: self.indexing_settings,
188+
ingest_settings: self.ingest_settings,
188189
search_settings: self.search_settings,
189190
retention_policy_opt: self.retention_policy_opt,
190191
};
@@ -231,6 +232,8 @@ pub struct IndexConfigV0_8 {
231232
#[serde(default)]
232233
pub indexing_settings: IndexingSettings,
233234
#[serde(default)]
235+
pub ingest_settings: IngestSettings,
236+
#[serde(default)]
234237
pub search_settings: SearchSettings,
235238
#[serde(rename = "retention")]
236239
#[serde(default)]
@@ -244,6 +247,7 @@ impl From<IndexConfig> for IndexConfigV0_8 {
244247
index_uri: Some(index_config.index_uri),
245248
doc_mapping: index_config.doc_mapping,
246249
indexing_settings: index_config.indexing_settings,
250+
ingest_settings: index_config.ingest_settings,
247251
search_settings: index_config.search_settings,
248252
retention_policy_opt: index_config.retention_policy_opt,
249253
}

quickwit/quickwit-config/src/index_template/mod.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use quickwit_proto::types::{DocMappingUid, IndexId};
2020
use serde::{Deserialize, Serialize};
2121
pub use serialize::{IndexTemplateV0_8, VersionedIndexTemplate};
2222

23-
use crate::index_config::validate_index_config;
23+
use crate::index_config::{IngestSettings, validate_index_config};
2424
use crate::{
2525
DocMapping, IndexConfig, IndexingSettings, RetentionPolicy, SearchSettings,
2626
validate_identifier, validate_index_id_pattern,
@@ -45,6 +45,8 @@ pub struct IndexTemplate {
4545
#[serde(default)]
4646
pub indexing_settings: IndexingSettings,
4747
#[serde(default)]
48+
pub ingest_settings: IngestSettings,
49+
#[serde(default)]
4850
pub search_settings: SearchSettings,
4951
#[serde(rename = "retention")]
5052
#[serde(default)]
@@ -72,6 +74,7 @@ impl IndexTemplate {
7274
index_uri,
7375
doc_mapping,
7476
indexing_settings: self.indexing_settings.clone(),
77+
ingest_settings: self.ingest_settings.clone(),
7578
search_settings: self.search_settings.clone(),
7679
retention_policy_opt: self.retention_policy_opt.clone(),
7780
};
@@ -128,6 +131,7 @@ impl IndexTemplate {
128131
description: Some("Test description.".to_string()),
129132
doc_mapping,
130133
indexing_settings: IndexingSettings::default(),
134+
ingest_settings: IngestSettings::default(),
131135
search_settings: SearchSettings::default(),
132136
retention_policy_opt: None,
133137
}
@@ -168,6 +172,7 @@ impl crate::TestableForRegression for IndexTemplate {
168172
description: Some("Test description.".to_string()),
169173
doc_mapping,
170174
indexing_settings: IndexingSettings::default(),
175+
ingest_settings: IngestSettings::default(),
171176
search_settings: SearchSettings::default(),
172177
retention_policy_opt: Some(RetentionPolicy {
173178
retention_period: "42 days".to_string(),

quickwit/quickwit-config/src/index_template/serialize.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use quickwit_common::uri::Uri;
1616
use serde::{Deserialize, Serialize};
1717

1818
use super::{IndexIdPattern, IndexTemplate, IndexTemplateId};
19+
use crate::index_config::IngestSettings;
1920
use crate::{DocMapping, IndexingSettings, RetentionPolicy, SearchSettings};
2021

2122
#[derive(Clone, Debug, Serialize, Deserialize, utoipa::ToSchema)]
@@ -49,6 +50,8 @@ pub struct IndexTemplateV0_8 {
4950
#[serde(default)]
5051
pub indexing_settings: IndexingSettings,
5152
#[serde(default)]
53+
pub ingest_settings: IngestSettings,
54+
#[serde(default)]
5255
pub search_settings: SearchSettings,
5356
#[serde(default)]
5457
pub retention: Option<RetentionPolicy>,
@@ -78,6 +81,7 @@ impl From<IndexTemplateV0_8> for IndexTemplate {
7881
description: index_template_v0_8.description,
7982
doc_mapping: index_template_v0_8.doc_mapping,
8083
indexing_settings: index_template_v0_8.indexing_settings,
84+
ingest_settings: index_template_v0_8.ingest_settings,
8185
search_settings: index_template_v0_8.search_settings,
8286
retention_policy_opt: index_template_v0_8.retention,
8387
}
@@ -94,6 +98,7 @@ impl From<IndexTemplate> for IndexTemplateV0_8 {
9498
description: index_template.description,
9599
doc_mapping: index_template.doc_mapping,
96100
indexing_settings: index_template.indexing_settings,
101+
ingest_settings: index_template.ingest_settings,
97102
search_settings: index_template.search_settings,
98103
retention: index_template.retention_policy_opt,
99104
}

0 commit comments

Comments
 (0)