Skip to content

Commit 7561bbe

Browse files
authored
Return the result summary from run like methods (#221)
1 parent c2b733d commit 7561bbe

File tree

6 files changed

+64
-35
lines changed

6 files changed

+64
-35
lines changed

lib/src/graph.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,13 @@ use {
33
crate::connection::{ConnectionInfo, Routing},
44
crate::graph::ConnectionPoolManager::Routed,
55
crate::routing::{ClusterRoutingTableProvider, RoutedConnectionManager},
6+
crate::summary::ResultSummary,
67
log::debug,
78
};
89

910
use crate::graph::ConnectionPoolManager::Direct;
1011
use crate::pool::ManagedConnection;
12+
use crate::RunResult;
1113
use crate::{
1214
config::{Config, ConfigBuilder, Database, LiveConfig},
1315
errors::Result,
@@ -163,7 +165,7 @@ impl Graph {
163165
/// Use [`Graph::run`] for cases where you just want a write operation
164166
///
165167
/// use [`Graph::execute`] when you are interested in the result stream
166-
pub async fn run(&self, q: impl Into<Query>) -> Result<()> {
168+
pub async fn run(&self, q: impl Into<Query>) -> Result<RunResult> {
167169
self.impl_run_on(self.config.db.clone(), q.into(), Operation::Write)
168170
.await
169171
}
@@ -185,7 +187,7 @@ impl Graph {
185187
db: impl Into<Database>,
186188
q: impl Into<Query>,
187189
operation: Operation,
188-
) -> Result<()> {
190+
) -> Result<ResultSummary> {
189191
self.impl_run_on(Some(db.into()), q.into(), operation).await
190192
}
191193

@@ -201,7 +203,7 @@ impl Graph {
201203
db: Option<Database>,
202204
q: Query,
203205
operation: Operation,
204-
) -> Result<()> {
206+
) -> Result<RunResult> {
205207
backoff::future::retry_notify(
206208
self.pool.backoff(),
207209
|| {

lib/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -481,7 +481,7 @@ pub use crate::errors::{
481481
Error, Neo4jClientErrorKind, Neo4jError, Neo4jErrorKind, Neo4jSecurityErrorKind, Result,
482482
};
483483
pub use crate::graph::{query, Graph};
484-
pub use crate::query::{Query, QueryParameter};
484+
pub use crate::query::{Query, QueryParameter, RunResult};
485485
pub use crate::row::{Node, Path, Point2D, Point3D, Relation, Row, UnboundedRelation};
486486
pub use crate::stream::{DetachedRowStream, RowStream};
487487
pub use crate::txn::Txn;

lib/src/query.rs

Lines changed: 14 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use std::cell::{Cell, RefCell};
22

33
#[cfg(feature = "unstable-bolt-protocol-impl-v2")]
4-
use crate::bolt::{Discard, Summary, WrapExtra as _};
4+
use crate::{bolt::Summary, summary::ResultSummary};
55
use crate::{
66
errors::Result,
77
messages::{BoltRequest, BoltResponse},
@@ -11,6 +11,11 @@ use crate::{
1111
Error, Success,
1212
};
1313

14+
#[cfg(feature = "unstable-bolt-protocol-impl-v2")]
15+
pub type RunResult = ResultSummary;
16+
#[cfg(not(feature = "unstable-bolt-protocol-impl-v2"))]
17+
pub type RunResult = ();
18+
1419
/// Abstracts a cypher query that is sent to neo4j server.
1520
#[derive(Clone)]
1621
pub struct Query {
@@ -83,7 +88,7 @@ impl Query {
8388
&self.params
8489
}
8590

86-
pub(crate) async fn run(self, connection: &mut ManagedConnection) -> Result<()> {
91+
pub(crate) async fn run(self, connection: &mut ManagedConnection) -> Result<RunResult> {
8792
let request = BoltRequest::run(&self.query, self.params, self.extra);
8893
Self::try_run(request, connection)
8994
.await
@@ -93,7 +98,7 @@ impl Query {
9398
pub(crate) async fn run_retryable(
9499
&self,
95100
connection: &mut ManagedConnection,
96-
) -> QueryResult<()> {
101+
) -> QueryResult<RunResult> {
97102
let request = BoltRequest::run(&self.query, self.params.clone(), self.extra.clone());
98103
Self::try_run(request, connection).await
99104
}
@@ -120,24 +125,12 @@ impl Query {
120125
.map_err(unwrap_backoff)
121126
}
122127

123-
async fn try_run(request: BoltRequest, connection: &mut ManagedConnection) -> QueryResult<()> {
124-
let _ = Self::try_request(request, connection).await?;
125-
126-
#[cfg(not(feature = "unstable-bolt-protocol-impl-v2"))]
127-
{
128-
match connection.send_recv(BoltRequest::discard_all()).await {
129-
Ok(BoltResponse::Success(_)) => Ok(()),
130-
otherwise => wrap_error(otherwise, "DISCARD"),
131-
}
132-
}
133-
134-
#[cfg(feature = "unstable-bolt-protocol-impl-v2")]
135-
{
136-
match connection.send_recv_as(Discard::all()).await {
137-
Ok(Summary::Success(_discard_success)) => Ok(()),
138-
otherwise => wrap_error(otherwise, "DISCARD"),
139-
}
140-
}
128+
async fn try_run(
129+
request: BoltRequest,
130+
connection: &mut ManagedConnection,
131+
) -> QueryResult<RunResult> {
132+
let result = Self::try_execute(request, 4096, connection).await?;
133+
Ok(result.finish(connection).await?)
141134
}
142135

143136
async fn try_execute(

lib/src/stream.rs

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use crate::{
1313
row::Row,
1414
txn::TransactionHandle,
1515
types::BoltList,
16-
DeError,
16+
DeError, RunResult,
1717
};
1818

1919
use futures::{stream::try_unfold, TryStream};
@@ -26,11 +26,6 @@ type BoxedSummary = Box<ResultSummary>;
2626
#[cfg(not(feature = "unstable-result-summary"))]
2727
type BoxedSummary = ();
2828

29-
#[cfg(feature = "unstable-bolt-protocol-impl-v2")]
30-
type FinishResult = ResultSummary;
31-
#[cfg(not(feature = "unstable-bolt-protocol-impl-v2"))]
32-
type FinishResult = ();
33-
3429
/// An abstraction over a stream of rows, this is returned as a result of [`crate::Txn::execute`].
3530
///
3631
/// A stream needs a running transaction to be consumed.
@@ -250,7 +245,7 @@ impl RowStream {
250245

251246
/// Stop consuming the stream and return a summary, if available.
252247
/// Stopping the stream will also discard any messages on the server side.
253-
pub async fn finish(mut self, mut handle: impl TransactionHandle) -> Result<FinishResult> {
248+
pub async fn finish(mut self, mut handle: impl TransactionHandle) -> Result<RunResult> {
254249
self.buffer.clear();
255250

256251
#[cfg(feature = "unstable-bolt-protocol-impl-v2")]
@@ -440,7 +435,7 @@ impl DetachedRowStream {
440435

441436
/// Stop consuming the stream and return a summary, if available.
442437
/// Stopping the stream will also discard any messages on the server side.
443-
pub async fn finish(mut self) -> Result<FinishResult> {
438+
pub async fn finish(mut self) -> Result<RunResult> {
444439
self.stream.finish(&mut self.connection).await
445440
}
446441

lib/src/summary.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,29 @@ pub struct Counters {
8383
pub system_updates: u64,
8484
}
8585

86+
impl Counters {
87+
pub fn merge(&mut self, other: &Self) {
88+
self.nodes_created += other.nodes_created;
89+
self.nodes_deleted += other.nodes_deleted;
90+
self.relationships_created += other.relationships_created;
91+
self.relationships_deleted += other.relationships_deleted;
92+
self.properties_set += other.properties_set;
93+
self.labels_added += other.labels_added;
94+
self.labels_removed += other.labels_removed;
95+
self.indexes_added += other.indexes_added;
96+
self.indexes_removed += other.indexes_removed;
97+
self.constraints_added += other.constraints_added;
98+
self.constraints_removed += other.constraints_removed;
99+
self.system_updates += other.system_updates;
100+
}
101+
}
102+
103+
impl std::ops::AddAssign<&Counters> for Counters {
104+
fn add_assign(&mut self, other: &Counters) {
105+
self.merge(other);
106+
}
107+
}
108+
86109
#[derive(Debug, Clone, PartialEq, Deserialize)]
87110
#[serde(from = "SummaryBuilder")]
88111
pub enum Streaming {

lib/src/txn.rs

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use crate::{
77
pool::ManagedConnection,
88
query::Query,
99
stream::RowStream,
10-
Operation,
10+
Operation, RunResult,
1111
};
1212

1313
/// A handle which is used to control a transaction, created as a result of [`crate::Graph::start_txn`]
@@ -40,6 +40,22 @@ impl Txn {
4040
}
4141
}
4242

43+
#[cfg(feature = "unstable-bolt-protocol-impl-v2")]
44+
/// Runs multiple queries one after the other in the same connection,
45+
/// merging all counters from each result summary.
46+
pub async fn run_queries<Q: Into<Query>>(
47+
&mut self,
48+
queries: impl IntoIterator<Item = Q>,
49+
) -> Result<crate::summary::Counters> {
50+
let mut counters = crate::summary::Counters::default();
51+
for query in queries {
52+
let summary = self.run(query.into()).await?;
53+
counters += summary.stats();
54+
}
55+
Ok(counters)
56+
}
57+
58+
#[cfg(not(feature = "unstable-bolt-protocol-impl-v2"))]
4359
/// Runs multiple queries one after the other in the same connection
4460
pub async fn run_queries<Q: Into<Query>>(
4561
&mut self,
@@ -52,7 +68,7 @@ impl Txn {
5268
}
5369

5470
/// Runs a single query and discards the stream.
55-
pub async fn run(&mut self, q: impl Into<Query>) -> Result<()> {
71+
pub async fn run(&mut self, q: impl Into<Query>) -> Result<RunResult> {
5672
let mut query = q.into();
5773
if let Some(db) = self.db.as_ref() {
5874
query = query.extra("db", db.to_string());

0 commit comments

Comments
 (0)