Skip to content

Commit b3c9181

Browse files
cutecutecatusamoi
andauthored
fix: optiming threads deadlock (#453)
* fix: optiming threads deadlock Signed-off-by: cutecutecat <junyuchen@tensorchord.ai> * fix by comments Signed-off-by: cutecutecat <junyuchen@tensorchord.ai> * Update crates/rayon/src/lib.rs test clippy * Update crates/rayon/src/lib.rs test clippy * fix monitor status Signed-off-by: cutecutecat <junyuchen@tensorchord.ai> --------- Signed-off-by: cutecutecat <junyuchen@tensorchord.ai> Co-authored-by: usamoi <usamoi@outlook.com>
1 parent 97ce096 commit b3c9181

File tree

9 files changed

+99
-17
lines changed

9 files changed

+99
-17
lines changed

.typos.toml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,13 @@
11
[default.extend-words]
22
ND = "ND"
3+
DUR = "DUR"
4+
ANS = "ANS"
5+
6+
[default]
7+
extend-ignore-re = [
8+
# Latex formula
9+
"\\$.+?\\$",
10+
]
311

412
[files]
513
extend-exclude = ["vendor/pg_config/*.txt", "vendor/pgrx_binding/*.rs"]

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ pgvecto.rs is a Postgres extension that provides vector similarity search functi
2626
| | 🔪 Matryoshka embeddings | Subvector indexing, like vector[0:256], for enhanced Matryoshka embeddings. |
2727
| | ⬆️ Extended Vector Length | Vector lengths up to 65535 supported, ideal for the latest cutting-edge models. |
2828
| **System Performance** | 🚀 Production Ready | Battle-tested database ecosystem integrated with PostgreSQL. |
29-
| | ⚙️ High Availability | Logical replication support to ensure high availbility. |
29+
| | ⚙️ High Availability | Logical replication support to ensure high availability. |
3030
| | 💡 Resource Efficient | Efficient attribute storage leveraging PostgreSQL. |
3131
| **Security & Permissions** | 🔒 Permission Control | Easy access control like read-only roles, powered by PostgreSQL. |
3232

crates/index/src/optimizing/indexing.rs

Lines changed: 40 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ pub use base::index::*;
77
use base::operator::Borrowed;
88
pub use base::search::*;
99
pub use base::vector::*;
10-
use crossbeam::channel::RecvError;
10+
use crossbeam::channel::TryRecvError;
1111
use crossbeam::channel::{bounded, Receiver, RecvTimeoutError, Sender};
1212
use std::cmp::Reverse;
1313
use std::convert::Infallible;
@@ -99,26 +99,32 @@ impl<O: Op> OptimizerIndexing<O> {
9999
}),
100100
)
101101
}
102-
fn main(self, shutdown: Receiver<Infallible>) {
102+
fn main(self, shutdown_rx: Receiver<Infallible>) {
103103
let index = self.index;
104104
loop {
105105
let view = index.view();
106106
let threads = view.flexible.optimizing_threads;
107+
let (finish_tx, finish_rx) = bounded::<Infallible>(1);
107108
rayon::ThreadPoolBuilder::new()
108109
.num_threads(threads as usize)
109110
.build_scoped(|pool| {
110111
std::thread::scope(|scope| {
111-
scope.spawn(|| match shutdown.recv() {
112-
Ok(never) => match never {},
113-
Err(RecvError) => {
114-
pool.stop();
112+
let handler = scope.spawn(|| {
113+
let status = monitor(&finish_rx, &shutdown_rx);
114+
match status {
115+
MonitorStatus::Finished => (),
116+
MonitorStatus::Shutdown => pool.stop(),
115117
}
116118
});
117-
let _ = pool.install(|| optimizing_indexing(index.clone()));
119+
pool.install(|| {
120+
let _finish_tx = finish_tx;
121+
let _ = optimizing_indexing(index.clone());
122+
});
123+
let _ = handler.join();
118124
})
119125
})
120126
.unwrap();
121-
match shutdown.recv_timeout(std::time::Duration::from_secs(60)) {
127+
match shutdown_rx.recv_timeout(std::time::Duration::from_secs(60)) {
122128
Ok(never) => match never {},
123129
Err(RecvTimeoutError::Disconnected) => return,
124130
Err(RecvTimeoutError::Timeout) => (),
@@ -127,6 +133,32 @@ impl<O: Op> OptimizerIndexing<O> {
127133
}
128134
}
129135

136+
pub enum MonitorStatus {
137+
Finished,
138+
Shutdown,
139+
}
140+
141+
/// Monitor the internal finish and the external shutdown of `optimizing_indexing`
142+
fn monitor(finish_rx: &Receiver<Infallible>, shutdown_rx: &Receiver<Infallible>) -> MonitorStatus {
143+
let timeout = std::time::Duration::from_secs(1);
144+
loop {
145+
match finish_rx.try_recv() {
146+
Ok(never) => match never {},
147+
Err(TryRecvError::Disconnected) => {
148+
return MonitorStatus::Finished;
149+
}
150+
Err(TryRecvError::Empty) => (),
151+
}
152+
match shutdown_rx.recv_timeout(timeout) {
153+
Ok(never) => match never {},
154+
Err(RecvTimeoutError::Disconnected) => {
155+
return MonitorStatus::Shutdown;
156+
}
157+
Err(RecvTimeoutError::Timeout) => (),
158+
}
159+
}
160+
}
161+
130162
enum Seg<O: Op> {
131163
Sealed(Arc<SealedSegment<O>>),
132164
Growing(Arc<GrowingSegment<O>>),

crates/rayon/src/lib.rs

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
#![feature(thread_local)]
22

33
use rayoff as rayon;
4-
use std::cell::OnceCell;
4+
use std::cell::RefCell;
55
use std::panic::AssertUnwindSafe;
66
use std::sync::atomic::AtomicBool;
77
use std::sync::atomic::Ordering;
@@ -51,6 +51,15 @@ impl ThreadPoolBuilder {
5151
let stop = Arc::new(AtomicBool::new(false));
5252
match std::panic::catch_unwind(AssertUnwindSafe(|| {
5353
self.builder
54+
.start_handler({
55+
let stop = stop.clone();
56+
move |_| {
57+
STOP.replace(Some(stop.clone()));
58+
}
59+
})
60+
.exit_handler(|_| {
61+
STOP.take();
62+
})
5463
.panic_handler(|e| {
5564
if e.downcast_ref::<CheckPanic>().is_some() {
5665
return;
@@ -60,9 +69,6 @@ impl ThreadPoolBuilder {
6069
.build_scoped(
6170
|thread| thread.run(),
6271
|pool| {
63-
pool.broadcast(|_| {
64-
STOP.set(stop.clone()).unwrap();
65-
});
6672
let pool = ThreadPool::new(stop.clone(), pool);
6773
f(&pool)
6874
},
@@ -105,12 +111,12 @@ impl<'a> ThreadPool<'a> {
105111
}
106112

107113
#[thread_local]
108-
static STOP: OnceCell<Arc<AtomicBool>> = OnceCell::new();
114+
static STOP: RefCell<Option<Arc<AtomicBool>>> = RefCell::new(None);
109115

110116
struct CheckPanic;
111117

112118
pub fn check() {
113-
if let Some(stop) = STOP.get() {
119+
if let Some(stop) = STOP.borrow().as_ref() {
114120
if stop.load(Ordering::Relaxed) {
115121
std::panic::panic_any(CheckPanic);
116122
}

tests/crash/kill.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,4 +53,4 @@ def process_filter(p: psutil.Process) -> bool:
5353
if len(procs) == 1:
5454
logging.info(f"Background worker recreated {pids}")
5555
break
56-
time.sleep(1)
56+
time.sleep(1)

tests/crash/restore.slt

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,7 @@ t
1111
query I
1212
SELECT COUNT(1) FROM (SELECT 1 FROM t ORDER BY val <-> '[0.5,0.5,0.5]' limit 10) t2;
1313
----
14-
10
14+
10
15+
16+
statement ok
17+
DROP TABLE t;

tests/sealing/check.slt

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
query I
2+
SELECT idx_indexing FROM pg_vector_index_stat WHERE indexname = 'i';
3+
----
4+
f
5+
6+
query I
7+
SELECT idx_growing FROM pg_vector_index_stat WHERE indexname = 'i';
8+
----
9+
{}
10+
11+
query I
12+
SELECT idx_sealed FROM pg_vector_index_stat WHERE indexname = 'i';
13+
----
14+
{1000}
15+
16+
statement ok
17+
DROP TABLE t;

tests/sealing/create.slt

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
statement ok
2+
CREATE TABLE t (val vector(3));
3+
4+
statement ok
5+
INSERT INTO t (val) SELECT ARRAY[random(), random(), random()]::real[] FROM generate_series(1, 1000);
6+
7+
statement ok
8+
CREATE INDEX i ON t USING vectors (val vector_l2_ops)
9+
WITH (options = "[indexing.hnsw]");

tests/sealing/test.sh

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
#!/usr/bin/env bash
2+
set -e
3+
4+
# Test the background threads `optimizing.indexing` and `optimizing.sealing` working properly
5+
sqllogictest -u runner -d runner $(dirname $0)/create.slt
6+
sleep 240
7+
sqllogictest -u runner -d runner $(dirname $0)/check.slt

0 commit comments

Comments
 (0)