Skip to content

Commit e2d11a7

Browse files
authored
feat(lwt): add lightweight transaction (LWT) support (#29)
Signed-off-by: Daniel Boll <danielboll.academico@gmail.com>
1 parent 08e4764 commit e2d11a7

File tree

9 files changed

+97
-23
lines changed

9 files changed

+97
-23
lines changed

examples/lwt.mts

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
import { Cluster, Consistency, Query, SerialConsistency } from "../index.js"
2+
3+
const nodes = process.env.CLUSTER_NODES?.split(",") ?? ["127.0.0.1:9042"];
4+
5+
const cluster = new Cluster({ nodes });
6+
7+
const session = await cluster.connect();
8+
9+
await session.execute("CREATE KEYSPACE IF NOT EXISTS examples_ks WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}");
10+
await session.execute("CREATE TABLE IF NOT EXISTS examples_ks.tab (a int PRIMARY KEY)");
11+
12+
const query = new Query("INSERT INTO examples_ks.tab (a) VALUES(?) IF NOT EXISTS");
13+
query.setConsistency(Consistency.One);
14+
query.setSerialConsistency(SerialConsistency.Serial);
15+
16+
await session.execute(query, [12345]);
17+
18+
console.log("Ok.");

examples/tsconfig.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
{
22
"extends": "../tsconfig.json",
33
"compilerOptions": {
4-
"module": "ESNext",
4+
"module": "NodeNext",
55
"target": "ESNext",
66
"moduleResolution": "NodeNext"
77
}

index.d.ts

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,13 @@ export const enum Consistency {
3333
Serial = 8,
3434
LocalSerial = 9
3535
}
36+
export const enum SerialConsistency {
37+
Serial = 8,
38+
LocalSerial = 9
39+
}
3640
export interface ExecutionProfile {
3741
consistency?: Consistency
42+
serialConsistency?: SerialConsistency
3843
requestTimeout?: number
3944
}
4045
export interface ConnectionOptions {
@@ -69,9 +74,13 @@ export class Cluster {
6974
export type ScyllaQuery = Query
7075
export class Query {
7176
constructor(query: string)
77+
setConsistency(consistency: Consistency): void
78+
setSerialConsistency(serialConsistency: SerialConsistency): void
79+
setPageSize(pageSize: number): void
7280
}
7381
export class ScyllaPreparedStatement {
7482
setConsistency(consistency: Consistency): void
83+
setSerialConsistency(serialConsistency: SerialConsistency): void
7584
}
7685
export class Metrics {
7786
/** Returns counter for nonpaged queries */
@@ -95,7 +104,7 @@ export class Metrics {
95104
}
96105
export class ScyllaSession {
97106
metrics(): Metrics
98-
execute(query: string | ScyllaPreparedStatement, parameters?: Array<number | string | Uuid> | undefined | null): Promise<any>
107+
execute(query: string | Query | ScyllaPreparedStatement, parameters?: Array<number | string | Uuid> | undefined | null): Promise<any>
99108
query(scyllaQuery: Query, parameters?: Array<number | string | Uuid> | undefined | null): Promise<any>
100109
prepare(query: string): Promise<ScyllaPreparedStatement>
101110
/**

index.js

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -310,10 +310,11 @@ if (!nativeBinding) {
310310
throw new Error(`Failed to load native binding`)
311311
}
312312

313-
const { Compression, Consistency, Cluster, VerifyMode, Query, ScyllaPreparedStatement, Metrics, ScyllaSession, Uuid } = nativeBinding
313+
const { Compression, Consistency, SerialConsistency, Cluster, VerifyMode, Query, ScyllaPreparedStatement, Metrics, ScyllaSession, Uuid } = nativeBinding
314314

315315
module.exports.Compression = Compression
316316
module.exports.Consistency = Consistency
317+
module.exports.SerialConsistency = SerialConsistency
317318
module.exports.Cluster = Cluster
318319
module.exports.VerifyMode = VerifyMode
319320
module.exports.Query = Query

src/cluster/execution_profile/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
11
pub mod consistency;
2+
pub mod serial_consistency;
23

34
use self::consistency::Consistency;
5+
use self::serial_consistency::SerialConsistency;
46

57
#[napi(object)]
68
#[derive(Copy, Clone)]
79
pub struct ExecutionProfile {
810
pub consistency: Option<Consistency>,
11+
pub serial_consistency: Option<SerialConsistency>,
912
pub request_timeout: Option<u32>,
1013
}
1114

@@ -17,6 +20,8 @@ impl ExecutionProfile {
1720
ec_builder = ec_builder.consistency(consistency.into());
1821
}
1922

23+
ec_builder = ec_builder.serial_consistency(self.serial_consistency.map(|sc| sc.into()));
24+
2025
if let Some(request_timeout) = self.request_timeout {
2126
ec_builder =
2227
ec_builder.request_timeout(Some(std::time::Duration::from_secs(request_timeout.into())));
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
#[napi]
2+
pub enum SerialConsistency {
3+
Serial = 0x0008,
4+
LocalSerial = 0x0009,
5+
}
6+
7+
impl From<SerialConsistency> for scylla::statement::SerialConsistency {
8+
fn from(value: SerialConsistency) -> Self {
9+
match value {
10+
SerialConsistency::Serial => Self::Serial,
11+
SerialConsistency::LocalSerial => Self::LocalSerial,
12+
}
13+
}
14+
}
15+
16+
impl From<scylla::statement::SerialConsistency> for SerialConsistency {
17+
fn from(value: scylla::statement::SerialConsistency) -> Self {
18+
match value {
19+
scylla::statement::SerialConsistency::Serial => Self::Serial,
20+
scylla::statement::SerialConsistency::LocalSerial => Self::LocalSerial,
21+
}
22+
}
23+
}

src/query/scylla_prepared_statement.rs

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,29 @@
11
use scylla::prepared_statement::PreparedStatement;
22

3-
use crate::cluster::execution_profile::consistency::Consistency;
3+
use crate::cluster::execution_profile::{
4+
consistency::Consistency, serial_consistency::SerialConsistency,
5+
};
46

57
#[napi]
68
pub struct ScyllaPreparedStatement {
7-
pub (crate) prepared: PreparedStatement,
9+
pub(crate) prepared: PreparedStatement,
810
}
911

1012
#[napi]
1113
impl ScyllaPreparedStatement {
14+
pub fn new(prepared: PreparedStatement) -> Self {
15+
Self { prepared }
16+
}
1217

13-
pub fn new(prepared: PreparedStatement) -> Self {
14-
Self {
15-
prepared
16-
}
17-
}
18+
#[napi]
19+
pub fn set_consistency(&mut self, consistency: Consistency) {
20+
self.prepared.set_consistency(consistency.into());
21+
}
1822

19-
#[napi]
20-
pub fn set_consistency(&mut self, consistency: Consistency) {
21-
self.prepared.set_consistency(consistency.into());
22-
}
23+
#[napi]
24+
pub fn set_serial_consistency(&mut self, serial_consistency: SerialConsistency) {
25+
self
26+
.prepared
27+
.set_serial_consistency(Some(serial_consistency.into()));
28+
}
2329
}
24-

src/query/scylla_query.rs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
use std::fmt::Display;
22

3+
use crate::cluster::execution_profile::{
4+
consistency::Consistency, serial_consistency::SerialConsistency,
5+
};
36
use scylla::query::Query;
4-
use scylla::statement::Consistency;
57

68
#[napi(js_name = "Query")]
79
pub struct ScyllaQuery {
@@ -23,10 +25,19 @@ impl ScyllaQuery {
2325
}
2426
}
2527

28+
#[napi]
2629
pub fn set_consistency(&mut self, consistency: Consistency) {
27-
self.query.set_consistency(consistency);
30+
self.query.set_consistency(consistency.into());
2831
}
2932

33+
#[napi]
34+
pub fn set_serial_consistency(&mut self, serial_consistency: SerialConsistency) {
35+
self
36+
.query
37+
.set_serial_consistency(Some(serial_consistency.into()));
38+
}
39+
40+
#[napi]
3041
pub fn set_page_size(&mut self, page_size: i32) {
3142
self.query.set_page_size(page_size);
3243
}

src/session/scylla_session.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use crate::helpers::query_results::QueryResult;
33
use crate::query::scylla_prepared_statement::ScyllaPreparedStatement;
44
use crate::query::scylla_query::ScyllaQuery;
55
use crate::types::uuid::Uuid;
6-
use napi::bindgen_prelude::{Either, Either3};
6+
use napi::bindgen_prelude::Either3;
77

88
use super::metrics;
99

@@ -26,7 +26,7 @@ impl ScyllaSession {
2626
#[napi]
2727
pub async fn execute(
2828
&self,
29-
query: Either<String, &ScyllaPreparedStatement>,
29+
query: Either3<String, &ScyllaQuery, &ScyllaPreparedStatement>,
3030
parameters: Option<Vec<Either3<u32, String, &Uuid>>>,
3131
) -> napi::Result<serde_json::Value> {
3232
let values = QueryParameter::parser(parameters.clone()).ok_or(napi::Error::new(
@@ -35,13 +35,15 @@ impl ScyllaSession {
3535
))?;
3636

3737
let query_result = match query.clone() {
38-
Either::A(query) => self.session.query(query, values).await,
39-
Either::B(prepared) => self.session.execute(&prepared.prepared, values).await,
38+
Either3::A(query) => self.session.query(query, values).await,
39+
Either3::B(query) => self.session.query(query.query.clone(), values).await,
40+
Either3::C(prepared) => self.session.execute(&prepared.prepared, values).await,
4041
}
4142
.map_err(|_| {
4243
let query = match query {
43-
Either::A(query) => query,
44-
Either::B(prepared) => prepared.prepared.get_statement().to_string(),
44+
Either3::A(query) => query,
45+
Either3::B(query) => query.query.contents.clone(),
46+
Either3::C(prepared) => prepared.prepared.get_statement().to_string(),
4547
};
4648

4749
napi::Error::new(

0 commit comments

Comments
 (0)