Skip to content

Commit 1f146ac

Browse files
authored
Add bookmarks support (#238)
* Add bookmarks support * Fix integration tests * Duplicate commit function * Address comments * Fix compilation error in return of a commit * Damn
1 parent afbe45d commit 1f146ac

18 files changed

+638
-132
lines changed

lib/include/bookmarks.rs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
{
2+
let mut txn = graph.start_txn().await.expect("Failed to start a new transaction");
3+
let id = uuid::Uuid::new_v4().to_string();
4+
txn.run(query("CREATE (p:Person {id: $id})").param("id", id.clone())).await.unwrap();
5+
txn.run(query("CREATE (p:Person {id: $id})").param("id", id.clone())).await.unwrap();
6+
// graph.execute(..) will not see the changes done above as the txn is not committed yet
7+
let mut result = graph.execute(query("MATCH (p:Person) WHERE p.id = $id RETURN p.id").param("id", id.clone())).await.unwrap();
8+
assert!(result.next().await.unwrap().is_none());
9+
let bookmark = txn.commit().await.unwrap();
10+
assert!(bookmark.is_some());
11+
if let Some(ref b) = bookmark {
12+
println!("Got a bookmark after commit: {:?}", b);
13+
}
14+
15+
//changes are now seen as the transaction is committed.
16+
let mut txn = graph.start_txn_as(Operation::Read, bookmark.map(|b| vec![b])).await.expect("Failed to start a new transaction");
17+
let mut stream = txn.execute(query("MATCH (p:Person) WHERE p.id = $id RETURN p.id").param("id", id.clone())).await.unwrap();
18+
loop {
19+
let next = stream.next(txn.handle());
20+
if let Ok(Some(record)) = next.await {
21+
println!("Record: {:?}", record);
22+
} else {
23+
break;
24+
}
25+
}
26+
}

lib/src/bolt/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ mod structs;
1414
mod summary;
1515

1616
pub use request::{
17-
Commit, Discard, Goodbye, Hello, HelloBuilder, Pull, Reset, Rollback, WrapExtra,
17+
Begin, Commit, Discard, Goodbye, Hello, HelloBuilder, Pull, Reset, Rollback, WrapExtra,
1818
};
1919
pub use structs::{
2020
Bolt, BoltRef, Date, DateDuration, DateTime, DateTimeZoneId, DateTimeZoneIdRef, Duration,

lib/src/bolt/request/begin.rs

Lines changed: 274 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,274 @@
1+
use crate::bolt::{ExpectedResponse, Summary};
2+
use crate::{Database, Version};
3+
use serde::ser::SerializeMap;
4+
use serde::{Deserialize, Serialize};
5+
use std::fmt::Display;
6+
7+
#[derive(Debug, Clone, PartialEq, Eq)]
8+
pub struct Begin<'a> {
9+
metadata: BeginMeta<'a>,
10+
}
11+
12+
#[derive(Debug, Clone, PartialEq, Eq)]
13+
pub enum BeginExtra<'a> {
14+
V4(Option<&'a str>),
15+
V4_4(Extra<'a>),
16+
}
17+
18+
#[derive(Debug, Clone, PartialEq, Eq)]
19+
#[allow(dead_code)]
20+
pub struct Extra<'a> {
21+
pub(crate) db: Option<&'a str>,
22+
pub(crate) imp_user: Option<&'a str>,
23+
}
24+
25+
#[derive(Debug, Clone, PartialEq, Eq)]
26+
pub struct TxMetadata(Vec<(String, String)>);
27+
28+
#[derive(Debug, Clone, PartialEq, Eq)]
29+
pub struct BeginMeta<'a> {
30+
pub(crate) bookmarks: Vec<String>,
31+
pub(crate) tx_timeout: Option<u32>,
32+
pub(crate) tx_metadata: Option<TxMetadata>,
33+
pub(crate) mode: &'a str,
34+
pub(crate) extra: BeginExtra<'a>,
35+
// To be added when implementing protocol version 5.2
36+
// pub(crate) notifications_minimum_severity: &'a str,
37+
// pub(crate) notifications_disabled_categories: Vec<String>
38+
}
39+
40+
pub struct BeginBuilder<'a> {
41+
bookmarks: Vec<String>,
42+
tx_timeout: Option<u32>,
43+
tx_metadata: Option<TxMetadata>,
44+
mode: &'a str,
45+
db: Option<&'a str>,
46+
imp_user: Option<&'a str>,
47+
}
48+
49+
impl<'a> BeginBuilder<'a> {
50+
pub fn new(db: Option<&'a str>) -> Self {
51+
Self {
52+
bookmarks: Vec::new(),
53+
tx_timeout: None,
54+
tx_metadata: None,
55+
mode: "w", // default is write mode
56+
db,
57+
imp_user: None,
58+
}
59+
}
60+
61+
pub fn with_bookmarks(mut self, bookmarks: impl IntoIterator<Item = impl Display>) -> Self {
62+
self.bookmarks = bookmarks
63+
.into_iter()
64+
.map(|b| b.to_string())
65+
.collect::<Vec<String>>();
66+
self
67+
}
68+
69+
pub fn with_tx_timeout(mut self, tx_timeout: u32) -> Self {
70+
self.tx_timeout = Some(tx_timeout);
71+
self
72+
}
73+
74+
pub fn with_tx_metadata(mut self, tx_metadata: Vec<(String, String)>) -> Self {
75+
self.tx_metadata = Some(TxMetadata(tx_metadata));
76+
self
77+
}
78+
79+
pub fn with_mode(mut self, mode: &'a str) -> Self {
80+
self.mode = mode;
81+
self
82+
}
83+
84+
pub fn with_imp_user(mut self, imp_user: &'a str) -> Self {
85+
self.imp_user = Some(imp_user);
86+
self
87+
}
88+
89+
pub fn build(self, version: Version) -> Begin<'a> {
90+
match version.cmp(&Version::V4_4) {
91+
std::cmp::Ordering::Less => Begin {
92+
metadata: BeginMeta {
93+
bookmarks: self.bookmarks,
94+
tx_timeout: self.tx_timeout,
95+
tx_metadata: self.tx_metadata,
96+
mode: self.mode,
97+
extra: BeginExtra::V4(self.db),
98+
},
99+
},
100+
_ => Begin {
101+
metadata: BeginMeta {
102+
bookmarks: self.bookmarks,
103+
tx_timeout: self.tx_timeout,
104+
tx_metadata: self.tx_metadata,
105+
mode: self.mode,
106+
extra: BeginExtra::V4_4(Extra {
107+
db: self.db,
108+
imp_user: self.imp_user,
109+
}),
110+
},
111+
},
112+
}
113+
}
114+
}
115+
116+
impl<'a> Begin<'a> {
117+
pub fn builder(db: Option<&'a str>) -> BeginBuilder<'a> {
118+
BeginBuilder::new(db)
119+
}
120+
}
121+
122+
#[derive(Debug, Clone, PartialEq, Eq, Deserialize)]
123+
pub struct Response {
124+
pub(crate) db: Option<Database>,
125+
}
126+
127+
impl ExpectedResponse for Begin<'_> {
128+
type Response = Summary<Response>;
129+
}
130+
131+
impl Serialize for Begin<'_> {
132+
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
133+
where
134+
S: serde::Serializer,
135+
{
136+
serializer.serialize_newtype_variant("Request", 0x11, "BEGIN", &self.metadata)
137+
}
138+
}
139+
140+
impl Serialize for TxMetadata {
141+
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
142+
where
143+
S: serde::Serializer,
144+
{
145+
let mut map = serializer.serialize_map(Some(self.0.len()))?;
146+
for (k, v) in self.0.iter() {
147+
map.serialize_entry(k, v)?;
148+
}
149+
map.end()
150+
}
151+
}
152+
153+
impl Serialize for BeginMeta<'_> {
154+
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
155+
where
156+
S: serde::Serializer,
157+
{
158+
let mut fields_count = 2; // minimum number of fields for the map
159+
if self.tx_metadata.is_some() {
160+
fields_count += 1;
161+
}
162+
if self.tx_timeout.is_some() {
163+
fields_count += 1;
164+
}
165+
166+
match &self.extra {
167+
BeginExtra::V4(e) => {
168+
if e.is_some() {
169+
fields_count += 1;
170+
}
171+
}
172+
BeginExtra::V4_4(e) => {
173+
if e.db.is_some() {
174+
fields_count += 1;
175+
}
176+
if e.imp_user.is_some() {
177+
fields_count += 1;
178+
}
179+
}
180+
}
181+
182+
let mut map = serializer.serialize_map(Some(fields_count))?;
183+
map.serialize_entry("bookmarks", &self.bookmarks)?;
184+
map.serialize_entry("mode", &self.mode)?;
185+
if let Some(tx_timeout) = self.tx_timeout {
186+
map.serialize_entry("tx_timeout", &tx_timeout)?;
187+
}
188+
if let Some(tx_metadata) = self.tx_metadata.as_ref() {
189+
map.serialize_entry("tx_metadata", tx_metadata)?;
190+
}
191+
match &self.extra {
192+
BeginExtra::V4(db) => {
193+
if let Some(db) = db {
194+
map.serialize_entry("db", db)?;
195+
}
196+
}
197+
BeginExtra::V4_4(extra) => {
198+
if let Some(db) = extra.db.as_ref() {
199+
map.serialize_entry("db", db)?;
200+
}
201+
if let Some(imp_user) = extra.imp_user.as_ref() {
202+
map.serialize_entry("imp_user", imp_user)?;
203+
}
204+
}
205+
}
206+
map.end()
207+
}
208+
}
209+
210+
#[cfg(test)]
211+
mod tests {
212+
use super::Begin;
213+
use crate::bolt::Message;
214+
use crate::packstream::bolt;
215+
use crate::{Database, Version};
216+
217+
#[test]
218+
fn serialize() {
219+
let begin = Begin::builder(None)
220+
.with_bookmarks(vec!["example-bookmark:1", "example-bookmark:2"])
221+
.with_tx_metadata(
222+
[
223+
("user".to_string(), "alice".to_string()),
224+
("action".to_string(), "data_import".to_string()),
225+
]
226+
.to_vec(),
227+
)
228+
.build(Version::V4);
229+
let bytes = begin.to_bytes().unwrap();
230+
231+
let expected = bolt()
232+
.structure(1, 0x11)
233+
.tiny_map(3)
234+
.tiny_string("bookmarks")
235+
.tiny_list(2)
236+
.string8("example-bookmark:1")
237+
.string8("example-bookmark:2")
238+
.tiny_string("mode")
239+
.tiny_string("w")
240+
.tiny_string("tx_metadata")
241+
.tiny_map(2)
242+
.tiny_string("user")
243+
.tiny_string("alice")
244+
.tiny_string("action")
245+
.tiny_string("data_import")
246+
.build();
247+
248+
assert_eq!(bytes, expected);
249+
250+
let db = Some(Database::from("neo4j"));
251+
let begin = Begin::builder(db.as_deref())
252+
.with_bookmarks(vec!["example-bookmark:1", "example-bookmark:2"])
253+
.with_imp_user("my_user")
254+
.build(Version::V4_4);
255+
let bytes = begin.to_bytes().unwrap();
256+
257+
let expected = bolt()
258+
.structure(1, 0x11)
259+
.tiny_map(4)
260+
.tiny_string("bookmarks")
261+
.tiny_list(2)
262+
.string8("example-bookmark:1")
263+
.string8("example-bookmark:2")
264+
.tiny_string("mode")
265+
.tiny_string("w")
266+
.tiny_string("db")
267+
.tiny_string("neo4j")
268+
.tiny_string("imp_user")
269+
.tiny_string("my_user")
270+
.build();
271+
272+
assert_eq!(bytes, expected);
273+
}
274+
}

lib/src/bolt/request/commit.rs

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,22 @@
1-
use serde::Serialize;
1+
use serde::{Deserialize, Serialize};
22

33
use crate::bolt::{ExpectedResponse, Summary};
4+
use crate::bookmarks::Bookmark;
45

56
#[derive(Debug, Copy, Clone, Default, PartialEq, Eq)]
67
pub struct Commit;
78

9+
#[derive(Debug, Clone, Default, PartialEq, Eq, Deserialize)]
10+
pub struct CommitResponse {
11+
pub bookmark: Option<String>,
12+
}
13+
14+
impl Bookmark for CommitResponse {
15+
fn get_bookmark(&self) -> Option<&str> {
16+
self.bookmark.as_deref()
17+
}
18+
}
19+
820
impl Serialize for Commit {
921
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1022
where
@@ -15,12 +27,13 @@ impl Serialize for Commit {
1527
}
1628

1729
impl ExpectedResponse for Commit {
18-
type Response = Summary<()>;
30+
type Response = Summary<CommitResponse>;
1931
}
2032

2133
#[cfg(test)]
2234
mod tests {
2335
use super::*;
36+
use crate::bolt::MessageResponse;
2437
use crate::{bolt::Message as _, packstream::bolt};
2538

2639
#[test]
@@ -32,4 +45,17 @@ mod tests {
3245

3346
assert_eq!(bytes, expected);
3447
}
48+
49+
#[test]
50+
fn deserialize() {
51+
let data = bolt()
52+
.tiny_map(1)
53+
.string8("bookmark")
54+
.string8("example-bookmark:1")
55+
.build();
56+
let response = CommitResponse::parse(data).unwrap();
57+
58+
assert!(response.bookmark.is_some());
59+
assert_eq!(response.bookmark.unwrap(), "example-bookmark:1");
60+
}
3561
}

lib/src/bolt/request/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
mod begin;
12
mod commit;
23
mod discard;
34
mod extra;
@@ -8,6 +9,7 @@ mod reset;
89
mod rollback;
910
mod route;
1011

12+
pub use begin::Begin;
1113
pub use commit::Commit;
1214
pub use discard::Discard;
1315
pub use extra::WrapExtra;

0 commit comments

Comments
 (0)