Skip to content

Commit 2fe8578

Browse files
authored
Refactor how connections are created (#241)
* Fix typos * Store routing context in a clonable slice * Split ConnectionInfo into two opts for separate stages * Don't double wrap the RoutingTableProvider, Arc is already boxed * Move import
1 parent fe5baf8 commit 2fe8578

File tree

7 files changed

+106
-70
lines changed

7 files changed

+106
-70
lines changed

lib/src/bolt/request/route.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ impl Serialize for Routing {
2323
Routing::No => serializer.serialize_none(),
2424
Routing::Yes(routing) => {
2525
let mut map = serializer.serialize_map(Some(routing.len()))?;
26-
for (k, v) in routing {
26+
for (k, v) in routing.iter() {
2727
map.serialize_entry(k.value.as_str(), v.value.as_str())?;
2828
}
2929
map.end()
@@ -76,7 +76,7 @@ mod tests {
7676
#[test]
7777
fn serialize() {
7878
let route = RouteBuilder::new(
79-
Routing::Yes(vec![("address".into(), "localhost:7687".into())]),
79+
Routing::Yes([("address".into(), "localhost:7687".into())].into()),
8080
vec!["bookmark".into()],
8181
)
8282
.with_db(Database::from("neo4j"))
@@ -99,7 +99,7 @@ mod tests {
9999
#[test]
100100
fn serialize_no_db() {
101101
let builder = RouteBuilder::new(
102-
Routing::Yes(vec![("address".into(), "localhost:7687".into())]),
102+
Routing::Yes([("address".into(), "localhost:7687".into())].into()),
103103
vec!["bookmark".into()],
104104
);
105105
let route = builder.build(Version::V4_3);
@@ -121,7 +121,7 @@ mod tests {
121121
#[test]
122122
fn serialize_no_db_v4_4() {
123123
let builder = RouteBuilder::new(
124-
Routing::Yes(vec![("address".into(), "localhost:7687".into())]),
124+
Routing::Yes([("address".into(), "localhost:7687".into())].into()),
125125
vec!["bookmark".into()],
126126
);
127127
let route = builder.build(Version::V4_4);
@@ -147,7 +147,7 @@ mod tests {
147147
#[test]
148148
fn serialize_with_db_v4_4() {
149149
let builder = RouteBuilder::new(
150-
Routing::Yes(vec![("address".into(), "localhost:7687".into())]),
150+
Routing::Yes([("address".into(), "localhost:7687".into())].into()),
151151
vec!["bookmark".into()],
152152
);
153153
let route = builder

lib/src/connection.rs

Lines changed: 87 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,8 @@ pub struct Connection {
5353

5454
impl Connection {
5555
pub(crate) async fn new(info: &ConnectionInfo) -> Result<Self> {
56-
let mut connection = Self::prepare(info).await?;
57-
let hello = info.to_hello(connection.version);
56+
let mut connection = Self::prepare(&info.prepare).await?;
57+
let hello = info.init.to_hello(connection.version);
5858
connection.hello(hello).await?;
5959
Ok(connection)
6060
}
@@ -63,14 +63,14 @@ impl Connection {
6363
self.version
6464
}
6565

66-
pub(crate) async fn prepare(info: &ConnectionInfo) -> Result<Self> {
67-
let mut stream = match &info.host {
68-
Host::Domain(domain) => TcpStream::connect((&**domain, info.port)).await?,
69-
Host::Ipv4(ip) => TcpStream::connect((*ip, info.port)).await?,
70-
Host::Ipv6(ip) => TcpStream::connect((*ip, info.port)).await?,
66+
pub(crate) async fn prepare(opts: &PrepareOpts) -> Result<Self> {
67+
let mut stream = match &opts.host {
68+
Host::Domain(domain) => TcpStream::connect((&**domain, opts.port)).await?,
69+
Host::Ipv4(ip) => TcpStream::connect((*ip, opts.port)).await?,
70+
Host::Ipv6(ip) => TcpStream::connect((*ip, opts.port)).await?,
7171
};
7272

73-
Ok(match &info.encryption {
73+
Ok(match &opts.encryption {
7474
Some((connector, domain)) => {
7575
let mut stream = connector.connect(domain.clone(), stream).await?;
7676
let version = Self::init(&mut stream).await?;
@@ -269,7 +269,7 @@ impl Connection {
269269
#[derive(Debug, Clone, PartialEq, Eq)]
270270
pub(crate) enum Routing {
271271
No,
272-
Yes(Vec<(BoltString, BoltString)>),
272+
Yes(Arc<[(BoltString, BoltString)]>),
273273
}
274274

275275
impl From<Routing> for Option<BoltMap> {
@@ -278,8 +278,8 @@ impl From<Routing> for Option<BoltMap> {
278278
Routing::No => None,
279279
Routing::Yes(routing) => Some(
280280
routing
281-
.into_iter()
282-
.map(|(k, v)| (k, BoltType::String(v)))
281+
.iter()
282+
.map(|(k, v)| (k.clone(), BoltType::String(v.clone())))
283283
.collect(),
284284
),
285285
}
@@ -302,24 +302,78 @@ impl Display for Routing {
302302
}
303303
}
304304

305+
#[derive(Clone)]
306+
pub(crate) struct PrepareOpts {
307+
pub(crate) host: Host<Arc<str>>,
308+
pub(crate) port: u16,
309+
pub(crate) encryption: Option<(TlsConnector, ServerName<'static>)>,
310+
}
311+
312+
impl Debug for PrepareOpts {
313+
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
314+
f.debug_struct("PrepareOpts")
315+
.field("host", &self.host)
316+
.field("port", &self.port)
317+
.field("encryption", &self.encryption.is_some())
318+
.finish()
319+
}
320+
}
321+
322+
#[derive(Clone)]
323+
pub(crate) struct InitOpts {
324+
pub(crate) user: Arc<str>,
325+
pub(crate) password: Arc<str>,
326+
pub(crate) routing: Routing,
327+
}
328+
329+
impl Debug for InitOpts {
330+
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
331+
f.debug_struct("InitOpts")
332+
.field("user", &self.user)
333+
.field("password", &"***")
334+
.field("routing", &self.routing)
335+
.finish()
336+
}
337+
}
338+
339+
impl InitOpts {
340+
#[cfg(not(feature = "unstable-bolt-protocol-impl-v2"))]
341+
pub(crate) fn to_hello(&self, version: Version) -> BoltRequest {
342+
HelloBuilder::new(&*self.user, &*self.password)
343+
.with_routing(self.routing.clone())
344+
.build(version)
345+
}
346+
347+
#[cfg(feature = "unstable-bolt-protocol-impl-v2")]
348+
pub(crate) fn to_hello(&self, version: Version) -> Hello {
349+
match self.routing {
350+
Routing::No => HelloBuilder::new(&self.user, &self.password).build(version),
351+
Routing::Yes(ref routing) => HelloBuilder::new(&self.user, &self.password)
352+
.with_routing(
353+
routing
354+
.iter()
355+
.map(|(k, v)| (k.value.as_str(), v.value.as_str())),
356+
)
357+
.build(version),
358+
}
359+
}
360+
}
361+
362+
#[derive(Clone)]
305363
pub(crate) struct ConnectionInfo {
306-
pub user: Arc<str>,
307-
pub password: Arc<str>,
308-
pub host: Host<Arc<str>>,
309-
pub port: u16,
310-
pub routing: Routing,
311-
pub encryption: Option<(TlsConnector, ServerName<'static>)>,
364+
pub(crate) prepare: PrepareOpts,
365+
pub(crate) init: InitOpts,
312366
}
313367

314368
impl Debug for ConnectionInfo {
315369
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
316370
f.debug_struct("ConnectionInfo")
317-
.field("user", &self.user)
371+
.field("user", &self.init.user)
318372
.field("password", &"***")
319-
.field("host", &self.host)
320-
.field("port", &self.port)
321-
.field("routing", &self.routing)
322-
.field("encryption", &self.encryption.is_some())
373+
.field("host", &self.prepare.host)
374+
.field("port", &self.prepare.port)
375+
.field("routing", &self.init.routing)
376+
.field("encryption", &self.prepare.encryption.is_some())
323377
.finish_non_exhaustive()
324378
}
325379
}
@@ -360,7 +414,8 @@ impl ConnectionInfo {
360414
"Client-side routing is in experimental mode.",
361415
"It is possible that operations against a cluster (such as Aura) will fail."
362416
));
363-
Routing::Yes(url.routing_context())
417+
let context = url.routing_context();
418+
Routing::Yes(context.into())
364419
} else {
365420
Routing::No
366421
};
@@ -373,14 +428,19 @@ impl ConnectionInfo {
373428
Host::Ipv6(d) => Host::Ipv6(d),
374429
};
375430

376-
Ok(Self {
377-
user: user.into(),
378-
password: password.into(),
431+
let prepare = PrepareOpts {
379432
host,
380433
port: url.port(),
381434
encryption,
435+
};
436+
437+
let init = InitOpts {
438+
user: user.into(),
439+
password: password.into(),
382440
routing,
383-
})
441+
};
442+
443+
Ok(Self { prepare, init })
384444
}
385445

386446
fn tls_connector(
@@ -432,27 +492,6 @@ impl ConnectionInfo {
432492

433493
Ok((connector, domain))
434494
}
435-
436-
#[cfg(not(feature = "unstable-bolt-protocol-impl-v2"))]
437-
pub(crate) fn to_hello(&self, version: Version) -> BoltRequest {
438-
HelloBuilder::new(&*self.user, &*self.password)
439-
.with_routing(self.routing.clone())
440-
.build(version)
441-
}
442-
443-
#[cfg(feature = "unstable-bolt-protocol-impl-v2")]
444-
pub(crate) fn to_hello(&self, version: Version) -> Hello {
445-
match self.routing {
446-
Routing::No => HelloBuilder::new(&self.user, &self.password).build(version),
447-
Routing::Yes(ref routing) => HelloBuilder::new(&self.user, &self.password)
448-
.with_routing(
449-
routing
450-
.iter()
451-
.map(|(k, v)| (k.value.as_str(), v.value.as_str())),
452-
)
453-
.build(version),
454-
}
455-
}
456495
}
457496

458497
#[derive(Clone, Debug)]

lib/src/graph.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use {
55
crate::routing::{ClusterRoutingTableProvider, RoutedConnectionManager},
66
crate::summary::ResultSummary,
77
log::debug,
8+
std::sync::Arc,
89
};
910

1011
use crate::graph::ConnectionPoolManager::Direct;
@@ -75,11 +76,11 @@ impl Graph {
7576
&config.password,
7677
&config.tls_config,
7778
)?;
78-
if matches!(info.routing, Routing::Yes(_)) {
79+
if matches!(info.init.routing, Routing::Yes(_)) {
7980
debug!("Routing enabled, creating a routed connection manager");
8081
let pool = Routed(RoutedConnectionManager::new(
8182
&config,
82-
Box::new(ClusterRoutingTableProvider),
83+
Arc::new(ClusterRoutingTableProvider),
8384
)?);
8485
Ok(Graph {
8586
config: config.into_live_config(),

lib/src/routing/connection_registry.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ impl Default for ConnectionRegistry {
6363
async fn refresh_routing_table(
6464
config: Config,
6565
registry: Arc<ConnectionRegistry>,
66-
provider: Arc<Box<dyn RoutingTableProvider>>,
66+
provider: Arc<dyn RoutingTableProvider>,
6767
bookmarks: &[String],
6868
) -> Result<u64, Error> {
6969
debug!("Routing table expired or empty, refreshing...");
@@ -109,7 +109,7 @@ async fn refresh_routing_table(
109109
pub(crate) fn start_background_updater(
110110
config: &Config,
111111
registry: Arc<ConnectionRegistry>,
112-
provider: Arc<Box<dyn RoutingTableProvider>>,
112+
provider: Arc<dyn RoutingTableProvider>,
113113
) -> Sender<RegistryCommand> {
114114
let config_clone = config.clone();
115115
let (tx, mut rx) = mpsc::channel(1);
@@ -266,9 +266,7 @@ mod tests {
266266
let ttl = refresh_routing_table(
267267
config.clone(),
268268
registry.clone(),
269-
Arc::new(Box::new(TestRoutingTableProvider::new(
270-
cluster_routing_table,
271-
))),
269+
Arc::new(TestRoutingTableProvider::new(cluster_routing_table)),
272270
&[],
273271
)
274272
.await

lib/src/routing/routed_connection_manager.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,7 @@ use crate::{Config, Error, Operation};
1010
use backoff::{ExponentialBackoff, ExponentialBackoffBuilder};
1111
use futures::lock::Mutex;
1212
use log::{debug, error};
13-
use std::sync::Arc;
14-
use std::time::Duration;
13+
use std::{sync::Arc, time::Duration};
1514
use tokio::sync::mpsc::Sender;
1615

1716
#[derive(Clone)]
@@ -24,7 +23,7 @@ pub struct RoutedConnectionManager {
2423
}
2524

2625
impl RoutedConnectionManager {
27-
pub fn new(config: &Config, provider: Box<dyn RoutingTableProvider>) -> Result<Self, Error> {
26+
pub fn new(config: &Config, provider: Arc<dyn RoutingTableProvider>) -> Result<Self, Error> {
2827
let backoff = Arc::new(
2928
ExponentialBackoffBuilder::new()
3029
.with_initial_interval(Duration::from_millis(1))
@@ -35,8 +34,7 @@ impl RoutedConnectionManager {
3534
);
3635

3736
let connection_registry = Arc::new(ConnectionRegistry::default());
38-
let channel =
39-
start_background_updater(config, connection_registry.clone(), provider.into());
37+
let channel = start_background_updater(config, connection_registry.clone(), provider);
4038
Ok(RoutedConnectionManager {
4139
load_balancing_strategy: Arc::new(RoundRobinStrategy::default()),
4240
bookmarks: Arc::new(Mutex::new(vec![])),

lib/src/routing/routing_table_provider.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ impl RoutingTableProvider for ClusterRoutingTableProvider {
3030
&config.tls_config,
3131
)?;
3232
let mut connection = Connection::new(&info).await?;
33-
let mut builder = RouteBuilder::new(info.routing, bookmarks);
33+
let mut builder = RouteBuilder::new(info.init.routing, bookmarks);
3434
if let Some(db) = config.db.clone() {
3535
builder = builder.with_db(db);
3636
}

lib/src/types/serde/typ.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1141,7 +1141,7 @@ mod tests {
11411141
#[test]
11421142
fn tuple_struct_from_map_fails() {
11431143
// We do not support this since maps are unordered and
1144-
// we cannot gurantee that the values are in the same
1144+
// we cannot guarantee that the values are in the same
11451145
// order as the tuple struct fields.
11461146
#[derive(Clone, Debug, PartialEq, Eq, Deserialize)]
11471147
struct Person(String, u8);
@@ -2063,9 +2063,9 @@ mod tests {
20632063
let bolt = BoltLocalTime::from(time);
20642064
let bolt = BoltType::LocalTime(bolt);
20652065

2066-
let acutal = bolt.to::<(NaiveTime, Option<Offset>)>().unwrap();
2067-
assert_eq!(acutal.0, time);
2068-
assert_eq!(acutal.1, None);
2066+
let actual = bolt.to::<(NaiveTime, Option<Offset>)>().unwrap();
2067+
assert_eq!(actual.0, time);
2068+
assert_eq!(actual.1, None);
20692069
}
20702070

20712071
#[test]

0 commit comments

Comments
 (0)