Skip to content

Commit 0904852

Browse files
committed
feat: add wasm-bindgen-futures runtime
Add the `wasm-bindgen-futures` webtime, with support for `wasm32-unknown-known`. Tests can be run with `wasm-pack test --node -- --no-default-features --features asyncdb-wasm-bindgen-futures`
1 parent 7fb3e23 commit 0904852

File tree

5 files changed

+251
-5
lines changed

5 files changed

+251
-5
lines changed

Cargo.toml

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,17 +26,25 @@ fs2 = { optional = true, version = "0.4.3" }
2626

2727
tokio = { optional = true, features = ["rt", "sync"], version = "1.39.3" }
2828
async-std = { optional = true, version = "1.12.0" }
29+
wasm-bindgen-futures = { optional = true, version = "0.4.24" }
30+
getrandom = { optional = true, version = "0.2.15", features = ["js"] }
2931

3032
[features]
31-
default = ["fs"]
33+
default = ["fs", "asyncdb-wasm-bindgen-futures"]
3234
async = ["asyncdb-tokio"]
3335
asyncdb-tokio = ["tokio"]
3436
asyncdb-async-std = ["async-std"]
37+
asyncdb-wasm-bindgen-futures = [
38+
"wasm-bindgen-futures",
39+
"async-std/async-channel",
40+
"getrandom",
41+
]
3542
fs = ["errno", "fs2"]
3643

3744
[dev-dependencies]
3845
time-test = "0.2"
3946
bencher = "0.1"
47+
wasm-bindgen-test = "0.3.0"
4048

4149
[[bench]]
4250
name = "maps_bench"

src/asyncdb.rs

Lines changed: 78 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
use std::collections::hash_map::HashMap;
22

33
use crate::{
4-
send_response, send_response_result, AsyncDB, Message, Result, Status, StatusCode, WriteBatch,
5-
DB,
4+
send_response, send_response_result, snapshot::Snapshot, AsyncDB, Message, Result, Status,
5+
StatusCode, WriteBatch, DB,
66
};
77

88
pub(crate) const CHANNEL_BUFFER_SIZE: usize = 32;
99

1010
#[derive(Clone, Copy)]
11-
pub struct SnapshotRef(usize);
11+
pub struct SnapshotRef(pub(crate) usize);
1212

1313
/// A request sent to the database thread.
1414
pub(crate) enum Request {
@@ -151,6 +151,78 @@ impl AsyncDB {
151151
}
152152
}
153153

154+
pub(crate) fn match_message(
155+
db: &mut DB,
156+
mut recv: impl ReceiverExt<Message>,
157+
snapshots: &mut HashMap<usize, Snapshot>,
158+
snapshot_counter: &mut usize,
159+
message: Message,
160+
) {
161+
match message.req {
162+
Request::Close => {
163+
send_response(message.resp_channel, Response::OK);
164+
recv.close();
165+
return;
166+
}
167+
Request::Put { key, val } => {
168+
let ok = db.put(&key, &val);
169+
send_response_result(message.resp_channel, ok);
170+
}
171+
Request::Delete { key } => {
172+
let ok = db.delete(&key);
173+
send_response_result(message.resp_channel, ok);
174+
}
175+
Request::Write { batch, sync } => {
176+
let ok = db.write(batch, sync);
177+
send_response_result(message.resp_channel, ok);
178+
}
179+
Request::Flush => {
180+
let ok = db.flush();
181+
send_response_result(message.resp_channel, ok);
182+
}
183+
Request::GetAt { snapshot, key } => {
184+
let snapshot_id = snapshot.0;
185+
if let Some(snapshot) = snapshots.get(&snapshot_id) {
186+
let ok = db.get_at(snapshot, &key);
187+
match ok {
188+
Err(e) => {
189+
send_response(message.resp_channel, Response::Error(e));
190+
}
191+
Ok(v) => {
192+
send_response(message.resp_channel, Response::Value(v));
193+
}
194+
};
195+
} else {
196+
send_response(
197+
message.resp_channel,
198+
Response::Error(Status {
199+
code: StatusCode::AsyncError,
200+
err: "Unknown snapshot reference: this is a bug".to_string(),
201+
}),
202+
);
203+
}
204+
}
205+
Request::Get { key } => {
206+
let r = db.get(&key);
207+
send_response(message.resp_channel, Response::Value(r));
208+
}
209+
Request::GetSnapshot => {
210+
snapshots.insert(*snapshot_counter, db.get_snapshot());
211+
let sref = SnapshotRef(*snapshot_counter);
212+
*snapshot_counter += 1;
213+
send_response(message.resp_channel, Response::Snapshot(sref));
214+
}
215+
Request::DropSnapshot { snapshot } => {
216+
snapshots.remove(&snapshot.0);
217+
send_response_result(message.resp_channel, Ok(()));
218+
}
219+
Request::CompactRange { from, to } => {
220+
let ok = db.compact_range(&from, &to);
221+
send_response_result(message.resp_channel, ok);
222+
}
223+
}
224+
}
225+
154226
pub(crate) fn run_server(mut db: DB, mut recv: impl ReceiverExt<Message>) {
155227
let mut snapshots = HashMap::new();
156228
let mut snapshot_counter: usize = 0;
@@ -225,5 +297,8 @@ impl AsyncDB {
225297

226298
pub(crate) trait ReceiverExt<T> {
227299
fn blocking_recv(&mut self) -> Option<T>;
300+
async fn recv(&mut self) -> Option<T> {
301+
self.blocking_recv()
302+
}
228303
fn close(&mut self);
229304
}

src/asyncdb_async_std.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ pub(crate) struct Message {
1111
pub(crate) req: Request,
1212
pub(crate) resp_channel: channel::Sender<Response>,
1313
}
14+
1415
/// `AsyncDB` makes it easy to use LevelDB in a async-std runtime.
1516
/// The methods follow very closely the main API (see `DB` type). Iteration is not yet implemented.
1617
#[derive(Clone)]

src/asyncdb_wasm_bindgen_futures.rs

Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
use std::collections::HashMap;
2+
use std::path::Path;
3+
4+
use async_std::channel::{self, TryRecvError};
5+
use wasm_bindgen_futures::spawn_local;
6+
7+
use crate::asyncdb::{ReceiverExt, Request, Response, CHANNEL_BUFFER_SIZE};
8+
use crate::snapshot::Snapshot;
9+
use crate::{Options, Result, Status, StatusCode, DB};
10+
11+
pub(crate) struct Message {
12+
pub(crate) req: Request,
13+
pub(crate) resp_channel: channel::Sender<Response>,
14+
}
15+
16+
/// `AsyncDB` makes it easy to use LevelDB in a async-std runtime.
17+
/// The methods follow very closely the main API (see `DB` type). Iteration is not yet implemented.
18+
#[derive(Clone)]
19+
pub struct AsyncDB {
20+
shutdown: channel::Sender<()>,
21+
send: channel::Sender<Message>,
22+
}
23+
24+
impl AsyncDB {
25+
/// Create a new or open an existing database.
26+
pub fn new<P: AsRef<Path>>(name: P, opts: Options) -> Result<AsyncDB> {
27+
let db = DB::open(name, opts)?;
28+
29+
let (send, recv) = channel::bounded(CHANNEL_BUFFER_SIZE);
30+
let (shutdown, shutdown_recv) = channel::bounded(1);
31+
32+
spawn_local(async move {
33+
AsyncDB::run_server_async(db, recv, shutdown_recv, HashMap::new(), 0).await;
34+
});
35+
36+
Ok(AsyncDB { shutdown, send })
37+
}
38+
39+
pub(crate) async fn process_request(&self, req: Request) -> Result<Response> {
40+
let (tx, rx) = channel::bounded(1);
41+
42+
let m = Message {
43+
req,
44+
resp_channel: tx,
45+
};
46+
if let Err(e) = self.send.send(m).await {
47+
return Err(Status {
48+
code: StatusCode::AsyncError,
49+
err: e.to_string(),
50+
});
51+
}
52+
let resp = rx.recv().await;
53+
match resp {
54+
Err(e) => Err(Status {
55+
code: StatusCode::AsyncError,
56+
err: e.to_string(),
57+
}),
58+
Ok(r) => Ok(r),
59+
}
60+
}
61+
62+
pub(crate) async fn run_server_async(
63+
mut db: DB,
64+
mut recv: impl ReceiverExt<Message> + Clone + 'static,
65+
mut shutdown: impl ReceiverExt<()> + Clone + 'static,
66+
mut snapshots: HashMap<usize, Snapshot>,
67+
mut snapshot_counter: usize,
68+
) {
69+
if let Some(message) = recv.recv().await {
70+
Self::match_message(
71+
&mut db,
72+
recv.clone(),
73+
&mut snapshots,
74+
&mut snapshot_counter,
75+
message,
76+
);
77+
}
78+
79+
spawn_local(async move {
80+
// check shutdown
81+
if let Some(()) = shutdown.recv().await {
82+
return;
83+
} else {
84+
AsyncDB::run_server_async(db, recv, shutdown, snapshots, snapshot_counter).await
85+
};
86+
});
87+
}
88+
89+
pub(crate) async fn stop_server_async(&self) {
90+
self.shutdown.close();
91+
}
92+
}
93+
94+
pub(crate) fn send_response_result(ch: channel::Sender<Response>, result: Result<()>) {
95+
if let Err(e) = result {
96+
ch.try_send(Response::Error(e)).ok();
97+
} else {
98+
ch.try_send(Response::OK).ok();
99+
}
100+
}
101+
102+
pub(crate) fn send_response(ch: channel::Sender<Response>, res: Response) {
103+
ch.send_blocking(res).ok();
104+
}
105+
106+
impl ReceiverExt<Message> for channel::Receiver<Message> {
107+
fn blocking_recv(&mut self) -> Option<Message> {
108+
self.recv_blocking().ok()
109+
}
110+
111+
fn close(&mut self) {
112+
channel::Receiver::close(self);
113+
}
114+
115+
async fn recv(&mut self) -> Option<Message> {
116+
channel::Receiver::recv(&self).await.ok()
117+
}
118+
}
119+
120+
impl ReceiverExt<()> for channel::Receiver<()> {
121+
fn blocking_recv(&mut self) -> Option<()> {
122+
self.recv_blocking().ok()
123+
}
124+
125+
fn close(&mut self) {
126+
channel::Receiver::close(self);
127+
}
128+
129+
async fn recv(&mut self) -> Option<()> {
130+
match channel::Receiver::try_recv(&self) {
131+
Ok(_) => Some(()),
132+
Err(TryRecvError::Empty) => None,
133+
Err(TryRecvError::Closed) => Some(()),
134+
}
135+
}
136+
}
137+
138+
#[cfg(test)]
139+
pub mod tests {
140+
use crate::{in_memory, AsyncDB};
141+
use wasm_bindgen_test::wasm_bindgen_test;
142+
143+
#[wasm_bindgen_test]
144+
async fn test_asyncdb() {
145+
let db = AsyncDB::new("test.db", in_memory()).unwrap();
146+
db.put(b"key".to_vec(), b"value".to_vec()).await.unwrap();
147+
let val = db.get(b"key".to_vec()).await.unwrap();
148+
assert_eq!(val, Some(b"value".to_vec()));
149+
db.stop_server_async().await;
150+
}
151+
}

src/lib.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,11 @@ extern crate time_test;
4040
#[macro_use]
4141
mod infolog;
4242

43-
#[cfg(any(feature = "asyncdb-tokio", feature = "asyncdb-async-std"))]
43+
#[cfg(any(
44+
feature = "asyncdb-tokio",
45+
feature = "asyncdb-async-std",
46+
feature = "asyncdb-wasm-bindgen-futures"
47+
))]
4448
mod asyncdb;
4549

4650
#[cfg(feature = "asyncdb-tokio")]
@@ -53,6 +57,11 @@ mod asyncdb_async_std;
5357
#[cfg(feature = "asyncdb-async-std")]
5458
use asyncdb_async_std::{send_response, send_response_result, Message};
5559

60+
#[cfg(feature = "asyncdb-wasm-bindgen-futures")]
61+
mod asyncdb_wasm_bindgen_futures;
62+
#[cfg(feature = "asyncdb-wasm-bindgen-futures")]
63+
use self::asyncdb_wasm_bindgen_futures::{send_response, send_response_result, Message};
64+
5665
mod block;
5766
mod block_builder;
5867
mod blockhandle;
@@ -96,6 +105,8 @@ pub mod env;
96105
pub use asyncdb_async_std::AsyncDB;
97106
#[cfg(feature = "asyncdb-tokio")]
98107
pub use asyncdb_tokio::AsyncDB;
108+
#[cfg(feature = "asyncdb-wasm-bindgen-futures")]
109+
pub use asyncdb_wasm_bindgen_futures::AsyncDB;
99110
pub use cmp::{Cmp, DefaultCmp};
100111
pub use compressor::{Compressor, CompressorId};
101112
pub use db_impl::DB;

0 commit comments

Comments
 (0)