Skip to content

Commit a7a20d5

Browse files
authored
Make result summary available behind a feature flag (#199)
* Add cargo alias to run tasks across a number of feature combinations * Add more variations of StreamingSummary, based on features * Add finish method to get stream summary * Change stream method to take a mutref as that we can expose finish * Allow for finish to return a value * Expose RowItem * Turn some summary fields into structured types * Add integration test for streaming summary * Document all options * Only add t_first if we need it * Fix wrong doc links * Maybe it's enough to mark this as tagged * Fix notification parsing * Rename StreamingSummary to ResultSummary and remove ref version
1 parent edc55c3 commit a7a20d5

File tree

12 files changed

+970
-205
lines changed

12 files changed

+970
-205
lines changed

.cargo/config.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
[alias]
22
xtask = "run --package xtask --"
3+
ff = "hack --package neo4rs --each-feature --exclude-features unstable-serde-packstream-format,unstable-bolt-protocol-impl-v2,unstable-result-summary"

.github/workflows/checks.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ env:
2424
RUST_LOG: debug
2525
CARGO_TERM_COLOR: always
2626
MSRV: 1.75.0
27-
HACK: hack --package neo4rs --each-feature --exclude-features unstable-serde-packstream-format,unstable-bolt-protocol-impl-v2,unstable-streaming-summary
27+
HACK: hack --package neo4rs --each-feature --exclude-features unstable-serde-packstream-format,unstable-bolt-protocol-impl-v2,unstable-result-summary
2828

2929
jobs:
3030
check:

lib/Cargo.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,12 @@ rust-version = "1.75.0"
1717

1818
[features]
1919
json = ["serde_json"]
20-
unstable-v1 = ["unstable-bolt-protocol-impl-v2", "unstable-streaming-summary"]
20+
unstable-v1 = ["unstable-bolt-protocol-impl-v2", "unstable-result-summary"]
2121
unstable-serde-packstream-format = []
22-
unstable-streaming-summary = ["unstable-serde-packstream-format"]
22+
unstable-result-summary = ["unstable-serde-packstream-format"]
2323
unstable-bolt-protocol-impl-v2 = [
2424
"unstable-serde-packstream-format",
25-
"unstable-streaming-summary",
25+
"unstable-result-summary",
2626
"dep:nav-types",
2727
"dep:time",
2828
]

lib/include/result_summary.rs

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
{
2+
use ::futures::TryStreamExt as _;
3+
4+
use neo4rs::summary::{Type, Counters, ResultSummary};
5+
6+
#[allow(dead_code)]
7+
#[derive(Debug, PartialEq, serde::Deserialize)]
8+
struct N {
9+
prop: String,
10+
}
11+
12+
fn assert_item(n: N) {
13+
assert_eq!(n.prop, "frobnicate");
14+
}
15+
16+
fn assert_summary(summary: &ResultSummary) {
17+
assert!(summary.available_after().is_some());
18+
assert!(summary.consumed_after().is_some());
19+
assert!(summary.db().is_some());
20+
assert_eq!(summary.query_type(), Type::ReadWrite);
21+
assert_eq!(summary.stats(), &Counters { nodes_created: 1, properties_set: 1, labels_added: 1, ..Default::default()});
22+
}
23+
24+
//
25+
// next_or_summary
26+
27+
let mut stream = graph
28+
.execute(query("CREATE (n:Node {prop: 'frobnicate'}) RETURN n"))
29+
.await
30+
.unwrap();
31+
32+
let Ok(Some(row)) = stream.next_or_summary().await else { panic!() };
33+
assert!(row.row().is_some());
34+
assert!(row.summary().is_none());
35+
36+
assert_item(row.row().unwrap().to().unwrap());
37+
38+
let Ok(Some(row)) = stream.next_or_summary().await else { panic!() };
39+
assert!(row.row().is_none());
40+
assert!(row.summary().is_some());
41+
42+
assert_summary(row.summary().unwrap());
43+
44+
45+
//
46+
// as_items
47+
48+
let mut stream = graph
49+
.execute(query("CREATE (n:Node {prop: 'frobnicate'}) RETURN n"))
50+
.await
51+
.unwrap();
52+
53+
let items = stream.as_items::<N>()
54+
.try_collect::<Vec<_>>()
55+
.await
56+
.unwrap();
57+
58+
for item in items {
59+
match item {
60+
RowItem::Row(row) => assert_item(row),
61+
RowItem::Summary(summary) => assert_summary(&summary),
62+
}
63+
}
64+
65+
66+
//
67+
// into_stream + finish
68+
69+
let mut stream = graph
70+
.execute(query("CREATE (n:Node {prop: 'frobnicate'}) RETURN n"))
71+
.await
72+
.unwrap();
73+
74+
let items = stream.into_stream_as::<N>()
75+
.try_collect::<Vec<_>>()
76+
.await
77+
.unwrap();
78+
79+
let Ok(Some(summary)) = stream.finish().await else { panic!() };
80+
81+
for item in items {
82+
assert_item(item);
83+
}
84+
85+
assert_summary(&summary);
86+
}

lib/src/graph.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ impl Graph {
7171
}
7272

7373
/// Runs a query on the configured database using a connection from the connection pool,
74-
/// It doesn't return any [`RowStream`] as the `run` abstraction discards any stream.
74+
/// It doesn't return any [`DetachedRowStream`] as the `run` abstraction discards any stream.
7575
///
7676
/// This operation retires the query on certain failures.
7777
/// All errors with the `Transient` error class as well as a few other error classes are considered retryable.
@@ -86,7 +86,7 @@ impl Graph {
8686
}
8787

8888
/// Runs a query on the provided database using a connection from the connection pool.
89-
/// It doesn't return any [`RowStream`] as the `run` abstraction discards any stream.
89+
/// It doesn't return any [`DetachedRowStream`] as the `run` abstraction discards any stream.
9090
///
9191
/// This operation retires the query on certain failures.
9292
/// All errors with the `Transient` error class as well as a few other error classes are considered retryable.
@@ -127,7 +127,7 @@ impl Graph {
127127
self.impl_execute_on(self.config.db.clone(), q).await
128128
}
129129

130-
/// Executes a query on the provided database and returns a [`DetaRowStream`]
130+
/// Executes a query on the provided database and returns a [`DetachedRowStream`]
131131
///
132132
/// This operation retires the query on certain failures.
133133
/// All errors with the `Transient` error class as well as a few other error classes are considered retryable.

lib/src/lib.rs

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,37 @@
148148
//!
149149
//! ```
150150
//!
151+
#![cfg_attr(
152+
feature = "unstable-result-summary",
153+
doc = r##"### Streaming summary
154+
155+
To get access to the result summary after streaming a [`RowStream`], you can use the [`RowStream::next_or_summary`] method.
156+
Alternatively, you can use one of the [`RowStream::as_row_items`], [`RowStream::as_items`], or [`RowStream::column_to_items`]
157+
methods to get the result as a stream of [`RowItem`], whis an enum of either the row or the summary.
158+
The last option is to use one of the [`RowStream::into_stream`], [`RowStream::into_stream_as`], or [`RowStream::column_into_stream`] methods
159+
and after the stream is consumed, call [`RowStream::finish`] to get the summary.
160+
161+
```no_run
162+
use neo4rs::*;
163+
164+
#[tokio::main]
165+
async fn main() {
166+
let uri = "127.0.0.1:7687";
167+
let user = "neo4j";
168+
let pass = "neo";
169+
let graph = Graph::new(uri, user, pass).await.unwrap();
170+
171+
"##
172+
)]
173+
#![cfg_attr(feature="unstable-result-summary", doc = include_str!("../include/result_summary.rs"))]
174+
#![cfg_attr(
175+
feature = "unstable-result-summary",
176+
doc = r"
177+
}
178+
```
179+
180+
"
181+
)]
151182
//! ### Rollback a transaction
152183
//! ```no_run
153184
//! use neo4rs::*;
@@ -436,7 +467,7 @@ mod pool;
436467
mod query;
437468
mod row;
438469
mod stream;
439-
#[cfg(feature = "unstable-streaming-summary")]
470+
#[cfg(feature = "unstable-result-summary")]
440471
pub mod summary;
441472
mod txn;
442473
mod types;
@@ -450,7 +481,7 @@ pub use crate::errors::{
450481
pub use crate::graph::{query, Graph};
451482
pub use crate::query::Query;
452483
pub use crate::row::{Node, Path, Point2D, Point3D, Relation, Row, UnboundedRelation};
453-
pub use crate::stream::RowStream;
484+
pub use crate::stream::{DetachedRowStream, RowItem, RowStream};
454485
pub use crate::txn::Txn;
455486
pub use crate::types::serde::{
456487
DeError, EndNodeId, Id, Indices, Keys, Labels, Nodes, Offset, Relationships, StartNodeId,

lib/src/messages.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,10 +148,18 @@ impl BoltRequest {
148148
feature = "unstable-bolt-protocol-impl-v2",
149149
deprecated(since = "0.9.0", note = "Use `crate::bolt::Discard` instead.")
150150
)]
151-
pub fn discard() -> BoltRequest {
151+
pub fn discard_all() -> BoltRequest {
152152
BoltRequest::Discard(discard::Discard::default())
153153
}
154154

155+
#[cfg_attr(
156+
feature = "unstable-bolt-protocol-impl-v2",
157+
deprecated(since = "0.9.0", note = "Use `crate::bolt::Discard` instead.")
158+
)]
159+
pub fn discard_all_for(query_id: i64) -> BoltRequest {
160+
BoltRequest::Discard(discard::Discard::new(-1, query_id))
161+
}
162+
155163
pub fn begin(db: Option<&str>) -> BoltRequest {
156164
let extra = db.into_iter().map(|db| ("db".into(), db.into())).collect();
157165
let begin = Begin::new(extra);

lib/src/query.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ impl Query {
9494

9595
#[cfg(not(feature = "unstable-bolt-protocol-impl-v2"))]
9696
{
97-
match connection.send_recv(BoltRequest::discard()).await {
97+
match connection.send_recv(BoltRequest::discard_all()).await {
9898
Ok(BoltResponse::Success(_)) => Ok(()),
9999
otherwise => wrap_error(otherwise, "DISCARD"),
100100
}
@@ -117,7 +117,17 @@ impl Query {
117117
Self::try_request(request, connection).await.map(|success| {
118118
let fields: BoltList = success.get("fields").unwrap_or_default();
119119
let qid: i64 = success.get("qid").unwrap_or(-1);
120-
RowStream::new(qid, fields, fetch_size)
120+
121+
#[cfg(feature = "unstable-bolt-protocol-impl-v2")]
122+
{
123+
let available: i64 = success.get("t_first").unwrap_or(-1);
124+
RowStream::new(qid, available, fields, fetch_size)
125+
}
126+
127+
#[cfg(not(feature = "unstable-bolt-protocol-impl-v2"))]
128+
{
129+
RowStream::new(qid, fields, fetch_size)
130+
}
121131
})
122132
}
123133

0 commit comments

Comments
 (0)