Skip to content

Commit a1a1ae9

Browse files
authored
feat(batch): add batch statements support (#30)
Signed-off-by: Daniel Boll <danielboll.academico@gmail.com>
1 parent f9fd947 commit a1a1ae9

File tree

10 files changed

+171
-33
lines changed

10 files changed

+171
-33
lines changed

examples/auth.mts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ const cluster = new Cluster({
1212

1313
const session = await cluster.connect();
1414

15-
await session.execute("CREATE KEYSPACE IF NOT EXISTS examples_ks WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}");
16-
await session.execute("DROP TABLE IF EXISTS examples_ks.auth");
15+
await session.execute("CREATE KEYSPACE IF NOT EXISTS auth WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}");
16+
await session.execute("DROP TABLE IF EXISTS auth.auth");
1717

1818
console.log("Ok.");

examples/batch-statements.mts

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
import { Cluster, BatchStatement, Query, Uuid } from "../index.js"
2+
3+
const nodes = process.env.CLUSTER_NODES?.split(",") ?? ["127.0.0.1:9042"];
4+
5+
const cluster = new Cluster({ nodes });
6+
const session = await cluster.connect();
7+
8+
const batch = new BatchStatement();
9+
10+
await session.execute("CREATE KEYSPACE IF NOT EXISTS batch_statements WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }");
11+
await session.useKeyspace("batch_statements");
12+
await session.execute("CREATE TABLE IF NOT EXISTS users (id UUID PRIMARY KEY, name TEXT)");
13+
14+
const simpleStatement = new Query("INSERT INTO users (id, name) VALUES (?, ?)");
15+
const preparedStatement = await session.prepare("INSERT INTO users (id, name) VALUES (?, ?)");
16+
17+
batch.appendStatement(simpleStatement);
18+
batch.appendStatement(preparedStatement);
19+
20+
await session.batch(batch, [[Uuid.randomV4(), "Alice"], [Uuid.randomV4(), "Bob"]]);
21+
22+
console.log(await session.execute("SELECT * FROM users"));

examples/lwt.mts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,10 @@ const cluster = new Cluster({ nodes });
66

77
const session = await cluster.connect();
88

9-
await session.execute("CREATE KEYSPACE IF NOT EXISTS examples_ks WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}");
10-
await session.execute("CREATE TABLE IF NOT EXISTS examples_ks.tab (a int PRIMARY KEY)");
9+
await session.execute("CREATE KEYSPACE IF NOT EXISTS lwt WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}");
10+
await session.execute("CREATE TABLE IF NOT EXISTS lwt.tab (a int PRIMARY KEY)");
1111

12-
const query = new Query("INSERT INTO examples_ks.tab (a) VALUES(?) IF NOT EXISTS");
12+
const query = new Query("INSERT INTO lwt.tab (a) VALUES(?) IF NOT EXISTS");
1313
query.setConsistency(Consistency.One);
1414
query.setSerialConsistency(SerialConsistency.Serial);
1515

index.d.ts

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -71,16 +71,33 @@ export class Cluster {
7171
/** Connect to the cluster */
7272
connect(keyspaceOrOptions?: string | ConnectionOptions | undefined | null, options?: ConnectionOptions | undefined | null): Promise<ScyllaSession>
7373
}
74-
export type ScyllaQuery = Query
75-
export class Query {
76-
constructor(query: string)
74+
export type ScyllaBatchStatement = BatchStatement
75+
/**
76+
* Batch statements
77+
*
78+
* A batch statement allows to execute many data-modifying statements at once.
79+
* These statements can be simple or prepared.
80+
* Only INSERT, UPDATE and DELETE statements are allowed.
81+
*/
82+
export class BatchStatement {
83+
constructor()
84+
/**
85+
* Appends a statement to the batch.
86+
*
87+
* _Warning_
88+
* Using simple statements with bind markers in batches is strongly discouraged. For each simple statement with a non-empty list of values in the batch, the driver will send a prepare request, and it will be done sequentially. Results of preparation are not cached between `session.batch` calls. Consider preparing the statements before putting them into the batch.
89+
*/
90+
appendStatement(statement: Query | PreparedStatement): void
91+
}
92+
export class PreparedStatement {
7793
setConsistency(consistency: Consistency): void
7894
setSerialConsistency(serialConsistency: SerialConsistency): void
79-
setPageSize(pageSize: number): void
8095
}
81-
export class ScyllaPreparedStatement {
96+
export class Query {
97+
constructor(query: string)
8298
setConsistency(consistency: Consistency): void
8399
setSerialConsistency(serialConsistency: SerialConsistency): void
100+
setPageSize(pageSize: number): void
84101
}
85102
export class Metrics {
86103
/** Returns counter for nonpaged queries */
@@ -104,9 +121,10 @@ export class Metrics {
104121
}
105122
export class ScyllaSession {
106123
metrics(): Metrics
107-
execute(query: string | Query | ScyllaPreparedStatement, parameters?: Array<number | string | Uuid> | undefined | null): Promise<any>
124+
execute(query: string | Query | PreparedStatement, parameters?: Array<number | string | Uuid> | undefined | null): Promise<any>
108125
query(scyllaQuery: Query, parameters?: Array<number | string | Uuid> | undefined | null): Promise<any>
109-
prepare(query: string): Promise<ScyllaPreparedStatement>
126+
prepare(query: string): Promise<PreparedStatement>
127+
batch(batch: BatchStatement, parameters: Array<Array<number | string | Uuid> | undefined | null>): Promise<any>
110128
/**
111129
* Sends `USE <keyspace_name>` request on all connections\
112130
* This allows to write `SELECT * FROM table` instead of `SELECT * FROM keyspace.table`\

index.js

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -310,15 +310,16 @@ if (!nativeBinding) {
310310
throw new Error(`Failed to load native binding`)
311311
}
312312

313-
const { Compression, Consistency, SerialConsistency, Cluster, VerifyMode, Query, ScyllaPreparedStatement, Metrics, ScyllaSession, Uuid } = nativeBinding
313+
const { Compression, Consistency, SerialConsistency, Cluster, VerifyMode, BatchStatement, PreparedStatement, Query, Metrics, ScyllaSession, Uuid } = nativeBinding
314314

315315
module.exports.Compression = Compression
316316
module.exports.Consistency = Consistency
317317
module.exports.SerialConsistency = SerialConsistency
318318
module.exports.Cluster = Cluster
319319
module.exports.VerifyMode = VerifyMode
320+
module.exports.BatchStatement = BatchStatement
321+
module.exports.PreparedStatement = PreparedStatement
320322
module.exports.Query = Query
321-
module.exports.ScyllaPreparedStatement = ScyllaPreparedStatement
322323
module.exports.Metrics = Metrics
323324
module.exports.ScyllaSession = ScyllaSession
324325
module.exports.Uuid = Uuid

src/query/batch_statement.rs

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
use std::fmt::Display;
2+
3+
use napi::Either;
4+
use scylla::batch::Batch;
5+
6+
use super::{scylla_prepared_statement::PreparedStatement, scylla_query::Query};
7+
8+
/// Batch statements
9+
///
10+
/// A batch statement allows to execute many data-modifying statements at once.
11+
/// These statements can be simple or prepared.
12+
/// Only INSERT, UPDATE and DELETE statements are allowed.
13+
#[napi(js_name = "BatchStatement")]
14+
pub struct ScyllaBatchStatement {
15+
pub(crate) batch: Batch,
16+
}
17+
18+
impl Display for ScyllaBatchStatement {
19+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
20+
write!(
21+
f,
22+
"ScyllaBatchStatement: {:?}",
23+
self
24+
.batch
25+
.statements
26+
.iter()
27+
.map(|s| match s {
28+
scylla::batch::BatchStatement::Query(q) => q.contents.clone(),
29+
scylla::batch::BatchStatement::PreparedStatement(p) => p.get_statement().to_string(),
30+
})
31+
.collect::<Vec<_>>()
32+
)
33+
}
34+
}
35+
36+
#[napi]
37+
impl ScyllaBatchStatement {
38+
#[napi(constructor)]
39+
pub fn new() -> Self {
40+
Self {
41+
batch: Default::default(),
42+
}
43+
}
44+
45+
/// Appends a statement to the batch.
46+
///
47+
/// _Warning_
48+
/// Using simple statements with bind markers in batches is strongly discouraged. For each simple statement with a non-empty list of values in the batch, the driver will send a prepare request, and it will be done sequentially. Results of preparation are not cached between `session.batch` calls. Consider preparing the statements before putting them into the batch.
49+
#[napi]
50+
pub fn append_statement(&mut self, statement: Either<&Query, &PreparedStatement>) {
51+
match statement {
52+
Either::A(simple_query) => self.batch.append_statement(simple_query.query.clone()),
53+
Either::B(prepared_statement) => self
54+
.batch
55+
.append_statement(prepared_statement.prepared.clone()),
56+
}
57+
}
58+
}
59+
60+
impl Default for ScyllaBatchStatement {
61+
fn default() -> Self {
62+
Self::new()
63+
}
64+
}

src/query/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
1-
pub mod scylla_query;
1+
pub mod batch_statement;
22
pub mod scylla_prepared_statement;
3+
pub mod scylla_query;

src/query/scylla_prepared_statement.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,17 @@
1-
use scylla::prepared_statement::PreparedStatement;
1+
use scylla::prepared_statement;
22

33
use crate::cluster::execution_profile::{
44
consistency::Consistency, serial_consistency::SerialConsistency,
55
};
66

77
#[napi]
8-
pub struct ScyllaPreparedStatement {
9-
pub(crate) prepared: PreparedStatement,
8+
pub struct PreparedStatement {
9+
pub(crate) prepared: prepared_statement::PreparedStatement,
1010
}
1111

1212
#[napi]
13-
impl ScyllaPreparedStatement {
14-
pub fn new(prepared: PreparedStatement) -> Self {
13+
impl PreparedStatement {
14+
pub fn new(prepared: prepared_statement::PreparedStatement) -> Self {
1515
Self { prepared }
1616
}
1717

src/query/scylla_query.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,25 +3,25 @@ use std::fmt::Display;
33
use crate::cluster::execution_profile::{
44
consistency::Consistency, serial_consistency::SerialConsistency,
55
};
6-
use scylla::query::Query;
6+
use scylla::query;
77

8-
#[napi(js_name = "Query")]
9-
pub struct ScyllaQuery {
10-
pub(crate) query: Query,
8+
#[napi]
9+
pub struct Query {
10+
pub(crate) query: query::Query,
1111
}
1212

13-
impl Display for ScyllaQuery {
13+
impl Display for Query {
1414
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1515
write!(f, "ScyllaQuery: {}", self.query.contents)
1616
}
1717
}
1818

1919
#[napi]
20-
impl ScyllaQuery {
20+
impl Query {
2121
#[napi(constructor)]
2222
pub fn new(query: String) -> Self {
2323
Self {
24-
query: Query::new(query),
24+
query: query::Query::new(query),
2525
}
2626
}
2727

src/session/scylla_session.rs

Lines changed: 38 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
use crate::helpers::query_parameter::QueryParameter;
22
use crate::helpers::query_results::QueryResult;
3-
use crate::query::scylla_prepared_statement::ScyllaPreparedStatement;
4-
use crate::query::scylla_query::ScyllaQuery;
3+
use crate::query::batch_statement::ScyllaBatchStatement;
4+
use crate::query::scylla_prepared_statement::PreparedStatement;
5+
use crate::query::scylla_query::Query;
56
use crate::types::uuid::Uuid;
67
use napi::bindgen_prelude::Either3;
78

@@ -26,7 +27,7 @@ impl ScyllaSession {
2627
#[napi]
2728
pub async fn execute(
2829
&self,
29-
query: Either3<String, &ScyllaQuery, &ScyllaPreparedStatement>,
30+
query: Either3<String, &Query, &PreparedStatement>,
3031
parameters: Option<Vec<Either3<u32, String, &Uuid>>>,
3132
) -> napi::Result<serde_json::Value> {
3233
let values = QueryParameter::parser(parameters.clone()).ok_or(napi::Error::new(
@@ -58,7 +59,7 @@ impl ScyllaSession {
5859
#[napi]
5960
pub async fn query(
6061
&self,
61-
scylla_query: &ScyllaQuery,
62+
scylla_query: &Query,
6263
parameters: Option<Vec<Either3<u32, String, &Uuid>>>,
6364
) -> napi::Result<serde_json::Value> {
6465
let values = QueryParameter::parser(parameters.clone()).ok_or(napi::Error::new(
@@ -81,15 +82,46 @@ impl ScyllaSession {
8182
}
8283

8384
#[napi]
84-
pub async fn prepare(&self, query: String) -> napi::Result<ScyllaPreparedStatement> {
85+
pub async fn prepare(&self, query: String) -> napi::Result<PreparedStatement> {
8586
let prepared = self.session.prepare(query.clone()).await.map_err(|_| {
8687
napi::Error::new(
8788
napi::Status::InvalidArg,
8889
format!("Something went wrong with your prepared statement. - [{query}]"),
8990
)
9091
})?;
9192

92-
Ok(ScyllaPreparedStatement::new(prepared))
93+
Ok(PreparedStatement::new(prepared))
94+
}
95+
96+
#[napi]
97+
#[allow(clippy::type_complexity)]
98+
pub async fn batch(
99+
&self,
100+
batch: &ScyllaBatchStatement,
101+
parameters: Vec<Option<Vec<Either3<u32, String, &Uuid>>>>,
102+
) -> napi::Result<serde_json::Value> {
103+
let values = parameters
104+
.iter()
105+
.map(|params| {
106+
QueryParameter::parser(params.clone()).ok_or(napi::Error::new(
107+
napi::Status::InvalidArg,
108+
format!("Something went wrong with your batch parameters. {parameters:?}"),
109+
))
110+
})
111+
.collect::<napi::Result<Vec<_>>>()?;
112+
113+
let query_result = self
114+
.session
115+
.batch(&batch.batch, values)
116+
.await
117+
.map_err(|e| {
118+
napi::Error::new(
119+
napi::Status::InvalidArg,
120+
format!("Something went wrong with your batch. - [{batch}] - {parameters:?}\n{e}"),
121+
)
122+
})?;
123+
124+
Ok(QueryResult::parser(query_result))
93125
}
94126

95127
/// Sends `USE <keyspace_name>` request on all connections\

0 commit comments

Comments
 (0)