Skip to content

Commit 1af1e52

Browse files
committed
Work on doc tests
1 parent 39daab6 commit 1af1e52

File tree

18 files changed

+423
-363
lines changed

18 files changed

+423
-363
lines changed

lib/arrow-rdf/src/encoded/aggregates/max.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ impl Accumulator for SparqlMax {
9191
Ok(vec![ScalarValue::Boolean(Some(self.executed_once)), value])
9292
}
9393

94+
#[allow(clippy::missing_asserts_for_indexing)]
9495
fn merge_batch(&mut self, states: &[ArrayRef]) -> DFResult<()> {
9596
if states.len() != 2 {
9697
return exec_err!("Unexpected number of states.");

lib/arrow-rdf/src/encoded/aggregates/min.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ impl Accumulator for SparqlMin {
9191
Ok(vec![ScalarValue::Boolean(Some(self.executed_once)), value])
9292
}
9393

94+
#[allow(clippy::missing_asserts_for_indexing)]
9495
fn merge_batch(&mut self, states: &[ArrayRef]) -> DFResult<()> {
9596
if states.len() != 2 {
9697
return exec_err!("Unexpected number of states.");

lib/arrow-rdf/src/encoded/builder.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,7 @@ use datafusion::arrow::array::{
99
use datafusion::arrow::buffer::ScalarBuffer;
1010
use datafusion::arrow::error::ArrowError;
1111
use model::vocab::{rdf, xsd};
12-
use model::{
13-
BlankNodeRef, Date, DateTime, DayTimeDuration, Time, Timestamp, YearMonthDuration,
14-
};
12+
use model::{BlankNodeRef, Date, DateTime, DayTimeDuration, Time, Timestamp, YearMonthDuration};
1513
use model::{Decimal, Double, Float, Int, Integer, Iri, Literal, Term};
1614
use std::sync::Arc;
1715

lib/arrow-rdf/src/encoded/encoding/with_struct_encoding.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -121,10 +121,7 @@ mod tests {
121121
use datafusion::arrow::array::{Array, AsArray};
122122
use datafusion::logical_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl};
123123
use model::vocab::xsd;
124-
use model::{
125-
BlankNode, Date, DayTimeDuration, InternalTermRef, Timestamp,
126-
YearMonthDuration,
127-
};
124+
use model::{BlankNode, Date, DayTimeDuration, InternalTermRef, Timestamp, YearMonthDuration};
128125
use std::sync::Arc;
129126

130127
#[test]

lib/graphfusion-engine/src/quad_storage.rs

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
1-
use crate::DFResult;
1+
use crate::error::StorageError;
22
use async_trait::async_trait;
33
use datafusion::datasource::TableProvider;
4-
use model::{Quad, QuadRef};
4+
use model::{GraphNameRef, NamedOrBlankNode, NamedOrBlankNodeRef, Quad, QuadRef};
55
use std::sync::Arc;
66

77
#[async_trait]
@@ -16,8 +16,35 @@ pub trait QuadStorage: Send + Sync {
1616
fn table_provider(&self) -> Arc<dyn TableProvider>;
1717

1818
/// Loads the given quads into the storage.
19-
async fn load_quads(&self, quads: Vec<Quad>) -> DFResult<usize>;
19+
async fn extend(&self, quads: Vec<Quad>) -> Result<usize, StorageError>;
20+
21+
/// Creates an empty named graph in the storage.
22+
async fn insert_named_graph<'a>(
23+
&self,
24+
graph_name: NamedOrBlankNodeRef<'a>,
25+
) -> Result<bool, StorageError>;
26+
27+
/// Returns the list of named graphs in the storage.
28+
async fn named_graphs(&self) -> Result<Vec<NamedOrBlankNode>, StorageError>;
29+
30+
/// Returns whether `graph_name` is a named graph in the storage.
31+
async fn contains_named_graph<'a>(
32+
&self,
33+
graph_name: NamedOrBlankNodeRef<'a>,
34+
) -> Result<bool, StorageError>;
35+
36+
/// Clears the entire storage.
37+
async fn clear(&self) -> Result<(), StorageError>;
38+
39+
/// Clears the entire graph.
40+
async fn clear_graph<'a>(&self, graph_name: GraphNameRef<'a>) -> Result<(), StorageError>;
41+
42+
/// Removes the entire named graph from the storage.
43+
async fn remove_named_graph(
44+
&self,
45+
graph_name: NamedOrBlankNodeRef<'_>,
46+
) -> Result<bool, StorageError>;
2047

2148
/// Removes the given quad from the storage.
22-
async fn remove<'a>(&self, quad: QuadRef<'_>) -> DFResult<bool>;
49+
async fn remove(&self, quad: QuadRef<'_>) -> Result<bool, StorageError>;
2350
}
Lines changed: 73 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,82 @@
1-
use crate::error::StorageError;
2-
use model::NamedOrBlankNode;
1+
use crate::results::QuerySolutionStream;
2+
use crate::sparql::error::QueryEvaluationError;
3+
use crate::DFResult;
4+
use arrow_rdf::encoded::EncTerm;
5+
use arrow_rdf::COL_GRAPH;
6+
use datafusion::common::exec_err;
7+
use datafusion::execution::SendableRecordBatchStream;
8+
use futures::{Stream, StreamExt};
9+
use model::{NamedOrBlankNode, Term, Variable};
10+
use std::pin::Pin;
11+
use std::sync::Arc;
12+
use std::task::{ready, Context, Poll};
313

414
/// A stream returning the graph names contained in a [`Store`].
5-
pub struct GraphNameStream;
15+
pub struct GraphNameStream {
16+
stream: QuerySolutionStream,
17+
}
18+
19+
impl GraphNameStream {
20+
/// Creates a new [GraphNameStream] based on a [SendableRecordBatchStream].
21+
///
22+
/// # Errors
23+
///
24+
/// Returns an error if the schema does not have exactly one field with the data type of encoded
25+
/// terms.
26+
pub fn try_new(stream: SendableRecordBatchStream) -> DFResult<Self> {
27+
if stream.schema().fields.len() != 1 {
28+
return exec_err!("Unexpected number of columns in the result");
29+
}
30+
31+
if stream.schema().field(0).data_type() != &EncTerm::data_type() {
32+
return exec_err!("Unexpected data type in the result");
33+
}
34+
35+
let solutions_stream =
36+
QuerySolutionStream::new(Arc::new([Variable::new_unchecked(COL_GRAPH)]), stream);
37+
Ok(Self {
38+
stream: solutions_stream,
39+
})
40+
}
41+
42+
pub async fn try_collect_to_vec(
43+
mut self,
44+
) -> Result<Vec<NamedOrBlankNode>, QueryEvaluationError> {
45+
let mut result = Vec::new();
46+
while let Some(element) = self.next().await {
47+
result.push(element?);
48+
}
49+
Ok(result)
50+
}
51+
}
52+
53+
impl Stream for GraphNameStream {
54+
type Item = Result<NamedOrBlankNode, QueryEvaluationError>;
655

7-
impl Iterator for GraphNameStream {
8-
type Item = Result<NamedOrBlankNode, StorageError>;
56+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
57+
let Some(inner) = ready!(self.stream.poll_next_unpin(cx)) else {
58+
return Poll::Ready(None);
59+
};
960

10-
#[allow(clippy::unimplemented, reason = "Not production ready")]
11-
fn next(&mut self) -> Option<Self::Item> {
12-
unimplemented!()
61+
let graph_name = inner
62+
.and_then(|s| {
63+
s.get(COL_GRAPH)
64+
.cloned()
65+
.ok_or(QueryEvaluationError::InternalError(
66+
"Missing graph name".to_owned(),
67+
))
68+
})
69+
.and_then(|g| match g {
70+
Term::NamedNode(nnode) => Ok(NamedOrBlankNode::from(nnode)),
71+
Term::BlankNode(bnode) => Ok(NamedOrBlankNode::from(bnode)),
72+
Term::Literal(_) => Err(QueryEvaluationError::InternalError(
73+
"Graph name was a literal.".to_owned(),
74+
)),
75+
});
76+
Poll::Ready(Some(graph_name))
1377
}
1478

15-
#[allow(clippy::unimplemented, reason = "Not production ready")]
1679
fn size_hint(&self) -> (usize, Option<usize>) {
17-
unimplemented!()
80+
self.stream.size_hint()
1881
}
1982
}

lib/graphfusion-engine/src/results/mod.rs

Lines changed: 0 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -51,23 +51,6 @@ impl QueryResults {
5151
/// Writes the query results (solutions or boolean).
5252
///
5353
/// This method fails if it is called on the `Graph` results.
54-
///
55-
/// ```
56-
/// use graphfusion::store::Store;
57-
/// use graphfusion::model::*;
58-
/// use graphfusion::sparql::results::QueryResultsFormat;
59-
///
60-
/// let store = Store::new();
61-
/// let ex = NamedNodeRef::new("http://example.com")?;
62-
/// store.insert(QuadRef::new(ex, ex, ex, GraphNameRef::DefaultGraph))?;
63-
///
64-
/// let results = store.query("SELECT ?s WHERE { ?s ?p ?o }")?;
65-
/// assert_eq!(
66-
/// results.write(Vec::new(), QueryResultsFormat::Json)?,
67-
/// r#"{"head":{"vars":["s"]},"results":{"bindings":[{"s":{"type":"uri","value":"http://example.com"}}]}}"#.as_bytes()
68-
/// );
69-
/// # Result::<_, Box<dyn std::error::Error>>::Ok(())
70-
/// ```
7154
pub async fn write<W: Write>(
7255
self,
7356
writer: W,
@@ -118,29 +101,6 @@ impl QueryResults {
118101
/// Writes the graph query results.
119102
///
120103
/// This method fails if it is called on the `Solution` or `Boolean` results.
121-
///
122-
/// ```
123-
/// use graphfusion::io::RdfFormat;
124-
/// use graphfusion::model::*;
125-
/// use graphfusion::store::Store;
126-
///
127-
/// let graph = "<http://example.com> <http://example.com> <http://example.com> .\n";
128-
///
129-
/// let store = Store::new();
130-
/// store.load_graph(
131-
/// graph.as_bytes(),
132-
/// RdfFormat::NTriples,
133-
/// GraphName::DefaultGraph,
134-
/// None,
135-
/// )?;
136-
///
137-
/// let results = store.query("CONSTRUCT WHERE { ?s ?p ?o }")?;
138-
/// assert_eq!(
139-
/// results.write_graph(Vec::new(), RdfFormat::NTriples)?,
140-
/// graph.as_bytes()
141-
/// );
142-
/// # Result::<_, Box<dyn std::error::Error>>::Ok(())
143-
/// ```
144104
pub async fn write_graph<W: Write>(
145105
self,
146106
writer: W,

lib/graphfusion-engine/src/results/quads.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ pub struct QuadStream {
1313
}
1414

1515
impl QuadStream {
16-
pub async fn try_collect(mut self) -> Result<Vec<Quad>, QueryEvaluationError> {
16+
pub async fn try_collect_to_vec(mut self) -> Result<Vec<Quad>, QueryEvaluationError> {
1717
let mut result = Vec::new();
1818
while let Some(element) = self.next().await {
1919
result.push(element?);

lib/graphfusion-engine/src/results/query_solution.rs

Lines changed: 1 addition & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -10,19 +10,7 @@ use std::pin::Pin;
1010
use std::sync::Arc;
1111
use std::task::{ready, Context, Poll};
1212

13-
/// An iterator over [`QuerySolution`]s.
14-
///
15-
/// ```
16-
/// use graphfusion::sparql::QueryResults;
17-
/// use graphfusion::store::Store;
18-
///
19-
/// let store = Store::new();
20-
/// if let QueryResults::Solutions(solutions) = store.query("SELECT ?s WHERE { ?s ?p ?o }")? {
21-
/// for solution in solutions {
22-
/// println!("{:?}", solution?.get("s"));
23-
/// }
24-
/// }
25-
/// # Result::<_, Box<dyn std::error::Error>>::Ok(())
13+
/// A stream over [`QuerySolution`]s.
2715
pub struct QuerySolutionStream {
2816
variables: Arc<[Variable]>,
@@ -42,20 +30,6 @@ impl QuerySolutionStream {
4230
}
4331

4432
/// The variables used in the solutions.
45-
///
46-
/// ```
47-
/// use graphfusion::sparql::{QueryResults, Variable};
48-
/// use graphfusion::store::Store;
49-
///
50-
/// let store = Store::new();
51-
/// if let QueryResults::Solutions(solutions) = store.query("SELECT ?s ?o WHERE { ?s ?p ?o }")? {
52-
/// assert_eq!(
53-
/// solutions.variables(),
54-
/// &[Variable::new("s")?, Variable::new("o")?]
55-
/// );
56-
/// }
57-
/// # Result::<_, Box<dyn std::error::Error>>::Ok(())
58-
/// ```
5933
#[inline]
6034
pub fn variables(&self) -> &[Variable] {
6135
self.variables.as_ref()

lib/graphfusion-engine/src/results/triples.rs

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -8,20 +8,7 @@ use std::collections::{HashMap, HashSet};
88
use std::pin::Pin;
99
use std::task::{ready, Context, Poll};
1010

11-
/// An iterator over the triples that compose a graph solution.
12-
///
13-
/// ```
14-
/// use graphfusion::sparql::QueryResults;
15-
/// use graphfusion::store::Store;
16-
///
17-
/// let store = Store::new();
18-
/// if let QueryResults::Graph(triples) = store.query("CONSTRUCT WHERE { ?s ?p ?o }")? {
19-
/// for triple in triples {
20-
/// println!("{}", triple?);
21-
/// }
22-
/// }
23-
/// # Result::<_, Box<dyn std::error::Error>>::Ok(())
24-
/// ```
11+
/// A stream over the triples that compose a graph solution.
2512
pub struct QueryTripleStream {
2613
template: Vec<TriplePattern>,
2714
inner: QuerySolutionStream,

0 commit comments

Comments
 (0)