Skip to content

Commit fc69715

Browse files
authored
Use server default db over hardcoded default db (#197)
* Add option to pass env and server config to testcontainers * Run testcontainers async as all tests are async as well * Only skip EE test when the license was not accepted * Add test to show that the server default db is used * Use server default db over hardcoded default db
1 parent fe990fb commit fc69715

File tree

9 files changed

+200
-62
lines changed

9 files changed

+200
-62
lines changed

lib/src/config.rs

Lines changed: 10 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ use crate::errors::{Error, Result};
33
use std::path::Path;
44
use std::{ops::Deref, sync::Arc};
55

6-
const DEFAULT_DATABASE: &str = "neo4j";
76
const DEFAULT_FETCH_SIZE: usize = 200;
87
const DEFAULT_MAX_CONNECTIONS: usize = 16;
98

@@ -24,12 +23,6 @@ impl From<String> for Database {
2423
}
2524
}
2625

27-
impl Default for Database {
28-
fn default() -> Self {
29-
Database(DEFAULT_DATABASE.into())
30-
}
31-
}
32-
3326
impl AsRef<str> for Database {
3427
fn as_ref(&self) -> &str {
3528
&self.0
@@ -47,7 +40,7 @@ impl Deref for Database {
4740
/// The configuration that is used once a connection is alive.
4841
#[derive(Debug, Clone)]
4942
pub struct LiveConfig {
50-
pub(crate) db: Database,
43+
pub(crate) db: Option<Database>,
5144
pub(crate) fetch_size: usize,
5245
}
5346

@@ -58,7 +51,7 @@ pub struct Config {
5851
pub(crate) user: String,
5952
pub(crate) password: String,
6053
pub(crate) max_connections: usize,
61-
pub(crate) db: Database,
54+
pub(crate) db: Option<Database>,
6255
pub(crate) fetch_size: usize,
6356
pub(crate) client_certificate: Option<ClientCertificate>,
6457
}
@@ -77,7 +70,7 @@ pub struct ConfigBuilder {
7770
uri: Option<String>,
7871
user: Option<String>,
7972
password: Option<String>,
80-
db: Database,
73+
db: Option<Database>,
8174
fetch_size: usize,
8275
max_connections: usize,
8376
client_certificate: Option<ClientCertificate>,
@@ -109,9 +102,11 @@ impl ConfigBuilder {
109102

110103
/// The name of the database to connect to.
111104
///
112-
/// Defaults to "neo4j" if not set.
105+
/// Defaults to the server configured default database if not set.
106+
/// The database can also be specified on a per-query level, which will
107+
/// override this value.
113108
pub fn db(mut self, db: impl Into<Database>) -> Self {
114-
self.db = db.into();
109+
self.db = Some(db.into());
115110
self
116111
}
117112

@@ -160,7 +155,7 @@ impl Default for ConfigBuilder {
160155
uri: None,
161156
user: None,
162157
password: None,
163-
db: DEFAULT_DATABASE.into(),
158+
db: None,
164159
max_connections: DEFAULT_MAX_CONNECTIONS,
165160
fetch_size: DEFAULT_FETCH_SIZE,
166161
client_certificate: None,
@@ -186,7 +181,7 @@ mod tests {
186181
assert_eq!(config.uri, "127.0.0.1:7687");
187182
assert_eq!(config.user, "some_user");
188183
assert_eq!(config.password, "some_password");
189-
assert_eq!(&*config.db, "some_db");
184+
assert_eq!(config.db.as_deref(), Some("some_db"));
190185
assert_eq!(config.fetch_size, 10);
191186
assert_eq!(config.max_connections, 5);
192187
assert!(config.client_certificate.is_none());
@@ -203,7 +198,7 @@ mod tests {
203198
assert_eq!(config.uri, "127.0.0.1:7687");
204199
assert_eq!(config.user, "some_user");
205200
assert_eq!(config.password, "some_password");
206-
assert_eq!(&*config.db, "neo4j");
201+
assert_eq!(config.db, None);
207202
assert_eq!(config.fetch_size, 200);
208203
assert_eq!(config.max_connections, 16);
209204
assert!(config.client_certificate.is_none());

lib/src/graph.rs

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ impl Graph {
5353
///
5454
/// Transactions will not be automatically retried on any failure.
5555
pub async fn start_txn(&self) -> Result<Txn> {
56-
self.start_txn_on(self.config.db.clone()).await
56+
self.impl_start_txn_on(self.config.db.clone()).await
5757
}
5858

5959
/// Starts a new transaction on the provided database.
@@ -62,8 +62,12 @@ impl Graph {
6262
///
6363
/// Transactions will not be automatically retried on any failure.
6464
pub async fn start_txn_on(&self, db: impl Into<Database>) -> Result<Txn> {
65+
self.impl_start_txn_on(Some(db.into())).await
66+
}
67+
68+
async fn impl_start_txn_on(&self, db: Option<Database>) -> Result<Txn> {
6569
let connection = self.pool.get().await?;
66-
Txn::new(db.into(), self.config.fetch_size, connection).await
70+
Txn::new(db, self.config.fetch_size, connection).await
6771
}
6872

6973
/// Runs a query on the configured database using a connection from the connection pool,
@@ -78,7 +82,7 @@ impl Graph {
7882
///
7983
/// use [`Graph::execute`] when you are interested in the result stream
8084
pub async fn run(&self, q: Query) -> Result<()> {
81-
self.run_on(&self.config.db, q).await
85+
self.impl_run_on(self.config.db.clone(), q).await
8286
}
8387

8488
/// Runs a query on the provided database using a connection from the connection pool.
@@ -92,12 +96,17 @@ impl Graph {
9296
/// Use [`Graph::run`] for cases where you just want a write operation
9397
///
9498
/// use [`Graph::execute`] when you are interested in the result stream
95-
pub async fn run_on(&self, db: &str, q: Query) -> Result<()> {
99+
pub async fn run_on(&self, db: impl Into<Database>, q: Query) -> Result<()> {
100+
self.impl_run_on(Some(db.into()), q).await
101+
}
102+
103+
async fn impl_run_on(&self, db: Option<Database>, q: Query) -> Result<()> {
96104
backoff::future::retry_notify(
97105
self.pool.manager().backoff(),
98106
|| {
99107
let pool = &self.pool;
100108
let query = &q;
109+
let db = db.as_deref();
101110
async move {
102111
let mut connection = pool.get().await.map_err(crate::Error::from)?;
103112
query.run_retryable(db, &mut connection).await
@@ -115,7 +124,7 @@ impl Graph {
115124
/// This includes errors during a leader election or when the transaction resources on the server (memory, handles, ...) are exhausted.
116125
/// Retries happen with an exponential backoff until a retry delay exceeds 60s, at which point the query fails with the last error as it would without any retry.
117126
pub async fn execute(&self, q: Query) -> Result<DetachedRowStream> {
118-
self.execute_on(&self.config.db, q).await
127+
self.impl_execute_on(self.config.db.clone(), q).await
119128
}
120129

121130
/// Executes a query on the provided database and returns a [`DetaRowStream`]
@@ -124,13 +133,18 @@ impl Graph {
124133
/// All errors with the `Transient` error class as well as a few other error classes are considered retryable.
125134
/// This includes errors during a leader election or when the transaction resources on the server (memory, handles, ...) are exhausted.
126135
/// Retries happen with an exponential backoff until a retry delay exceeds 60s, at which point the query fails with the last error as it would without any retry.
127-
pub async fn execute_on(&self, db: &str, q: Query) -> Result<DetachedRowStream> {
136+
pub async fn execute_on(&self, db: impl Into<Database>, q: Query) -> Result<DetachedRowStream> {
137+
self.impl_execute_on(Some(db.into()), q).await
138+
}
139+
140+
async fn impl_execute_on(&self, db: Option<Database>, q: Query) -> Result<DetachedRowStream> {
128141
backoff::future::retry_notify(
129142
self.pool.manager().backoff(),
130143
|| {
131144
let pool = &self.pool;
132145
let fetch_size = self.config.fetch_size;
133146
let query = &q;
147+
let db = db.as_deref();
134148
async move {
135149
let connection = pool.get().await.map_err(crate::Error::from)?;
136150
query.execute_retryable(db, fetch_size, connection).await

lib/src/messages.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -132,8 +132,8 @@ impl BoltRequest {
132132
BoltRequest::Hello(hello::Hello::new(data))
133133
}
134134

135-
pub fn run(db: &str, query: &str, params: BoltMap) -> BoltRequest {
136-
BoltRequest::Run(Run::new(db.into(), query.into(), params))
135+
pub fn run(db: Option<&str>, query: &str, params: BoltMap) -> BoltRequest {
136+
BoltRequest::Run(Run::new(db.map(Into::into), query.into(), params))
137137
}
138138

139139
#[cfg_attr(
@@ -152,8 +152,9 @@ impl BoltRequest {
152152
BoltRequest::Discard(discard::Discard::default())
153153
}
154154

155-
pub fn begin(db: &str) -> BoltRequest {
156-
let begin = Begin::new([("db".into(), db.into())].into_iter().collect());
155+
pub fn begin(db: Option<&str>) -> BoltRequest {
156+
let extra = db.into_iter().map(|db| ("db".into(), db.into())).collect();
157+
let begin = Begin::new(extra);
157158
BoltRequest::Begin(begin)
158159
}
159160

lib/src/messages/run.rs

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,13 @@ pub struct Run {
1010
}
1111

1212
impl Run {
13-
pub fn new(db: BoltString, query: BoltString, parameters: BoltMap) -> Run {
13+
pub fn new(db: Option<BoltString>, query: BoltString, parameters: BoltMap) -> Run {
1414
Run {
1515
query,
1616
parameters,
17-
extra: vec![("db".into(), BoltType::String(db))]
17+
extra: db
1818
.into_iter()
19+
.map(|db| ("db".into(), BoltType::String(db)))
1920
.collect(),
2021
}
2122
}
@@ -31,7 +32,7 @@ mod tests {
3132
#[test]
3233
fn should_serialize_run() {
3334
let run = Run::new(
34-
"test".into(),
35+
Some("test".into()),
3536
"query".into(),
3637
vec![("k".into(), "v".into())].into_iter().collect(),
3738
);
@@ -69,7 +70,7 @@ mod tests {
6970

7071
#[test]
7172
fn should_serialize_run_with_no_params() {
72-
let run = Run::new("".into(), "query".into(), BoltMap::default());
73+
let run = Run::new(None, "query".into(), BoltMap::default());
7374

7475
let bytes: Bytes = run.into_bytes(Version::V4_1).unwrap();
7576

@@ -85,11 +86,7 @@ mod tests {
8586
b'r',
8687
b'y',
8788
map::TINY,
88-
map::TINY | 1,
89-
string::TINY | 2,
90-
b'd',
91-
b'b',
92-
string::TINY,
89+
map::TINY,
9390
])
9491
);
9592
}

lib/src/query.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,11 @@ impl Query {
4545
self.params.value.contains_key(key)
4646
}
4747

48-
pub(crate) async fn run(self, db: &str, connection: &mut ManagedConnection) -> Result<()> {
48+
pub(crate) async fn run(
49+
self,
50+
db: Option<&str>,
51+
connection: &mut ManagedConnection,
52+
) -> Result<()> {
4953
let request = BoltRequest::run(db, &self.query, self.params);
5054
Self::try_run(request, connection)
5155
.await
@@ -54,7 +58,7 @@ impl Query {
5458

5559
pub(crate) async fn run_retryable(
5660
&self,
57-
db: &str,
61+
db: Option<&str>,
5862
connection: &mut ManagedConnection,
5963
) -> QueryResult<()> {
6064
let request = BoltRequest::run(db, &self.query, self.params.clone());
@@ -63,7 +67,7 @@ impl Query {
6367

6468
pub(crate) async fn execute_retryable(
6569
&self,
66-
db: &str,
70+
db: Option<&str>,
6771
fetch_size: usize,
6872
mut connection: ManagedConnection,
6973
) -> QueryResult<DetachedRowStream> {
@@ -75,7 +79,7 @@ impl Query {
7579

7680
pub(crate) async fn execute_mut<'conn>(
7781
self,
78-
db: &str,
82+
db: Option<&str>,
7983
fetch_size: usize,
8084
connection: &'conn mut ManagedConnection,
8185
) -> Result<RowStream> {

lib/src/txn.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,18 +14,18 @@ use crate::{
1414
/// When a transation is started, a dedicated connection is resered and moved into the handle which
1515
/// will be released to the connection pool when the [`Txn`] handle is dropped.
1616
pub struct Txn {
17-
db: Database,
17+
db: Option<Database>,
1818
fetch_size: usize,
1919
connection: ManagedConnection,
2020
}
2121

2222
impl Txn {
2323
pub(crate) async fn new(
24-
db: Database,
24+
db: Option<Database>,
2525
fetch_size: usize,
2626
mut connection: ManagedConnection,
2727
) -> Result<Self> {
28-
let begin = BoltRequest::begin(&db);
28+
let begin = BoltRequest::begin(db.as_deref());
2929
match connection.send_recv(begin).await? {
3030
BoltResponse::Success(_) => Ok(Txn {
3131
db,
@@ -49,12 +49,12 @@ impl Txn {
4949

5050
/// Runs a single query and discards the stream.
5151
pub async fn run(&mut self, q: Query) -> Result<()> {
52-
q.run(&self.db, &mut self.connection).await
52+
q.run(self.db.as_deref(), &mut self.connection).await
5353
}
5454

5555
/// Executes a query and returns a [`RowStream`]
5656
pub async fn execute(&mut self, q: Query) -> Result<RowStream> {
57-
q.execute_mut(&self.db, self.fetch_size, &mut self.connection)
57+
q.execute_mut(self.db.as_deref(), self.fetch_size, &mut self.connection)
5858
.await
5959
}
6060

0 commit comments

Comments
 (0)