Skip to content

Commit 39daab6

Browse files
committed
Further polishing
1 parent 6958ee2 commit 39daab6

File tree

14 files changed

+222
-481
lines changed

14 files changed

+222
-481
lines changed

Cargo.lock

Lines changed: 47 additions & 252 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,8 @@ criterion = { version = "0.5.1", features = ["async", "async_tokio"] }
3232
datafusion = "47.0.0"
3333
dashmap = "6.1.0"
3434
futures = "0.3.31"
35-
getrandom = "0.2.8"
3635
hex = "0.4.3"
37-
js-sys = "0.3.60"
38-
libc = "0.2.147"
3936
md-5 = "0.10.6"
40-
oxhttp = "0.2.0"
4137
predicates = ">=2.0, <4.0"
4238
rand = "0.8"
4339
regex = "1.11.1"
@@ -49,8 +45,8 @@ text-diff = "0.4"
4945
thiserror = ">=1.0.50, <3.0"
5046
time = "0.3"
5147
tokio = { version = "1.44.2", features = ["rt", "rt-multi-thread"] }
48+
tokio-test = "0.4.4"
5249
uuid = "1.14.0"
53-
zstd = ">=0.12, <0.14"
5450

5551
# Upstream Oxigraph dependencies
5652
oxttl = "0.1.7"

cli/Cargo.toml

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,6 @@ name = "graphfusion"
1818
path = "src/main.rs"
1919
doc = false
2020

21-
[features]
22-
default = ["native-tls"]
23-
native-tls = ["graphfusion/http-client-native-tls"]
24-
rustls-native = ["graphfusion/http-client-rustls-native"]
25-
rustls-webpki = ["graphfusion/http-client-rustls-webpki"]
26-
2721
[dependencies]
2822
anyhow.workspace = true
2923
clap = { workspace = true, features = ["derive"] }

lib/arrow-rdf/src/encoded/aggregates/max.rs

Lines changed: 43 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,12 @@ use crate::encoded::scalars::{encode_scalar_null, encode_scalar_term};
22
use crate::encoded::write_enc_term::WriteEncTerm;
33
use crate::encoded::{EncTerm, FromEncodedTerm};
44
use crate::{as_enc_term_array, DFResult};
5-
use datafusion::arrow::array::{Array, ArrayRef};
5+
use datafusion::arrow::array::{Array, ArrayRef, AsArray};
6+
use datafusion::arrow::datatypes::DataType;
7+
use datafusion::common::exec_err;
68
use datafusion::logical_expr::{create_udaf, AggregateUDF, Volatility};
9+
use datafusion::physical_plan::Accumulator;
710
use datafusion::scalar::ScalarValue;
8-
use datafusion::{error::Result, physical_plan::Accumulator};
911
use model::{InternalTerm, InternalTermRef, ThinError, ThinResult};
1012
use std::sync::{Arc, LazyLock};
1113

@@ -16,47 +18,54 @@ pub static ENC_MAX: LazyLock<AggregateUDF> = LazyLock::new(|| {
1618
Arc::new(EncTerm::data_type()),
1719
Volatility::Immutable,
1820
Arc::new(|_| Ok(Box::new(SparqlMax::new()))),
19-
Arc::new(vec![EncTerm::data_type()]),
21+
Arc::new(vec![DataType::Boolean, EncTerm::data_type()]),
2022
)
2123
});
2224

2325
#[derive(Debug)]
2426
struct SparqlMax {
25-
max: ThinResult<InternalTerm>,
2627
executed_once: bool,
28+
max: ThinResult<InternalTerm>,
2729
}
2830

2931
impl SparqlMax {
3032
pub fn new() -> Self {
3133
SparqlMax {
32-
max: ThinError::expected(),
3334
executed_once: false,
35+
max: ThinError::expected(),
36+
}
37+
}
38+
39+
fn on_new_value(&mut self, value: ThinResult<InternalTermRef<'_>>) {
40+
if !self.executed_once {
41+
self.max = value.map(InternalTermRef::into_owned);
42+
self.executed_once = true;
43+
} else if let Ok(min) = self.max.as_ref() {
44+
if let Ok(value) = value {
45+
if min.as_ref() < value {
46+
self.max = Ok(value.into_owned());
47+
}
48+
}
3449
}
3550
}
3651
}
3752

3853
impl Accumulator for SparqlMax {
39-
fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
54+
fn update_batch(&mut self, values: &[ArrayRef]) -> DFResult<()> {
4055
if values.is_empty() {
4156
return Ok(());
4257
}
43-
// TODO: Can we stop once we error?
58+
59+
// Once the maximum is in an error state no later value can replace it, so skip the whole batch.
60+
if self.executed_once && self.max.is_err() {
61+
return Ok(());
62+
}
4463

4564
let arr = as_enc_term_array(&values[0])?;
4665

4766
for i in 0..arr.len() {
4867
let value = InternalTermRef::from_enc_array(arr, i);
49-
50-
if !self.executed_once {
51-
self.max = value.map(InternalTermRef::into_owned);
52-
self.executed_once = true;
53-
} else if let Ok(min) = self.max.as_ref() {
54-
if let Ok(value) = value {
55-
if min.as_ref() < value {
56-
self.max = Ok(value.into_owned());
57-
}
58-
}
59-
}
68+
self.on_new_value(value);
6069
}
6170

6271
Ok(())
@@ -79,10 +88,23 @@ impl Accumulator for SparqlMax {
7988
Ok(value) => encode_scalar_term(value.into_decoded().as_ref())?,
8089
Err(_) => encode_scalar_null(),
8190
};
82-
Ok(vec![value])
91+
Ok(vec![ScalarValue::Boolean(Some(self.executed_once)), value])
8392
}
8493

85-
fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
86-
self.update_batch(states)
94+
fn merge_batch(&mut self, states: &[ArrayRef]) -> DFResult<()> {
95+
if states.len() != 2 {
96+
return exec_err!("Unexpected number of states.");
97+
}
98+
99+
let executed_once = states[0].as_boolean();
100+
let terms = as_enc_term_array(&states[1])?;
101+
for i in 0..states[0].len() {
102+
if executed_once.value(i) {
103+
let value = InternalTermRef::from_enc_array(terms, i);
104+
self.on_new_value(value);
105+
}
106+
}
107+
108+
Ok(())
87109
}
88110
}

lib/arrow-rdf/src/encoded/aggregates/min.rs

Lines changed: 41 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@ use crate::encoded::scalars::{encode_scalar_null, encode_scalar_term};
22
use crate::encoded::write_enc_term::WriteEncTerm;
33
use crate::encoded::{EncTerm, FromEncodedTerm};
44
use crate::{as_enc_term_array, DFResult};
5-
use datafusion::arrow::array::{Array, ArrayRef};
5+
use datafusion::arrow::array::{Array, ArrayRef, AsArray};
6+
use datafusion::arrow::datatypes::DataType;
7+
use datafusion::common::exec_err;
68
use datafusion::logical_expr::{create_udaf, AggregateUDF, Volatility};
79
use datafusion::physical_plan::Accumulator;
810
use datafusion::scalar::ScalarValue;
@@ -16,21 +18,34 @@ pub static ENC_MIN: LazyLock<AggregateUDF> = LazyLock::new(|| {
1618
Arc::new(EncTerm::data_type()),
1719
Volatility::Immutable,
1820
Arc::new(|_| Ok(Box::new(SparqlMin::new()))),
19-
Arc::new(vec![EncTerm::data_type()]),
21+
Arc::new(vec![DataType::Boolean, EncTerm::data_type()]),
2022
)
2123
});
2224

2325
#[derive(Debug)]
2426
struct SparqlMin {
25-
min: ThinResult<InternalTerm>,
2627
executed_once: bool,
28+
min: ThinResult<InternalTerm>,
2729
}
2830

2931
impl SparqlMin {
3032
pub fn new() -> Self {
3133
SparqlMin {
32-
min: ThinError::expected(),
3334
executed_once: false,
35+
min: ThinError::expected(),
36+
}
37+
}
38+
39+
fn on_new_value(&mut self, value: ThinResult<InternalTermRef<'_>>) {
40+
if !self.executed_once {
41+
self.min = value.map(InternalTermRef::into_owned);
42+
self.executed_once = true;
43+
} else if let Ok(min) = self.min.as_ref() {
44+
if let Ok(value) = value {
45+
if value < min.as_ref() {
46+
self.min = Ok(value.into_owned());
47+
}
48+
}
3449
}
3550
}
3651
}
@@ -40,23 +55,17 @@ impl Accumulator for SparqlMin {
4055
if values.is_empty() {
4156
return Ok(());
4257
}
43-
let arr = as_enc_term_array(&values[0])?;
4458

45-
// TODO: Can we stop once we error?
59+
// Once the minimum is in an error state no later value can replace it, so skip the whole batch.
60+
if self.executed_once && self.min.is_err() {
61+
return Ok(());
62+
}
63+
64+
let arr = as_enc_term_array(&values[0])?;
4665

4766
for i in 0..arr.len() {
4867
let value = InternalTermRef::from_enc_array(arr, i);
49-
50-
if !self.executed_once {
51-
self.min = value.map(InternalTermRef::into_owned);
52-
self.executed_once = true;
53-
} else if let Ok(min) = self.min.as_ref() {
54-
if let Ok(value) = value {
55-
if value < min.as_ref() {
56-
self.min = Ok(value.into_owned());
57-
}
58-
}
59-
}
68+
self.on_new_value(value);
6069
}
6170

6271
Ok(())
@@ -79,10 +88,23 @@ impl Accumulator for SparqlMin {
7988
Ok(value) => encode_scalar_term(value.into_decoded().as_ref())?,
8089
Err(_) => encode_scalar_null(),
8190
};
82-
Ok(vec![value])
91+
Ok(vec![ScalarValue::Boolean(Some(self.executed_once)), value])
8392
}
8493

8594
fn merge_batch(&mut self, states: &[ArrayRef]) -> DFResult<()> {
86-
self.update_batch(states)
95+
if states.len() != 2 {
96+
return exec_err!("Unexpected number of states.");
97+
}
98+
99+
let executed_once = states[0].as_boolean();
100+
let terms = as_enc_term_array(&states[1])?;
101+
for i in 0..states[0].len() {
102+
if executed_once.value(i) {
103+
let value = InternalTermRef::from_enc_array(terms, i);
104+
self.on_new_value(value);
105+
}
106+
}
107+
108+
Ok(())
87109
}
88110
}

lib/graphfusion-engine/src/results/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ impl QueryResults {
5757
/// use graphfusion::model::*;
5858
/// use graphfusion::sparql::results::QueryResultsFormat;
5959
///
60-
/// let store = Store::new()?;
60+
/// let store = Store::new();
6161
/// let ex = NamedNodeRef::new("http://example.com")?;
6262
/// store.insert(QuadRef::new(ex, ex, ex, GraphNameRef::DefaultGraph))?;
6363
///
@@ -126,7 +126,7 @@ impl QueryResults {
126126
///
127127
/// let graph = "<http://example.com> <http://example.com> <http://example.com> .\n";
128128
///
129-
/// let store = Store::new()?;
129+
/// let store = Store::new();
130130
/// store.load_graph(
131131
/// graph.as_bytes(),
132132
/// RdfFormat::NTriples,

lib/graphfusion-engine/src/results/query_solution.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use std::task::{ready, Context, Poll};
1616
/// use graphfusion::sparql::QueryResults;
1717
/// use graphfusion::store::Store;
1818
///
19-
/// let store = Store::new()?;
19+
/// let store = Store::new();
2020
/// if let QueryResults::Solutions(solutions) = store.query("SELECT ?s WHERE { ?s ?p ?o }")? {
2121
/// for solution in solutions {
2222
/// println!("{:?}", solution?.get("s"));
@@ -47,7 +47,7 @@ impl QuerySolutionStream {
4747
/// use graphfusion::sparql::{QueryResults, Variable};
4848
/// use graphfusion::store::Store;
4949
///
50-
/// let store = Store::new()?;
50+
/// let store = Store::new();
5151
/// if let QueryResults::Solutions(solutions) = store.query("SELECT ?s ?o WHERE { ?s ?p ?o }")? {
5252
/// assert_eq!(
5353
/// solutions.variables(),

lib/graphfusion-engine/src/results/triples.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use std::task::{ready, Context, Poll};
1414
/// use graphfusion::sparql::QueryResults;
1515
/// use graphfusion::store::Store;
1616
///
17-
/// let store = Store::new()?;
17+
/// let store = Store::new();
1818
/// if let QueryResults::Graph(triples) = store.query("CONSTRUCT WHERE { ?s ?p ?o }")? {
1919
/// for triple in triples {
2020
/// println!("{}", triple?);

lib/graphfusion-engine/src/sparql/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ pub use spargebra::SparqlSyntaxError;
2626
/// use graphfusion::sparql::QueryOptions;
2727
/// use graphfusion::store::Store;
2828
///
29-
/// let store = Store::new()?;
29+
/// let store = Store::new();
3030
/// store.query_opt(
3131
/// "SELECT * WHERE { SERVICE <https://query.wikidata.org/sparql> {} }",
3232
/// QueryOptions::default().without_service_handler(),

lib/graphfusion/Cargo.toml

Lines changed: 2 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,6 @@ a SPARQL database and RDF toolkit
1515
edition.workspace = true
1616
rust-version.workspace = true
1717

18-
[features]
19-
default = ["storage"]
20-
http-client = ["oxhttp"]
21-
http-client-native-tls = ["http-client", "oxhttp/native-tls"]
22-
http-client-rustls-webpki = ["http-client", "oxhttp/rustls-ring-webpki"]
23-
http-client-rustls-native = ["http-client", "oxhttp/rustls-ring-native"]
24-
storage = []
25-
2618
[dependencies]
2719
model.workspace = true
2820
futures.workspace = true
@@ -34,18 +26,9 @@ thiserror.workspace = true
3426
tokio.workspace = true
3527
sparesults.workspace = true
3628

37-
[target.'cfg(not(target_family = "wasm"))'.dependencies]
38-
libc.workspace = true
39-
oxhttp = { workspace = true, optional = true }
40-
41-
[target.'cfg(all(target_family = "wasm", target_os = "unknown"))'.dependencies]
42-
getrandom.workspace = true
43-
js-sys = { workspace = true, optional = true }
44-
45-
[target.'cfg(not(target_family = "wasm"))'.dev-dependencies]
29+
[dev-dependencies]
30+
tokio-test.workspace = true
4631
criterion.workspace = true
47-
oxhttp.workspace = true
48-
zstd.workspace = true
4932

5033
[lints]
5134
workspace = true

0 commit comments

Comments
 (0)