Skip to content

Commit 060f5a4

Browse files
rrooijMatthijs van Otterdijk
andauthored
Add logging and fix events (#11)
* log every request * slightly more robust error handling in start_indexing * return a 404 for unknown task status * Fix formatting --------- Co-authored-by: Matthijs van Otterdijk <matthijs@terminusdb.com>
1 parent ade4338 commit 060f5a4

File tree

3 files changed

+132
-16
lines changed

3 files changed

+132
-16
lines changed

Cargo.lock

Lines changed: 82 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ packed_simd = {version = "0.3.8", optional=true}
3131
aligned_box = "0.2"
3232
tiktoken-rs = "0.4"
3333
itertools = "0.10"
34+
chrono = "0.4.26"
3435

3536
[features]
3637
simd = ["packed_simd"]

src/server.rs

Lines changed: 49 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -395,6 +395,12 @@ impl Service {
395395
}
396396

397397
async fn serve(self: Arc<Self>, req: Request<Body>) -> Result<Response<Body>, Infallible> {
398+
eprintln!(
399+
"{:?}: {:?} {:?}",
400+
chrono::offset::Local::now(),
401+
req.method(),
402+
req.uri()
403+
);
398404
match *req.method() {
399405
Method::POST => self.post(req).await,
400406
Method::GET => self.get(req).await,
@@ -414,6 +420,32 @@ impl Service {
414420
}
415421
}
416422

423+
async fn start_indexing_inner(
424+
self: Arc<Self>,
425+
domain: String,
426+
commit: String,
427+
previous: Option<String>,
428+
task_id: &str,
429+
api_key: String,
430+
index_id: &str,
431+
content_endpoint: String,
432+
) -> Result<(String, HnswIndex), IndexError> {
433+
let internal_task_id = task_id.clone();
434+
let opstream = get_operations_from_content_endpoint(
435+
content_endpoint.to_string(),
436+
self.user_forward_header.clone(),
437+
domain.clone(),
438+
commit.clone(),
439+
previous.clone(),
440+
)
441+
.await?
442+
.chunks(100);
443+
self.process_operation_chunks(
444+
opstream, domain, commit, previous, index_id, task_id, &api_key,
445+
)
446+
.await
447+
}
448+
417449
fn start_indexing(
418450
self: Arc<Self>,
419451
domain: String,
@@ -428,36 +460,39 @@ impl Service {
428460
tokio::spawn(async move {
429461
let index_id = create_index_name(&domain, &commit);
430462
if self.test_and_set_pending(index_id.clone()).await {
431-
let opstream = get_operations_from_content_endpoint(
432-
content_endpoint.to_string(),
433-
self.user_forward_header.clone(),
434-
domain.clone(),
435-
commit.clone(),
436-
previous.clone(),
437-
)
438-
.await
439-
.unwrap()
440-
.chunks(100);
441463
match self
442-
.process_operation_chunks(
443-
opstream, domain, commit, previous, &index_id, &task_id, &api_key,
464+
.clone()
465+
.start_indexing_inner(
466+
domain,
467+
commit,
468+
previous,
469+
&task_id,
470+
api_key,
471+
&index_id,
472+
content_endpoint,
444473
)
445474
.await
446475
{
447476
Ok((id, hnsw)) => {
448477
self.set_index(id, hnsw.into()).await;
478+
self.set_task_status(task_id, TaskStatus::Completed).await;
449479
self.clear_pending(&index_id).await;
450480
}
451481
Err(err) => {
482+
eprintln!(
483+
"{:?}: error while indexing: {:?}",
484+
chrono::offset::Local::now(),
485+
err
486+
);
452487
self.set_task_status(
453488
internal_task_id,
454489
TaskStatus::Error(err.to_string()),
455490
)
456491
.await;
492+
self.clear_pending(&index_id).await;
457493
}
458494
}
459495
}
460-
self.set_task_status(task_id, TaskStatus::Completed).await;
461496
});
462497
Ok(())
463498
} else {
@@ -577,7 +612,7 @@ impl Service {
577612
}
578613
}
579614
} else {
580-
Ok(Response::builder().body(format!("{}", 1.0).into()).unwrap())
615+
Ok(Response::builder().status(404).body(Body::empty()).unwrap())
581616
}
582617
}
583618
Ok(ResourceSpec::DuplicateCandidates {

0 commit comments

Comments
 (0)