Skip to content

Commit f429fbc

Browse files
Adding pending monitor
1 parent e7c2a38 commit f429fbc

File tree

1 file changed

+25
-12
lines changed

1 file changed

+25
-12
lines changed

src/server.rs

Lines changed: 25 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,7 @@ fn uri_to_spec(uri: &Uri) -> Result<ResourceSpec, SpecParseError> {
215215

216216
#[derive(Clone, Debug)]
217217
pub enum TaskStatus {
218-
Pending,
218+
Pending(f32),
219219
Error,
220220
Completed,
221221
}
@@ -348,9 +348,9 @@ impl Service {
348348
}
349349

350350
async fn serve(self: Arc<Self>, req: Request<Body>) -> Result<Response<Body>, Infallible> {
351-
match req.method() {
352-
&Method::POST => self.post(req).await,
353-
&Method::GET => self.get(req).await,
351+
match *req.method() {
352+
Method::POST => self.post(req).await,
353+
Method::GET => self.get(req).await,
354354
_ => todo!(),
355355
}
356356
}
@@ -389,7 +389,9 @@ impl Service {
389389
.unwrap()
390390
.chunks(100);
391391
let (id, hnsw) = self
392-
.process_operation_chunks(opstream, domain, commit, previous, &index_id)
392+
.process_operation_chunks(
393+
opstream, domain, commit, previous, &index_id, &task_id,
394+
)
393395
.await;
394396
self.set_index(id, hnsw.into()).await;
395397
self.clear_pending(&index_id).await;
@@ -439,6 +441,7 @@ impl Service {
439441
commit: String,
440442
previous: Option<String>,
441443
index_id: &str,
444+
task_id: &str,
442445
) -> (String, HnswIndex) {
443446
let id = create_index_name(&domain, &commit);
444447
let mut hnsw = self
@@ -448,6 +451,8 @@ impl Service {
448451
previous,
449452
})
450453
.await;
454+
self.set_task_status(task_id.to_string(), TaskStatus::Pending(0.3))
455+
.await;
451456
while let Some(structs) = opstream.next().await {
452457
let new_ops = operations_to_point_operations(
453458
&domain.clone(),
@@ -458,6 +463,8 @@ impl Service {
458463
.await;
459464
hnsw = start_indexing_from_operations(hnsw, new_ops).unwrap();
460465
}
466+
self.set_task_status(task_id.to_string(), TaskStatus::Pending(0.8))
467+
.await;
461468
let path = self.path.clone();
462469
serialize_index(path, index_id, hnsw.clone()).unwrap();
463470
(id, hnsw)
@@ -472,7 +479,7 @@ impl Service {
472479
previous,
473480
}) => {
474481
let task_id = Service::generate_task();
475-
self.set_task_status(task_id.clone(), TaskStatus::Pending)
482+
self.set_task_status(task_id.clone(), TaskStatus::Pending(0.0))
476483
.await;
477484
match self.start_indexing(domain, commit, previous, task_id.clone()) {
478485
Ok(()) => Ok(Response::builder().body(task_id.into()).unwrap()),
@@ -500,13 +507,19 @@ impl Service {
500507
}
501508
Ok(ResourceSpec::CheckTask { task_id }) => {
502509
if let Some(state) = self.get_task_status(&task_id).await {
503-
Ok(Response::builder()
504-
.body(format!("{:?}", state).into())
505-
.unwrap())
510+
match state {
511+
TaskStatus::Pending(f) => {
512+
Ok(Response::builder().body(format!("{}", f).into()).unwrap())
513+
}
514+
TaskStatus::Error => Ok(Response::builder()
515+
.body(format!("{:?}", state).into())
516+
.unwrap()),
517+
TaskStatus::Completed => {
518+
Ok(Response::builder().body(format!("{}", 1.0).into()).unwrap())
519+
}
520+
}
506521
} else {
507-
Ok(Response::builder()
508-
.body(format!("{:?}", TaskStatus::Completed).into())
509-
.unwrap())
522+
Ok(Response::builder().body(format!("{}", 1.0).into()).unwrap())
510523
}
511524
}
512525
Ok(ResourceSpec::DuplicateCandidates {

0 commit comments

Comments
 (0)