Skip to content

Commit c0e3bfa

Browse files
Improve error reporting (#7)
* Improve error reporting * Help clippy
1 parent 8703cb1 commit c0e3bfa

File tree

3 files changed

+43
-25
lines changed

3 files changed

+43
-25
lines changed

src/indexer.rs

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
#![allow(unused, dead_code)]
22
use crate::{
3-
openai::embeddings_for,
3+
openai::{embeddings_for, EmbeddingError},
44
server::Operation,
55
vecmath::{self, Embedding},
66
vectors::{LoadedVec, VectorStore},
@@ -93,9 +93,9 @@ pub async fn operations_to_point_operations(
9393
vector_store: &VectorStore,
9494
structs: Vec<Result<Operation, std::io::Error>>,
9595
key: &str,
96-
) -> Vec<PointOperation> {
96+
) -> Result<Vec<PointOperation>, IndexError> {
9797
// Should not unwrap here -
98-
let ops: Vec<Operation> = structs.into_iter().map(|ro| ro.unwrap()).collect();
98+
let ops: Vec<Operation> = structs.into_iter().collect::<Result<Vec<_>, _>>()?;
9999
let tuples: Vec<(Op, String, String)> = ops
100100
.iter()
101101
.flat_map(|o| match o {
@@ -112,12 +112,10 @@ pub async fn operations_to_point_operations(
112112
let vecs: Vec<Embedding> = if strings.is_empty() {
113113
Vec::new()
114114
} else {
115-
embeddings_for(key, &strings).await.unwrap()
115+
embeddings_for(key, &strings).await?
116116
};
117-
let domain = vector_store.get_domain(domain).unwrap();
118-
let loaded_vecs = vector_store
119-
.add_and_load_vecs(&domain, vecs.iter())
120-
.unwrap();
117+
let domain = vector_store.get_domain(domain)?;
118+
let loaded_vecs = vector_store.add_and_load_vecs(&domain, vecs.iter())?;
121119
let mut new_ops: Vec<PointOperation> = zip(tuples, loaded_vecs)
122120
.map(|((op, _, id), vec)| match op {
123121
Op::Insert => PointOperation::Insert {
@@ -136,7 +134,7 @@ pub async fn operations_to_point_operations(
136134
})
137135
.collect();
138136
new_ops.append(&mut delete_ops);
139-
new_ops
137+
Ok(new_ops)
140138
}
141139

142140
pub struct IndexIdentifier {
@@ -145,9 +143,14 @@ pub struct IndexIdentifier {
145143
pub domain: String,
146144
}
147145

148-
#[derive(Debug)]
149-
enum IndexError {
146+
#[derive(Debug, Error)]
147+
pub enum IndexError {
148+
#[error("Indexing failed")]
150149
Failed,
150+
#[error("Indexing failed with io error: {0:?}")]
151+
IoError(#[from] std::io::Error),
152+
#[error("Embedding error: {0:?}")]
153+
EmbeddingError(#[from] EmbeddingError),
151154
}
152155

153156
/*

src/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
216216
for structs in opstream {
217217
let structs: Vec<_> = structs.collect();
218218
let new_ops =
219-
operations_to_point_operations(&domain.clone(), &store, structs, &key).await;
219+
operations_to_point_operations(&domain.clone(), &store, structs, &key).await?;
220220
hnsw = start_indexing_from_operations(hnsw, new_ops).unwrap();
221221
}
222222
let index_id = create_index_name(&domain, &commit);

src/server.rs

Lines changed: 28 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ use crate::indexer::deserialize_index;
3939
use crate::indexer::operations_to_point_operations;
4040
use crate::indexer::search;
4141
use crate::indexer::serialize_index;
42+
use crate::indexer::IndexError;
4243
use crate::indexer::Point;
4344
use crate::indexer::PointOperation;
4445
use crate::indexer::SearchError;
@@ -244,7 +245,7 @@ fn uri_to_spec(uri: &Uri) -> Result<ResourceSpec, SpecParseError> {
244245
#[derive(Clone, Debug)]
245246
pub enum TaskStatus {
246247
Pending(f32),
247-
Error,
248+
Error(String),
248249
Completed,
249250
}
250251

@@ -422,6 +423,7 @@ impl Service {
422423
api_key: String,
423424
) -> Result<(), StartIndexError> {
424425
let content_endpoint = self.content_endpoint.clone();
426+
let internal_task_id = task_id.clone();
425427
if let Some(content_endpoint) = content_endpoint {
426428
tokio::spawn(async move {
427429
let index_id = create_index_name(&domain, &commit);
@@ -436,13 +438,24 @@ impl Service {
436438
.await
437439
.unwrap()
438440
.chunks(100);
439-
let (id, hnsw) = self
441+
match self
440442
.process_operation_chunks(
441443
opstream, domain, commit, previous, &index_id, &task_id, &api_key,
442444
)
443-
.await;
444-
self.set_index(id, hnsw.into()).await;
445-
self.clear_pending(&index_id).await;
445+
.await
446+
{
447+
Ok((id, hnsw)) => {
448+
self.set_index(id, hnsw.into()).await;
449+
self.clear_pending(&index_id).await;
450+
}
451+
Err(err) => {
452+
self.set_task_status(
453+
internal_task_id,
454+
TaskStatus::Error(err.to_string()),
455+
)
456+
.await;
457+
}
458+
}
446459
}
447460
self.set_task_status(task_id, TaskStatus::Completed).await;
448461
});
@@ -471,6 +484,7 @@ impl Service {
471484
Ok(())
472485
}
473486

487+
#[allow(clippy::too_many_arguments)]
474488
async fn process_operation_chunks(
475489
self: &Arc<Self>,
476490
mut opstream: futures::stream::Chunks<
@@ -482,7 +496,7 @@ impl Service {
482496
index_id: &str,
483497
task_id: &str,
484498
api_key: &str,
485-
) -> (String, HnswIndex) {
499+
) -> Result<(String, HnswIndex), IndexError> {
486500
let id = create_index_name(&domain, &commit);
487501
let mut hnsw = self
488502
.load_hnsw_for_indexing(IndexIdentifier {
@@ -500,14 +514,14 @@ impl Service {
500514
structs,
501515
api_key,
502516
)
503-
.await;
504-
hnsw = start_indexing_from_operations(hnsw, new_ops).unwrap();
517+
.await?;
518+
hnsw = start_indexing_from_operations(hnsw, new_ops)?;
505519
}
506520
self.set_task_status(task_id.to_string(), TaskStatus::Pending(0.8))
507521
.await;
508522
let path = self.path.clone();
509-
serialize_index(path, index_id, hnsw.clone()).unwrap();
510-
(id, hnsw)
523+
serialize_index(path, index_id, hnsw.clone())?;
524+
Ok((id, hnsw))
511525
}
512526

513527
async fn get_start_index(
@@ -526,7 +540,7 @@ impl Service {
526540

527541
async fn get(self: Arc<Self>, req: Request<Body>) -> Result<Response<Body>, Infallible> {
528542
let uri = req.uri();
529-
match dbg!(uri_to_spec(uri)) {
543+
match uri_to_spec(uri) {
530544
Ok(ResourceSpec::StartIndex {
531545
domain,
532546
commit,
@@ -557,8 +571,9 @@ impl Service {
557571
TaskStatus::Pending(f) => {
558572
Ok(Response::builder().body(format!("{}", f).into()).unwrap())
559573
}
560-
TaskStatus::Error => Ok(Response::builder()
561-
.body(format!("{:?}", state).into())
574+
TaskStatus::Error(msg) => Ok(Response::builder()
575+
.status(StatusCode::INTERNAL_SERVER_ERROR)
576+
.body(format!("{:?}", msg).into())
562577
.unwrap()),
563578
TaskStatus::Completed => {
564579
Ok(Response::builder().body(format!("{}", 1.0).into()).unwrap())

0 commit comments

Comments
 (0)