Skip to content

Commit 622adc1

Browse files
Tanuj Nayak
authored and committed
more testing pending
1 parent a2e4c71 commit 622adc1

File tree

6 files changed

+199
-33
lines changed

6 files changed

+199
-33
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rust/worker/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -31,6 +31,7 @@ async-trait = { workspace = true }
3131
roaring = { workspace = true }
3232
figment = { workspace = true }
3333
futures = { workspace = true }
34+
opentelemetry = { workspace = true }
3435
parking_lot = { workspace = true }
3536
tracing = { workspace = true }
3637
tokio = { workspace = true }

rust/worker/src/compactor/compaction_manager.rs

Lines changed: 38 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -49,14 +49,19 @@ use tracing::Instrument;
4949
use tracing::Span;
5050
use uuid::Uuid;
5151

52-
type BoxedFuture =
53-
Pin<Box<dyn Future<Output = Result<CompactionResponse, Box<dyn ChromaError>>> + Send>>;
52+
type CompactionOutput = Result<CompactionResponse, Box<dyn ChromaError>>;
53+
type BoxedFuture = Pin<Box<dyn Future<Output = CompactionOutput> + Send>>;
5454

5555
struct CompactionTask {
5656
collection_id: CollectionUuid,
5757
future: BoxedFuture,
5858
}
5959

60+
struct CompactionTaskCompletion {
61+
collection_id: CollectionUuid,
62+
result: CompactionOutput,
63+
}
64+
6065
#[derive(Clone)]
6166
pub(crate) struct CompactionManagerContext {
6267
system: System,
@@ -85,7 +90,7 @@ pub(crate) struct CompactionManager {
8590
scheduler: Scheduler,
8691
context: CompactionManagerContext,
8792
compact_awaiter_channel: mpsc::Sender<CompactionTask>,
88-
compact_awaiter_completion_channel: mpsc::UnboundedReceiver<CompactionResponse>,
93+
compact_awaiter_completion_channel: mpsc::UnboundedReceiver<CompactionTaskCompletion>,
8994
compact_awaiter: tokio::task::JoinHandle<()>,
9095
on_next_memberlist_signal: Option<oneshot::Sender<()>>,
9196
}
@@ -129,7 +134,7 @@ impl CompactionManager {
129134
// Using unbounded channel for the completion channel as its size
130135
// is bounded by max_concurrent_jobs. It's far more important for the
131136
// completion channel to not block or drop messages.
132-
let (completion_tx, completion_rx) = mpsc::unbounded_channel::<CompactionResponse>();
137+
let (completion_tx, completion_rx) = mpsc::unbounded_channel::<CompactionTaskCompletion>();
133138
let compact_awaiter = tokio::spawn(async {
134139
compact_awaiter_loop(compact_awaiter_rx, completion_tx).await;
135140
});
@@ -237,11 +242,18 @@ impl CompactionManager {
237242
self.context.dispatcher = Some(dispatcher);
238243
}
239244

240-
fn process_completions(&mut self) -> Vec<CompactionResponse> {
245+
fn process_completions(&mut self) -> Vec<CompactionTaskCompletion> {
241246
let compact_awaiter_completion_channel = &mut self.compact_awaiter_completion_channel;
242247
let mut completed_collections = Vec::new();
243248
while let Ok(resp) = compact_awaiter_completion_channel.try_recv() {
244-
self.scheduler.complete_collection(resp.collection_id);
249+
match resp.result {
250+
Ok(_) => {
251+
self.scheduler.succeed_collection(resp.collection_id);
252+
}
253+
Err(_) => {
254+
self.scheduler.fail_collection(resp.collection_id);
255+
}
256+
}
245257
completed_collections.push(resp);
246258
}
247259
completed_collections
@@ -349,6 +361,7 @@ impl Configurable<(CompactionServiceConfig, System)> for CompactionManager {
349361
Box::<dyn AssignmentPolicy>::try_from_config(assignment_policy_config, registry)
350362
.await?;
351363
let job_expiry_seconds = config.compactor.job_expiry_seconds;
364+
let max_failure_count = config.compactor.max_failure_count;
352365
let scheduler = Scheduler::new(
353366
my_ip,
354367
log.clone(),
@@ -359,6 +372,7 @@ impl Configurable<(CompactionServiceConfig, System)> for CompactionManager {
359372
assignment_policy,
360373
disabled_collections,
361374
job_expiry_seconds,
375+
max_failure_count,
362376
);
363377

364378
let blockfile_provider = BlockfileProvider::try_from_config(
@@ -405,25 +419,31 @@ impl Configurable<(CompactionServiceConfig, System)> for CompactionManager {
405419

406420
async fn compact_awaiter_loop(
407421
mut job_rx: mpsc::Receiver<CompactionTask>,
408-
completion_tx: mpsc::UnboundedSender<CompactionResponse>,
422+
completion_tx: mpsc::UnboundedSender<CompactionTaskCompletion>,
409423
) {
410424
let mut futures = FuturesUnordered::new();
411425
loop {
412426
select! {
413427
Some(job) = job_rx.recv() => {
414428
futures.push(async move {
415-
let _ = AssertUnwindSafe(job.future).catch_unwind().await;
416-
CompactionResponse {
417-
collection_id: job.collection_id,
429+
let result = AssertUnwindSafe(job.future).catch_unwind().await;
430+
match result {
431+
Ok(response) => CompactionTaskCompletion {
432+
collection_id: job.collection_id,
433+
result: response,
434+
},
435+
Err(_) => CompactionTaskCompletion {
436+
collection_id: job.collection_id,
437+
result: Err(Box::new(CompactionError::FailedToCompact)),
438+
},
418439
}
419440
});
420441
}
421-
Some(compaction_response) = futures.next() => {
422-
match completion_tx.send(compaction_response) {
442+
Some(completed_job) = futures.next() => {
443+
let collection_id = completed_job.collection_id;
444+
match completion_tx.send(completed_job) {
423445
Ok(_) => {},
424-
Err(_) => {
425-
tracing::error!("Failed to send compaction response");
426-
}
446+
Err(_) => tracing::error!("Failed to record compaction result for collection {}", collection_id),
427447
}
428448
}
429449
else => {
@@ -771,6 +791,7 @@ mod tests {
771791
let fetch_log_batch_size = 100;
772792
let purge_dirty_log_timeout_seconds = 60;
773793
let job_expiry_seconds = 3600;
794+
let max_failure_count = 3;
774795

775796
// Set assignment policy
776797
let mut assignment_policy = Box::new(RendezvousHashingAssignmentPolicy::default());
@@ -786,6 +807,7 @@ mod tests {
786807
assignment_policy,
787808
HashSet::new(),
788809
job_expiry_seconds,
810+
max_failure_count,
789811
);
790812
// Set memberlist
791813
scheduler.set_memberlist(vec![my_member.clone()]);
@@ -863,6 +885,7 @@ mod tests {
863885
completed_compactions.extend(
864886
completed
865887
.iter()
888+
.filter(|c| c.result.is_ok())
866889
.map(|c| c.collection_id)
867890
.collect::<Vec<CollectionUuid>>(),
868891
);

rust/worker/src/compactor/config.rs

Lines changed: 7 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -22,6 +22,8 @@ pub struct CompactorConfig {
2222
pub fetch_log_batch_size: u32,
2323
#[serde(default = "CompactorConfig::default_purge_dirty_log_timeout_seconds")]
2424
pub purge_dirty_log_timeout_seconds: u64,
25+
#[serde(default = "CompactorConfig::default_max_failure_count")]
26+
pub max_failure_count: u8,
2527
}
2628

2729
impl CompactorConfig {
@@ -64,6 +66,10 @@ impl CompactorConfig {
6466
fn default_purge_dirty_log_timeout_seconds() -> u64 {
6567
60
6668
}
69+
70+
fn default_max_failure_count() -> u8 {
71+
5
72+
}
6773
}
6874

6975
impl Default for CompactorConfig {
@@ -80,6 +86,7 @@ impl Default for CompactorConfig {
8086
fetch_log_batch_size: CompactorConfig::default_fetch_log_batch_size(),
8187
purge_dirty_log_timeout_seconds:
8288
CompactorConfig::default_purge_dirty_log_timeout_seconds(),
89+
max_failure_count: CompactorConfig::default_max_failure_count(),
8390
}
8491
}
8592
}

0 commit comments

Comments
 (0)