Skip to content

Commit 4fcf108

Browse files
committed
Add support for sqlite backed durable objects.
Fixes #645
1 parent a16090a commit 4fcf108

File tree

12 files changed

+267
-1
lines changed

12 files changed

+267
-1
lines changed

worker-sandbox/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ mod test;
2626
mod user;
2727
mod utils;
2828
mod ws;
29+
mod sql_counter;
2930

3031
#[derive(Deserialize, Serialize)]
3132
struct MyData {

worker-sandbox/src/router.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use crate::{
22
alarm, analytics_engine, assets, cache, d1, fetch, form, kv, queue, r2, request, service,
3-
socket, user, ws, SomeSharedData, GLOBAL_STATE,
3+
socket, user, ws, sql_counter, SomeSharedData, GLOBAL_STATE,
44
};
55
#[cfg(feature = "http")]
66
use std::convert::TryInto;
@@ -224,6 +224,7 @@ pub fn make_router(data: SomeSharedData, env: Env) -> axum::Router {
224224
"/analytics-engine",
225225
get(handler!(analytics_engine::handle_analytics_event)),
226226
)
227+
.route("/sql-counter/:name", get(handler!(sql_counter::handle_sql_counter)))
227228
.fallback(get(handler!(catchall)))
228229
.layer(Extension(env))
229230
.layer(Extension(data))
@@ -364,6 +365,7 @@ pub fn make_router<'a>(data: SomeSharedData) -> Router<'a, SomeSharedData> {
364365
.delete_async("/r2/delete", handler!(r2::delete))
365366
.get_async("/socket/failed", handler!(socket::handle_socket_failed))
366367
.get_async("/socket/read", handler!(socket::handle_socket_read))
368+
.get_async("/sql-counter/:name", handler!(sql_counter::handle_sql_counter))
367369
.or_else_any_method_async("/*catchall", handler!(catchall))
368370
}
369371

worker-sandbox/src/sql_counter.rs

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
use worker::*;
2+
use wasm_bindgen::JsValue;
3+
4+
/// A simple SQLite-backed counter stored in Durable Object storage.
5+
///
6+
/// Each Durable Object instance owns its own private SQLite database. We keep a
7+
/// single table `counter` with one row that stores the current value.
8+
#[durable_object]
9+
pub struct SqlCounter {
10+
sql: SqlStorage,
11+
}
12+
13+
#[durable_object]
14+
impl DurableObject for SqlCounter {
15+
fn new(state: State, _env: Env) -> Self {
16+
let sql = state.storage().sql();
17+
// Create table if it does not exist. Note: `exec` is synchronous.
18+
sql.exec("CREATE TABLE IF NOT EXISTS counter(value INTEGER);", Vec::new())
19+
.expect("create table");
20+
Self { sql }
21+
}
22+
23+
async fn fetch(&mut self, _req: Request) -> Result<Response> {
24+
// Read current value (if any)
25+
#[derive(serde::Deserialize)]
26+
struct Row {
27+
value: i64,
28+
}
29+
30+
let rows: Vec<Row> = self
31+
.sql
32+
.exec("SELECT value FROM counter LIMIT 1;", Vec::new())?
33+
.to_array()?;
34+
let current = rows.get(0).map(|r| r.value).unwrap_or(0);
35+
let next = current + 1;
36+
37+
// Upsert new value – simplest way: delete and insert again.
38+
self.sql.exec("DELETE FROM counter;", Vec::new())?;
39+
let val_js = JsValue::from_f64(next as f64);
40+
self.sql
41+
.exec("INSERT INTO counter(value) VALUES (?);", vec![val_js])?;
42+
43+
Response::ok(format!("SQL counter is now {}", next))
44+
}
45+
}
46+
47+
#[worker::send]
48+
/// Route handler that proxies a request to our SqlCounter Durable Object with id derived from the
49+
/// path `/sql-counter/{name}` (so every name gets its own instance).
50+
pub async fn handle_sql_counter(req: Request, env: Env, _data: super::SomeSharedData) -> Result<Response> {
51+
let uri = req.url()?;
52+
let mut segments = uri.path_segments().unwrap();
53+
// skip "sql-counter"
54+
let _ = segments.next();
55+
let name = segments.next().unwrap_or("default");
56+
let namespace = env.durable_object("SQL_COUNTER")?;
57+
let stub = namespace.id_from_name(name)?.get_stub()?;
58+
stub.fetch_with_str("https://fake-host/").await
59+
}

worker-sandbox/tests/mf.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ export const mf = new Miniflare({
5959
durableObjects: {
6060
COUNTER: "Counter",
6161
PUT_RAW_TEST_OBJECT: "PutRawTestObject",
62+
SQL_COUNTER: "SqlCounter",
6263
},
6364
kvNamespaces: ["SOME_NAMESPACE", "FILE_SIZES", "TEST"],
6465
serviceBindings: {

worker-sandbox/tests/sqlite.spec.ts

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
import { describe, test, expect } from "vitest";
2+
import { mf } from "./mf";
3+
4+
// Unsupported by miniflare
5+
describe.skip("sqlite durable object", () => {
6+
test("counter increments per object", async () => {
7+
// First access for object "alice"
8+
let resp = await mf.dispatchFetch("http://fake.host/sql-counter/alice");
9+
expect(resp.status).toBe(200);
10+
expect(await resp.text()).toBe("SQL counter is now 1");
11+
12+
// Second access for same object should increment
13+
resp = await mf.dispatchFetch("http://fake.host/sql-counter/alice");
14+
expect(resp.status).toBe(200);
15+
expect(await resp.text()).toBe("SQL counter is now 2");
16+
17+
// Different object name should have its own counter starting at 1
18+
resp = await mf.dispatchFetch("http://fake.host/sql-counter/bob");
19+
expect(resp.status).toBe(200);
20+
expect(await resp.text()).toBe("SQL counter is now 1");
21+
});
22+
});

worker-sandbox/wrangler.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ bindings = [
2929
{ name = "COUNTER", class_name = "Counter" },
3030
{ name = "ALARM", class_name = "AlarmObject" },
3131
{ name = "PUT_RAW_TEST_OBJECT", class_name = "PutRawTestObject" },
32+
{ name = "SQL_COUNTER", class_name = "SqlCounter" },
3233
]
3334

3435
[[analytics_engine_datasets]]
@@ -82,3 +83,7 @@ type = "CompiledWasm"
8283

8384
[package.metadata.wasm-pack.profile.dev.wasm-bindgen]
8485
dwarf-debug-info = true
86+
87+
[[migrations]]
88+
tag = "v1"
89+
new_sqlite_classes = ["SqlCounter"]

worker-sys/src/types/durable_object.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,14 @@ mod namespace;
55
mod state;
66
mod storage;
77
mod transaction;
8+
mod sql_storage;
89

910
pub use id::*;
1011
pub use namespace::*;
1112
pub use state::*;
1213
pub use storage::*;
1314
pub use transaction::*;
15+
pub use sql_storage::*;
1416

1517
#[wasm_bindgen]
1618
extern "C" {
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
use js_sys::{Array, Iterator as JsIterator, Object};
2+
use wasm_bindgen::prelude::*;
3+
4+
#[wasm_bindgen]
5+
extern "C" {
6+
#[wasm_bindgen(extends = js_sys::Object)]
7+
#[derive(Clone)]
8+
pub type SqlStorage;
9+
10+
/// Returns the on-disk size of the SQLite database, in bytes.
11+
#[wasm_bindgen(method, getter, js_name = databaseSize)]
12+
pub fn database_size(this: &SqlStorage) -> f64;
13+
14+
/// Execute a SQL statement (optionally with bindings) and return a cursor over the results.
15+
///
16+
/// The JavaScript definition of `exec` is variadic, taking a SQL string followed by an
17+
/// arbitrary number of binding parameters. In Rust we accept the bindings packed into an
18+
/// `Array` – callers can construct the array (or use the helper provided in the higher-level
19+
/// `worker` crate).
20+
#[wasm_bindgen(structural, method, catch, variadic, js_class = SqlStorage, js_name = exec)]
21+
pub fn exec(
22+
this: &SqlStorage,
23+
query: &str,
24+
bindings: Array,
25+
) -> Result<SqlStorageCursor, JsValue>;
26+
}
27+
28+
#[wasm_bindgen]
29+
extern "C" {
30+
#[wasm_bindgen(extends = js_sys::Object)]
31+
#[derive(Clone)]
32+
pub type SqlStorageCursor;
33+
34+
/// JavaScript `Iterator.next()` implementation. Returns an object with `{ done, value }`.
35+
#[wasm_bindgen(method, js_name = next)]
36+
pub fn next(this: &SqlStorageCursor) -> Object;
37+
38+
/// Convert the remaining rows into an array of objects.
39+
#[wasm_bindgen(method, js_name = toArray)]
40+
pub fn to_array(this: &SqlStorageCursor) -> Array;
41+
42+
/// Returns the single row if exactly one row exists, otherwise throws.
43+
#[wasm_bindgen(method)]
44+
pub fn one(this: &SqlStorageCursor) -> JsValue;
45+
46+
/// Returns an iterator where each row is an array rather than an object.
47+
#[wasm_bindgen(method)]
48+
pub fn raw(this: &SqlStorageCursor) -> JsIterator;
49+
50+
/// Column names in the order they appear in `raw()` row arrays.
51+
#[wasm_bindgen(method, getter, js_name = columnNames)]
52+
pub fn column_names(this: &SqlStorageCursor) -> Array;
53+
54+
/// Rows read so far.
55+
#[wasm_bindgen(method, getter, js_name = rowsRead)]
56+
pub fn rows_read(this: &SqlStorageCursor) -> f64;
57+
58+
/// Rows written so far.
59+
#[wasm_bindgen(method, getter, js_name = rowsWritten)]
60+
pub fn rows_written(this: &SqlStorageCursor) -> f64;
61+
}

worker-sys/src/types/durable_object/storage.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,4 +74,7 @@ extern "C" {
7474
this: &DurableObjectStorage,
7575
options: js_sys::Object,
7676
) -> Result<js_sys::Promise, JsValue>;
77+
78+
#[wasm_bindgen(method, getter)]
79+
pub fn sql(this: &DurableObjectStorage) -> crate::types::SqlStorage;
7780
}

worker/src/durable.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -501,6 +501,11 @@ impl Storage {
501501
.map_err(Error::from)
502502
.map(|_| ())
503503
}
504+
505+
// Add new method to access SQLite APIs
506+
pub fn sql(&self) -> crate::sql::SqlStorage {
507+
crate::sql::SqlStorage::new(self.inner.sql())
508+
}
504509
}
505510

506511
pub struct Transaction {

worker/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,7 @@ mod socket;
234234
mod streams;
235235
mod version;
236236
mod websocket;
237+
mod sql;
237238

238239
pub type Result<T> = StdResult<T, error::Error>;
239240

@@ -252,3 +253,5 @@ pub type HttpRequest = ::http::Request<http::body::Body>;
252253
#[cfg(feature = "http")]
253254
/// **Requires** `http` feature. Type alias for `http::Response<worker::Body>`.
254255
pub type HttpResponse = ::http::Response<http::body::Body>;
256+
257+
pub use crate::sql::*;

worker/src/sql.rs

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
use js_sys::{Array};
2+
use wasm_bindgen::JsValue;
3+
use worker_sys::types::{SqlStorage as SqlStorageSys, SqlStorageCursor as SqlStorageCursorSys};
4+
5+
use serde::de::DeserializeOwned;
6+
use serde_wasm_bindgen as swb;
7+
8+
use crate::Error;
9+
use crate::Result;
10+
11+
/// Wrapper around the Workers `SqlStorage` interface exposed at `ctx.storage.sql`.
12+
///
13+
/// The API is intentionally minimal for now – additional helper utilities can be built on top
14+
/// as needed.
15+
#[derive(Clone)]
16+
pub struct SqlStorage {
17+
inner: SqlStorageSys,
18+
}
19+
20+
unsafe impl Send for SqlStorage {}
21+
unsafe impl Sync for SqlStorage {}
22+
23+
impl SqlStorage {
24+
pub(crate) fn new(inner: SqlStorageSys) -> Self {
25+
Self { inner }
26+
}
27+
28+
/// Size of the underlying SQLite database in bytes.
29+
pub fn database_size(&self) -> usize {
30+
self.inner.database_size() as usize
31+
}
32+
33+
/// Execute a SQL query and return a cursor over the results.
34+
///
35+
/// `bindings` correspond to positional `?` placeholders in the query.
36+
pub fn exec(&self, query: &str, bindings: Vec<JsValue>) -> Result<SqlCursor> {
37+
let array = Array::new();
38+
for v in bindings {
39+
array.push(&v);
40+
}
41+
let cursor = self.inner.exec(query, array).map_err(Error::from)?;
42+
Ok(SqlCursor { inner: cursor })
43+
}
44+
}
45+
46+
impl AsRef<JsValue> for SqlStorage {
47+
fn as_ref(&self) -> &JsValue {
48+
&self.inner
49+
}
50+
}
51+
52+
/// A cursor returned from `SqlStorage::exec`.
53+
#[derive(Clone)]
54+
pub struct SqlCursor {
55+
inner: SqlStorageCursorSys,
56+
}
57+
58+
unsafe impl Send for SqlCursor {}
59+
unsafe impl Sync for SqlCursor {}
60+
61+
impl SqlCursor {
62+
/// Consume the remaining rows of the cursor into a `Vec` of deserialised objects.
63+
pub fn to_array<T>(&self) -> Result<Vec<T>>
64+
where
65+
T: DeserializeOwned,
66+
{
67+
let arr = self.inner.to_array();
68+
let mut out = Vec::with_capacity(arr.length() as usize);
69+
for val in arr.iter() {
70+
out.push(swb::from_value(val)?);
71+
}
72+
Ok(out)
73+
}
74+
75+
/// Return the first (and only) row of the query result.
76+
pub fn one<T>(&self) -> Result<T>
77+
where
78+
T: DeserializeOwned,
79+
{
80+
let val = self.inner.one();
81+
Ok(swb::from_value(val)?)
82+
}
83+
84+
/// Column names returned by the query.
85+
pub fn column_names(&self) -> Vec<String> {
86+
self.inner
87+
.column_names()
88+
.iter()
89+
.map(|v| v.as_string().unwrap_or_default())
90+
.collect()
91+
}
92+
93+
/// Number of rows read so far by the cursor.
94+
pub fn rows_read(&self) -> usize {
95+
self.inner.rows_read() as usize
96+
}
97+
98+
/// Number of rows written by the query so far.
99+
pub fn rows_written(&self) -> usize {
100+
self.inner.rows_written() as usize
101+
}
102+
}

0 commit comments

Comments
 (0)